diff options
Diffstat (limited to 'nbd')
-rw-r--r-- | nbd/server.c | 144 |
1 files changed, 111 insertions, 33 deletions
diff --git a/nbd/server.c b/nbd/server.c index e91e2e0..941832f 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -125,23 +125,25 @@ struct NBDClient { int refcount; /* atomic */ void (*close_fn)(NBDClient *client, bool negotiated); + QemuMutex lock; + NBDExport *exp; QCryptoTLSCreds *tlscreds; char *tlsauthz; QIOChannelSocket *sioc; /* The underlying data channel */ QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ - Coroutine *recv_coroutine; + Coroutine *recv_coroutine; /* protected by lock */ CoMutex send_lock; Coroutine *send_coroutine; - bool read_yielding; - bool quiescing; + bool read_yielding; /* protected by lock */ + bool quiescing; /* protected by lock */ QTAILQ_ENTRY(NBDClient) next; - int nb_requests; - bool closing; + int nb_requests; /* protected by lock */ + bool closing; /* protected by lock */ uint32_t check_align; /* If non-zero, check for aligned client requests */ @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) len = qio_channel_readv(client->ioc, &iov, 1, errp); if (len == QIO_CHANNEL_ERR_BLOCK) { - client->read_yielding = true; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->read_yielding = true; + + /* Prompt main loop thread to re-run nbd_drained_poll() */ + aio_wait_kick(); + } qio_channel_yield(client->ioc, G_IO_IN); - client->read_yielding = false; - if (client->quiescing) { - return -EAGAIN; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->read_yielding = false; + if (client->quiescing) { + return -EAGAIN; + } } continue; } else if (len < 0) { @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client) blk_exp_unref(&client->exp->common); } g_free(client->contexts.bitmaps); + qemu_mutex_destroy(&client->lock); g_free(client); } } @@ -1561,11 +1571,13 @@ static void client_close(NBDClient *client, bool negotiated) { assert(qemu_in_main_thread()); - if (client->closing) { - return; - } + WITH_QEMU_LOCK_GUARD(&client->lock) { + if (client->closing) { + return; + } - client->closing = true; + client->closing = true; + } /* Force requests to finish. They will drop their own references, * then we'll close the socket and free the NBDClient. @@ -1579,6 +1591,7 @@ static void client_close(NBDClient *client, bool negotiated) } } +/* Runs in export AioContext with client->lock held */ static NBDRequestData *nbd_request_get(NBDClient *client) { NBDRequestData *req; @@ -1591,6 +1604,7 @@ static NBDRequestData *nbd_request_get(NBDClient *client) return req; } +/* Runs in export AioContext with client->lock held */ static void nbd_request_put(NBDRequestData *req) { NBDClient *client = req->client; @@ -1614,14 +1628,18 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + trace_nbd_blk_aio_attached(exp->name, ctx); exp->common.ctx = ctx; QTAILQ_FOREACH(client, &exp->clients, next) { - assert(client->nb_requests == 0); - assert(client->recv_coroutine == NULL); - assert(client->send_coroutine == NULL); + WITH_QEMU_LOCK_GUARD(&client->lock) { + assert(client->nb_requests == 0); + assert(client->recv_coroutine == NULL); + assert(client->send_coroutine == NULL); + } } } @@ -1629,6 +1647,8 @@ static void blk_aio_detach(void *opaque) { NBDExport *exp = opaque; + assert(qemu_in_main_thread()); + trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); exp->common.ctx = NULL; @@ -1639,8 +1659,12 @@ static void nbd_drained_begin(void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - client->quiescing = true; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->quiescing = true; + } } } @@ -1649,28 +1673,48 @@ static void nbd_drained_end(void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - client->quiescing = false; - nbd_client_receive_next_request(client); + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->quiescing = false; + nbd_client_receive_next_request(client); + } } } +/* Runs in export AioContext */ +static void nbd_wake_read_bh(void *opaque) +{ + NBDClient *client = opaque; + qio_channel_wake_read(client->ioc); +} + static bool nbd_drained_poll(void *opaque) { NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - if (client->nb_requests != 0) { - /* - * If there's a coroutine waiting for a request on nbd_read_eof() - * enter it here so we don't depend on the client to wake it up. - */ - if (client->recv_coroutine != NULL && client->read_yielding) { - qio_channel_wake_read(client->ioc); - } + WITH_QEMU_LOCK_GUARD(&client->lock) { + if (client->nb_requests != 0) { + /* + * If there's a coroutine waiting for a request on nbd_read_eof() + * enter it here so we don't depend on the client to wake it up. + * + * Schedule a BH in the export AioContext to avoid missing the + * wake up due to the race between qio_channel_wake_read() and + * qio_channel_yield(). + */ + if (client->recv_coroutine != NULL && client->read_yielding) { + aio_bh_schedule_oneshot(nbd_export_aio_context(client->exp), + nbd_wake_read_bh, client); + } - return true; + return true; + } } } @@ -1681,6 +1725,8 @@ static void nbd_eject_notifier(Notifier *n, void *data) { NBDExport *exp = container_of(n, NBDExport, eject_notifier); + assert(qemu_in_main_thread()); + blk_exp_request_shutdown(&exp->common); } @@ -2566,7 +2612,6 @@ static int coroutine_fn nbd_co_receive_request(NBDRequestData *req, int ret; g_assert(qemu_in_coroutine()); - assert(client->recv_coroutine == qemu_coroutine_self()); ret = nbd_receive_request(client, request, errp); if (ret < 0) { return ret; @@ -2975,6 +3020,9 @@ static coroutine_fn void nbd_trip(void *opaque) */ trace_nbd_trip(); + + qemu_mutex_lock(&client->lock); + if (client->closing) { goto done; } @@ -2990,7 +3038,21 @@ static coroutine_fn void nbd_trip(void *opaque) } req = nbd_request_get(client); - ret = nbd_co_receive_request(req, &request, &local_err); + + /* + * nbd_co_receive_request() returns -EAGAIN when nbd_drained_begin() has + * set client->quiescing but by the time we get back nbd_drained_end() may + * have already cleared client->quiescing. In that case we try again + * because nothing else will spawn an nbd_trip() coroutine until we set + * client->recv_coroutine = NULL further down. + */ + do { + assert(client->recv_coroutine == qemu_coroutine_self()); + qemu_mutex_unlock(&client->lock); + ret = nbd_co_receive_request(req, &request, &local_err); + qemu_mutex_lock(&client->lock); + } while (ret == -EAGAIN && !client->quiescing); + client->recv_coroutine = NULL; if (client->closing) { @@ -3002,15 +3064,16 @@ static coroutine_fn void nbd_trip(void *opaque) } if (ret == -EAGAIN) { - assert(client->quiescing); goto done; } nbd_client_receive_next_request(client); + if (ret == -EIO) { goto disconnect; } + qemu_mutex_unlock(&client->lock); qio_channel_set_cork(client->ioc, true); if (ret < 0) { @@ -3030,6 +3093,10 @@ static coroutine_fn void nbd_trip(void *opaque) g_free(request.contexts->bitmaps); g_free(request.contexts); } + + qio_channel_set_cork(client->ioc, false); + qemu_mutex_lock(&client->lock); + if (ret < 0) { error_prepend(&local_err, "Failed to send reply: "); goto disconnect; @@ -3044,11 +3111,13 @@ static coroutine_fn void nbd_trip(void *opaque) goto disconnect; } - qio_channel_set_cork(client->ioc, false); done: if (req) { nbd_request_put(req); } + + qemu_mutex_unlock(&client->lock); + if (!nbd_client_put_nonzero(client)) { aio_co_reschedule_self(qemu_get_aio_context()); nbd_client_put(client); @@ -3059,13 +3128,19 @@ disconnect: if (local_err) { error_reportf_err(local_err, "Disconnect client, due to: "); } + nbd_request_put(req); + qemu_mutex_unlock(&client->lock); aio_co_reschedule_self(qemu_get_aio_context()); client_close(client, true); nbd_client_put(client); } +/* + * Runs in export AioContext and main loop thread. Caller must hold + * client->lock. + */ static void nbd_client_receive_next_request(NBDClient *client) { if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && @@ -3091,7 +3166,9 @@ static coroutine_fn void nbd_co_client_start(void *opaque) return; } - nbd_client_receive_next_request(client); + WITH_QEMU_LOCK_GUARD(&client->lock) { + nbd_client_receive_next_request(client); + } } /* @@ -3108,6 +3185,7 @@ void nbd_client_new(QIOChannelSocket *sioc, Coroutine *co; client = g_new0(NBDClient, 1); + qemu_mutex_init(&client->lock); client->refcount = 1; client->tlscreds = tlscreds; if (tlscreds) { |