aboutsummaryrefslogtreecommitdiff
path: root/net/stream_data.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/stream_data.c')
-rw-r--r--net/stream_data.c193
1 files changed, 193 insertions, 0 deletions
diff --git a/net/stream_data.c b/net/stream_data.c
new file mode 100644
index 0000000..5af27e0
--- /dev/null
+++ b/net/stream_data.c
@@ -0,0 +1,193 @@
+/*
+ * net stream generic functions
+ *
+ * Copyright Red Hat
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/iov.h"
+#include "qapi/error.h"
+#include "net/net.h"
+#include "io/channel.h"
+#include "io/net-listener.h"
+
+#include "stream_data.h"
+
+static gboolean net_stream_data_writable(QIOChannel *ioc,
+ GIOCondition condition, gpointer data)
+{
+ NetStreamData *d = data;
+
+ d->ioc_write_tag = 0;
+
+ qemu_flush_queued_packets(&d->nc);
+
+ return G_SOURCE_REMOVE;
+}
+
+ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf,
+ size_t size)
+{
+ uint32_t len = htonl(size);
+ struct iovec iov[] = {
+ {
+ .iov_base = &len,
+ .iov_len = sizeof(len),
+ }, {
+ .iov_base = (void *)buf,
+ .iov_len = size,
+ },
+ };
+ struct iovec local_iov[2];
+ unsigned int nlocal_iov;
+ size_t remaining;
+ ssize_t ret;
+
+ remaining = iov_size(iov, 2) - d->send_index;
+ nlocal_iov = iov_copy(local_iov, 2, iov, 2, d->send_index, remaining);
+ ret = qio_channel_writev(d->ioc, local_iov, nlocal_iov, NULL);
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ ret = 0; /* handled further down */
+ }
+ if (ret == -1) {
+ d->send_index = 0;
+ return -errno;
+ }
+ if (ret < (ssize_t)remaining) {
+ d->send_index += ret;
+ d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT,
+ net_stream_data_writable, d,
+ NULL);
+ return 0;
+ }
+ d->send_index = 0;
+ return size;
+}
+
+static void net_stream_data_send_completed(NetClientState *nc, ssize_t len)
+{
+ NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
+
+ if (!d->ioc_read_tag) {
+ d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d,
+ NULL);
+ }
+}
+
+void net_stream_data_rs_finalize(SocketReadState *rs)
+{
+ NetStreamData *d = container_of(rs, NetStreamData, rs);
+
+ if (qemu_send_packet_async(&d->nc, rs->buf,
+ rs->packet_len,
+ net_stream_data_send_completed) == 0) {
+ if (d->ioc_read_tag) {
+ g_source_remove(d->ioc_read_tag);
+ d->ioc_read_tag = 0;
+ }
+ }
+}
+
+gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition,
+ NetStreamData *d)
+{
+ int size;
+ int ret;
+ QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
+ const char *buf;
+
+ size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL);
+ if (size < 0) {
+ if (errno != EWOULDBLOCK) {
+ goto eoc;
+ }
+ } else if (size == 0) {
+ /* end of connection */
+ eoc:
+ d->ioc_read_tag = 0;
+ if (d->ioc_write_tag) {
+ g_source_remove(d->ioc_write_tag);
+ d->ioc_write_tag = 0;
+ }
+ if (d->listener) {
+ qemu_set_info_str(&d->nc, "listening");
+ qio_net_listener_set_client_func(d->listener,
+ d->listen, d, NULL);
+ }
+ object_unref(OBJECT(d->ioc));
+ d->ioc = NULL;
+
+ net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+ d->nc.link_down = true;
+
+ return G_SOURCE_REMOVE;
+ }
+ buf = buf1;
+
+ ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size);
+
+ if (ret == -1) {
+ goto eoc;
+ }
+
+ return G_SOURCE_CONTINUE;
+}
+
+void net_stream_data_listen(QIONetListener *listener,
+ QIOChannelSocket *cioc,
+ NetStreamData *d)
+{
+ object_ref(OBJECT(cioc));
+
+ qio_net_listener_set_client_func(d->listener, NULL, d, NULL);
+
+ d->ioc = QIO_CHANNEL(cioc);
+ qio_channel_set_name(d->ioc, "stream-server");
+ d->nc.link_down = false;
+
+ d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
+}
+
+int net_stream_data_client_connected(QIOTask *task, NetStreamData *d)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
+ SocketAddress *addr;
+ int ret;
+ Error *err = NULL;
+
+ if (qio_task_propagate_error(task, &err)) {
+ qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
+ error_free(err);
+ goto error;
+ }
+
+ addr = qio_channel_socket_get_remote_address(sioc, NULL);
+ g_assert(addr != NULL);
+
+ ret = qemu_socket_try_set_nonblock(sioc->fd);
+ if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
+ qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
+ addr->u.fd.str, -ret);
+ qapi_free_SocketAddress(addr);
+ goto error;
+ }
+ g_assert(ret == 0);
+ qapi_free_SocketAddress(addr);
+
+ net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
+
+ /* Disable Nagle algorithm on TCP sockets to reduce latency */
+ qio_channel_set_delay(d->ioc, false);
+
+ d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
+ d->nc.link_down = false;
+
+ return 0;
+error:
+ object_unref(OBJECT(d->ioc));
+ d->ioc = NULL;
+
+ return -1;
+}