From 064097d919508e8636b220baedb52b382b9b07c6 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Wed, 10 Feb 2016 18:41:01 +0000 Subject: nbd: convert block client to use I/O channels for connection setup This converts the NBD block driver client to use the QIOChannelSocket class for initial connection setup. The NbdClientSession struct has two pointers, one to the master QIOChannelSocket providing the raw data channel, and one to a QIOChannel which is the current channel used for I/O. Initially the two point to the same object, but when TLS support is added, they will point to different objects. The qemu-img & qemu-io tools now need to use MODULE_INIT_QOM to ensure the QIOChannel object classes are registered. The qemu-nbd tool already did this. In this initial conversion though, all I/O is still actually done using the raw POSIX sockets APIs. Signed-off-by: Daniel P. Berrange Message-Id: <1455129674-17255-4-git-send-email-berrange@redhat.com> Signed-off-by: Paolo Bonzini --- block/nbd-client.c | 76 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 29 deletions(-) (limited to 'block/nbd-client.c') diff --git a/block/nbd-client.c b/block/nbd-client.c index 568c56c..c4379a9 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -28,7 +28,6 @@ #include "qemu/osdep.h" #include "nbd-client.h" -#include "qemu/sockets.h" #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) #define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) @@ -48,13 +47,21 @@ static void nbd_teardown_connection(BlockDriverState *bs) { NbdClientSession *client = nbd_get_client_session(bs); + if (!client->ioc) { /* Already closed */ + return; + } + /* finish any pending coroutines */ - shutdown(client->sock, 2); + qio_channel_shutdown(client->ioc, + QIO_CHANNEL_SHUTDOWN_BOTH, + NULL); nbd_recv_coroutines_enter_all(client); nbd_client_detach_aio_context(bs); - closesocket(client->sock); - client->sock = -1; + object_unref(OBJECT(client->sioc)); + client->sioc = NULL; + object_unref(OBJECT(client->ioc)); + client->ioc = NULL; } static void nbd_reply_ready(void *opaque) @@ -64,12 +71,16 @@ static void nbd_reply_ready(void *opaque) uint64_t i; int ret; + if (!s->ioc) { /* Already closed */ + return; + } + if (s->reply.handle == 0) { /* No reply already in flight. Fetch a header. It is possible * that another thread has done the same thing in parallel, so * the socket is not readable anymore. */ - ret = nbd_receive_reply(s->sock, &s->reply); + ret = nbd_receive_reply(s->sioc->fd, &s->reply); if (ret == -EAGAIN) { return; } @@ -122,30 +133,32 @@ static int nbd_co_send_request(BlockDriverState *bs, assert(i < MAX_NBD_REQUESTS); request->handle = INDEX_TO_HANDLE(s, i); + + if (!s->ioc) { + qemu_co_mutex_unlock(&s->send_mutex); + return -EPIPE; + } + s->send_coroutine = qemu_coroutine_self(); aio_context = bdrv_get_aio_context(bs); - aio_set_fd_handler(aio_context, s->sock, false, + aio_set_fd_handler(aio_context, s->sioc->fd, false, nbd_reply_ready, nbd_restart_write, bs); if (qiov) { - if (!s->is_unix) { - socket_set_cork(s->sock, 1); - } - rc = nbd_send_request(s->sock, request); + qio_channel_set_cork(s->ioc, true); + rc = nbd_send_request(s->sioc->fd, request); if (rc >= 0) { - ret = qemu_co_sendv(s->sock, qiov->iov, qiov->niov, + ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov, offset, request->len); if (ret != request->len) { rc = -EIO; } } - if (!s->is_unix) { - socket_set_cork(s->sock, 0); - } + qio_channel_set_cork(s->ioc, false); } else { - rc = nbd_send_request(s->sock, request); + rc = nbd_send_request(s->sioc->fd, request); } - aio_set_fd_handler(aio_context, s->sock, false, + aio_set_fd_handler(aio_context, s->sioc->fd, false, nbd_reply_ready, NULL, bs); s->send_coroutine = NULL; qemu_co_mutex_unlock(&s->send_mutex); @@ -162,11 +175,12 @@ static void nbd_co_receive_reply(NbdClientSession *s, * peek at the next reply and avoid yielding if it's ours? */ qemu_coroutine_yield(); *reply = s->reply; - if (reply->handle != request->handle) { + if (reply->handle != request->handle || + !s->ioc) { reply->error = EIO; } else { if (qiov && reply->error == 0) { - ret = qemu_co_recvv(s->sock, qiov->iov, qiov->niov, + ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov, offset, request->len); if (ret != request->len) { reply->error = EIO; @@ -350,14 +364,14 @@ int nbd_client_co_discard(BlockDriverState *bs, int64_t sector_num, void nbd_client_detach_aio_context(BlockDriverState *bs) { aio_set_fd_handler(bdrv_get_aio_context(bs), - nbd_get_client_session(bs)->sock, + nbd_get_client_session(bs)->sioc->fd, false, NULL, NULL, NULL); } void nbd_client_attach_aio_context(BlockDriverState *bs, AioContext *new_context) { - aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sock, + aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd, false, nbd_reply_ready, NULL, bs); } @@ -370,39 +384,43 @@ void nbd_client_close(BlockDriverState *bs) .len = 0 }; - if (client->sock == -1) { + if (client->ioc == NULL) { return; } - nbd_send_request(client->sock, &request); + nbd_send_request(client->sioc->fd, &request); nbd_teardown_connection(bs); } -int nbd_client_init(BlockDriverState *bs, int sock, const char *export, - Error **errp) +int nbd_client_init(BlockDriverState *bs, QIOChannelSocket *sioc, + const char *export, Error **errp) { NbdClientSession *client = nbd_get_client_session(bs); int ret; /* NBD handshake */ logout("session init %s\n", export); - qemu_set_block(sock); - ret = nbd_receive_negotiate(sock, export, + qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL); + + ret = nbd_receive_negotiate(sioc->fd, export, &client->nbdflags, &client->size, errp); if (ret < 0) { logout("Failed to negotiate with the NBD server\n"); - closesocket(sock); return ret; } qemu_co_mutex_init(&client->send_mutex); qemu_co_mutex_init(&client->free_sema); - client->sock = sock; + client->sioc = sioc; + object_ref(OBJECT(client->sioc)); + client->ioc = QIO_CHANNEL(sioc); + object_ref(OBJECT(client->ioc)); /* Now that we're connected, set the socket to be non-blocking and * kick the reply mechanism. */ - qemu_set_nonblock(sock); + qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL); + nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs)); logout("Established connection with NBD server\n"); -- cgit v1.1