aboutsummaryrefslogtreecommitdiff
path: root/net/stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/stream.c')
-rw-r--r--net/stream.c282
1 files changed, 64 insertions, 218 deletions
diff --git a/net/stream.c b/net/stream.c
index 4de5613..d893f02 100644
--- a/net/stream.c
+++ b/net/stream.c
@@ -27,173 +27,50 @@
#include "net/net.h"
#include "clients.h"
-#include "monitor/monitor.h"
#include "qapi/error.h"
-#include "qemu/error-report.h"
-#include "qemu/option.h"
-#include "qemu/sockets.h"
-#include "qemu/iov.h"
-#include "qemu/main-loop.h"
-#include "qemu/cutils.h"
-#include "io/channel.h"
-#include "io/channel-socket.h"
#include "io/net-listener.h"
#include "qapi/qapi-events-net.h"
#include "qapi/qapi-visit-sockets.h"
#include "qapi/clone-visitor.h"
+#include "stream_data.h"
+
typedef struct NetStreamState {
- NetClientState nc;
- QIOChannel *listen_ioc;
- QIONetListener *listener;
- QIOChannel *ioc;
- guint ioc_read_tag;
- guint ioc_write_tag;
- SocketReadState rs;
- unsigned int send_index; /* number of bytes sent*/
+ NetStreamData data;
uint32_t reconnect_ms;
guint timer_tag;
SocketAddress *addr;
} NetStreamState;
-static void net_stream_listen(QIONetListener *listener,
- QIOChannelSocket *cioc,
- void *opaque);
static void net_stream_arm_reconnect(NetStreamState *s);
-static gboolean net_stream_writable(QIOChannel *ioc,
- GIOCondition condition,
- gpointer data)
-{
- NetStreamState *s = data;
-
- s->ioc_write_tag = 0;
-
- qemu_flush_queued_packets(&s->nc);
-
- return G_SOURCE_REMOVE;
-}
-
static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
size_t size)
{
- NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
- uint32_t len = htonl(size);
- struct iovec iov[] = {
- {
- .iov_base = &len,
- .iov_len = sizeof(len),
- }, {
- .iov_base = (void *)buf,
- .iov_len = size,
- },
- };
- struct iovec local_iov[2];
- unsigned int nlocal_iov;
- size_t remaining;
- ssize_t ret;
-
- remaining = iov_size(iov, 2) - s->send_index;
- nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
- ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
- if (ret == QIO_CHANNEL_ERR_BLOCK) {
- ret = 0; /* handled further down */
- }
- if (ret == -1) {
- s->send_index = 0;
- return -errno;
- }
- if (ret < (ssize_t)remaining) {
- s->send_index += ret;
- s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
- net_stream_writable, s, NULL);
- return 0;
- }
- s->send_index = 0;
- return size;
-}
-
-static gboolean net_stream_send(QIOChannel *ioc,
- GIOCondition condition,
- gpointer data);
-
-static void net_stream_send_completed(NetClientState *nc, ssize_t len)
-{
- NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
-
- if (!s->ioc_read_tag) {
- s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
- net_stream_send, s, NULL);
- }
-}
+ NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
-static void net_stream_rs_finalize(SocketReadState *rs)
-{
- NetStreamState *s = container_of(rs, NetStreamState, rs);
-
- if (qemu_send_packet_async(&s->nc, rs->buf,
- rs->packet_len,
- net_stream_send_completed) == 0) {
- if (s->ioc_read_tag) {
- g_source_remove(s->ioc_read_tag);
- s->ioc_read_tag = 0;
- }
- }
+ return net_stream_data_receive(d, buf, size);
}
static gboolean net_stream_send(QIOChannel *ioc,
GIOCondition condition,
gpointer data)
{
- NetStreamState *s = data;
- int size;
- int ret;
- char buf1[NET_BUFSIZE];
- const char *buf;
-
- size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
- if (size < 0) {
- if (errno != EWOULDBLOCK) {
- goto eoc;
- }
- } else if (size == 0) {
- /* end of connection */
- eoc:
- s->ioc_read_tag = 0;
- if (s->ioc_write_tag) {
- g_source_remove(s->ioc_write_tag);
- s->ioc_write_tag = 0;
- }
- if (s->listener) {
- qemu_set_info_str(&s->nc, "listening");
- qio_net_listener_set_client_func(s->listener, net_stream_listen,
- s, NULL);
- }
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
-
- net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
- s->nc.link_down = true;
+ if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) {
+ NetStreamState *s = DO_UPCAST(NetStreamState, data, data);
- qapi_event_send_netdev_stream_disconnected(s->nc.name);
+ qapi_event_send_netdev_stream_disconnected(s->data.nc.name);
net_stream_arm_reconnect(s);
return G_SOURCE_REMOVE;
}
- buf = buf1;
-
- ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
-
- if (ret == -1) {
- goto eoc;
- }
return G_SOURCE_CONTINUE;
}
static void net_stream_cleanup(NetClientState *nc)
{
- NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
+ NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc);
if (s->timer_tag) {
g_source_remove(s->timer_tag);
s->timer_tag = 0;
@@ -202,28 +79,28 @@ static void net_stream_cleanup(NetClientState *nc)
qapi_free_SocketAddress(s->addr);
s->addr = NULL;
}
- if (s->ioc) {
- if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
- if (s->ioc_read_tag) {
- g_source_remove(s->ioc_read_tag);
- s->ioc_read_tag = 0;
+ if (s->data.ioc) {
+ if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) {
+ if (s->data.ioc_read_tag) {
+ g_source_remove(s->data.ioc_read_tag);
+ s->data.ioc_read_tag = 0;
}
- if (s->ioc_write_tag) {
- g_source_remove(s->ioc_write_tag);
- s->ioc_write_tag = 0;
+ if (s->data.ioc_write_tag) {
+ g_source_remove(s->data.ioc_write_tag);
+ s->data.ioc_write_tag = 0;
}
}
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
+ object_unref(OBJECT(s->data.ioc));
+ s->data.ioc = NULL;
}
- if (s->listen_ioc) {
- if (s->listener) {
- qio_net_listener_disconnect(s->listener);
- object_unref(OBJECT(s->listener));
- s->listener = NULL;
+ if (s->data.listen_ioc) {
+ if (s->data.listener) {
+ qio_net_listener_disconnect(s->data.listener);
+ object_unref(OBJECT(s->data.listener));
+ s->data.listener = NULL;
}
- object_unref(OBJECT(s->listen_ioc));
- s->listen_ioc = NULL;
+ object_unref(OBJECT(s->data.listen_ioc));
+ s->data.listen_ioc = NULL;
}
}
@@ -235,23 +112,13 @@ static NetClientInfo net_stream_info = {
};
static void net_stream_listen(QIONetListener *listener,
- QIOChannelSocket *cioc,
- void *opaque)
+ QIOChannelSocket *cioc, gpointer data)
{
- NetStreamState *s = opaque;
+ NetStreamData *d = data;
SocketAddress *addr;
char *uri;
- object_ref(OBJECT(cioc));
-
- qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
-
- s->ioc = QIO_CHANNEL(cioc);
- qio_channel_set_name(s->ioc, "stream-server");
- s->nc.link_down = false;
-
- s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
- s, NULL);
+ net_stream_data_listen(listener, cioc, data);
if (cioc->localAddr.ss_family == AF_UNIX) {
addr = qio_channel_socket_get_local_address(cioc, NULL);
@@ -260,22 +127,22 @@ static void net_stream_listen(QIONetListener *listener,
}
g_assert(addr != NULL);
uri = socket_uri(addr);
- qemu_set_info_str(&s->nc, "%s", uri);
+ qemu_set_info_str(&d->nc, "%s", uri);
g_free(uri);
- qapi_event_send_netdev_stream_connected(s->nc.name, addr);
+ qapi_event_send_netdev_stream_connected(d->nc.name, addr);
qapi_free_SocketAddress(addr);
}
static void net_stream_server_listening(QIOTask *task, gpointer opaque)
{
- NetStreamState *s = opaque;
- QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
+ NetStreamData *d = opaque;
+ QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc);
SocketAddress *addr;
int ret;
Error *err = NULL;
if (qio_task_propagate_error(task, &err)) {
- qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
+ qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
error_free(err);
return;
}
@@ -284,20 +151,21 @@ static void net_stream_server_listening(QIOTask *task, gpointer opaque)
g_assert(addr != NULL);
ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
- qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+ qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
addr->u.fd.str, -ret);
return;
}
g_assert(ret == 0);
qapi_free_SocketAddress(addr);
- s->nc.link_down = true;
- s->listener = qio_net_listener_new();
+ d->nc.link_down = true;
+ d->listener = qio_net_listener_new();
- qemu_set_info_str(&s->nc, "listening");
- net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
- qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
- qio_net_listener_add(s->listener, listen_sioc);
+ qemu_set_info_str(&d->nc, "listening");
+ net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+ qio_net_listener_set_client_func(d->listener, d->listen, d,
+ NULL);
+ qio_net_listener_add(d->listener, listen_sioc);
}
static int net_stream_server_init(NetClientState *peer,
@@ -307,16 +175,18 @@ static int net_stream_server_init(NetClientState *peer,
Error **errp)
{
NetClientState *nc;
- NetStreamState *s;
+ NetStreamData *d;
QIOChannelSocket *listen_sioc = qio_channel_socket_new();
nc = qemu_new_net_client(&net_stream_info, peer, model, name);
- s = DO_UPCAST(NetStreamState, nc, nc);
- qemu_set_info_str(&s->nc, "initializing");
+ d = DO_UPCAST(NetStreamData, nc, nc);
+ d->send = net_stream_send;
+ d->listen = net_stream_listen;
+ qemu_set_info_str(&d->nc, "initializing");
- s->listen_ioc = QIO_CHANNEL(listen_sioc);
+ d->listen_ioc = QIO_CHANNEL(listen_sioc);
qio_channel_socket_listen_async(listen_sioc, addr, 0,
- net_stream_server_listening, s,
+ net_stream_server_listening, d,
NULL, NULL);
return 0;
@@ -325,49 +195,23 @@ static int net_stream_server_init(NetClientState *peer,
static void net_stream_client_connected(QIOTask *task, gpointer opaque)
{
NetStreamState *s = opaque;
- QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
+ NetStreamData *d = &s->data;
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
SocketAddress *addr;
gchar *uri;
- int ret;
- Error *err = NULL;
- if (qio_task_propagate_error(task, &err)) {
- qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
- error_free(err);
- goto error;
+ if (net_stream_data_client_connected(task, d) == -1) {
+ net_stream_arm_reconnect(s);
+ return;
}
addr = qio_channel_socket_get_remote_address(sioc, NULL);
g_assert(addr != NULL);
uri = socket_uri(addr);
- qemu_set_info_str(&s->nc, "%s", uri);
+ qemu_set_info_str(&d->nc, "%s", uri);
g_free(uri);
-
- ret = qemu_socket_try_set_nonblock(sioc->fd);
- if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
- qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
- addr->u.fd.str, -ret);
- qapi_free_SocketAddress(addr);
- goto error;
- }
- g_assert(ret == 0);
-
- net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
-
- /* Disable Nagle algorithm on TCP sockets to reduce latency */
- qio_channel_set_delay(s->ioc, false);
-
- s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
- s, NULL);
- s->nc.link_down = false;
- qapi_event_send_netdev_stream_connected(s->nc.name, addr);
+ qapi_event_send_netdev_stream_connected(d->nc.name, addr);
qapi_free_SocketAddress(addr);
-
- return;
-error:
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
- net_stream_arm_reconnect(s);
}
static gboolean net_stream_reconnect(gpointer data)
@@ -378,7 +222,7 @@ static gboolean net_stream_reconnect(gpointer data)
s->timer_tag = 0;
sioc = qio_channel_socket_new();
- s->ioc = QIO_CHANNEL(sioc);
+ s->data.ioc = QIO_CHANNEL(sioc);
qio_channel_socket_connect_async(sioc, s->addr,
net_stream_client_connected, s,
NULL, NULL);
@@ -388,7 +232,7 @@ static gboolean net_stream_reconnect(gpointer data)
static void net_stream_arm_reconnect(NetStreamState *s)
{
if (s->reconnect_ms && s->timer_tag == 0) {
- qemu_set_info_str(&s->nc, "connecting");
+ qemu_set_info_str(&s->data.nc, "connecting");
s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
}
}
@@ -405,11 +249,13 @@ static int net_stream_client_init(NetClientState *peer,
QIOChannelSocket *sioc = qio_channel_socket_new();
nc = qemu_new_net_client(&net_stream_info, peer, model, name);
- s = DO_UPCAST(NetStreamState, nc, nc);
- qemu_set_info_str(&s->nc, "connecting");
+ s = DO_UPCAST(NetStreamState, data.nc, nc);
+ qemu_set_info_str(&s->data.nc, "connecting");
- s->ioc = QIO_CHANNEL(sioc);
- s->nc.link_down = true;
+ s->data.ioc = QIO_CHANNEL(sioc);
+ s->data.nc.link_down = true;
+ s->data.send = net_stream_send;
+ s->data.listen = net_stream_listen;
s->reconnect_ms = reconnect_ms;
if (reconnect_ms) {