diff options
author | Sergio Lopez <slp@redhat.com> | 2020-12-14 18:05:18 +0100 |
---|---|---|
committer | Eric Blake <eblake@redhat.com> | 2021-01-20 14:48:54 -0600 |
commit | f148ae7d36cbb924447f4b528a94d7799836c749 (patch) | |
tree | 9c08b2964964a841c219c1ea61f7f2addd42ffdf /nbd/server.c | |
parent | c7040ff64ec93ee925a81d3547db925fe7d1f1c0 (diff) | |
download | qemu-f148ae7d36cbb924447f4b528a94d7799836c749.zip qemu-f148ae7d36cbb924447f4b528a94d7799836c749.tar.gz qemu-f148ae7d36cbb924447f4b528a94d7799836c749.tar.bz2 |
nbd/server: Quiesce coroutines on context switch
When switching between AIO contexts we need to me make sure that both
recv_coroutine and send_coroutine are not scheduled to run. Otherwise,
QEMU may crash while attaching the new context with an error like
this one:
aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule'
To achieve this we need a local implementation of
'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done
by 'nbd/client.c') that allows us to interrupt the operation and to
know when recv_coroutine is yielding.
With this in place, we delegate detaching the AIO context to the
owning context with a BH ('nbd_aio_detach_bh') scheduled using
'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the
channel by setting 'client->quiescing' to 'true', and either waits for
the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in
'nbd_read_eof', actively enters the coroutine to interrupt it.
RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326
Signed-off-by: Sergio Lopez <slp@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-Id: <20201214170519.223781-4-slp@redhat.com>
Signed-off-by: Eric Blake <eblake@redhat.com>
Diffstat (limited to 'nbd/server.c')
-rw-r--r-- | nbd/server.c | 120 |
1 files changed, 106 insertions, 14 deletions
diff --git a/nbd/server.c b/nbd/server.c index 613ed26..7229f48 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -132,6 +132,9 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; + bool read_yielding; + bool quiescing; + QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp) return 0; } -static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request, +/* nbd_read_eof + * Tries to read @size bytes from @ioc. This is a local implementation of + * qio_channel_readv_all_eof. We have it here because we need it to be + * interruptible and to know when the coroutine is yielding. + * Returns 1 on success + * 0 on eof, when no data was read (errp is not set) + * negative errno on failure (errp is set) + */ +static inline int coroutine_fn +nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) +{ + bool partial = false; + + assert(size); + while (size > 0) { + struct iovec iov = { .iov_base = buffer, .iov_len = size }; + ssize_t len; + + len = qio_channel_readv(client->ioc, &iov, 1, errp); + if (len == QIO_CHANNEL_ERR_BLOCK) { + client->read_yielding = true; + qio_channel_yield(client->ioc, G_IO_IN); + client->read_yielding = false; + if (client->quiescing) { + return -EAGAIN; + } + continue; + } else if (len < 0) { + return -EIO; + } else if (len == 0) { + if (partial) { + error_setg(errp, + "Unexpected end-of-file before all bytes were read"); + return -EIO; + } else { + return 0; + } + } + + partial = true; + size -= len; + buffer = (uint8_t *) buffer + len; + } + return 1; +} + +static int nbd_receive_request(NBDClient *client, NBDRequest *request, Error **errp) { uint8_t buf[NBD_REQUEST_SIZE]; uint32_t magic; int ret; - ret = nbd_read(ioc, buf, sizeof(buf), "request", errp); + ret = nbd_read_eof(client, buf, sizeof(buf), errp); if (ret < 0) { return ret; } @@ -1480,11 +1529,37 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) QTAILQ_FOREACH(client, &exp->clients, next) { qio_channel_attach_aio_context(client->ioc, ctx); + + assert(client->recv_coroutine == NULL); + assert(client->send_coroutine == NULL); + + if (client->quiescing) { + client->quiescing = false; + nbd_client_receive_next_request(client); + } + } +} + +static void nbd_aio_detach_bh(void *opaque) +{ + NBDExport *exp = opaque; + NBDClient *client; + + QTAILQ_FOREACH(client, &exp->clients, next) { + qio_channel_detach_aio_context(client->ioc); + client->quiescing = true; + if (client->recv_coroutine) { - aio_co_schedule(ctx, client->recv_coroutine); + if (client->read_yielding) { + qemu_aio_coroutine_enter(exp->common.ctx, + client->recv_coroutine); + } else { + AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL); + } } + if (client->send_coroutine) { - aio_co_schedule(ctx, client->send_coroutine); + AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL); } } } @@ -1492,13 +1567,10 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) static void blk_aio_detach(void *opaque) { NBDExport *exp = opaque; - NBDClient *client; trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); - QTAILQ_FOREACH(client, &exp->clients, next) { - qio_channel_detach_aio_context(client->ioc); - } + aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp); exp->common.ctx = NULL; } @@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle, /* nbd_co_receive_request * Collect a client request. Return 0 if request looks valid, -EIO to drop - * connection right away, and any other negative value to report an error to - * the client (although the caller may still need to disconnect after reporting - * the error). + * connection right away, -EAGAIN to indicate we were interrupted and the + * channel should be quiesced, and any other negative value to report an error + * to the client (although the caller may still need to disconnect after + * reporting the error). */ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, Error **errp) { NBDClient *client = req->client; int valid_flags; + int ret; g_assert(qemu_in_coroutine()); assert(client->recv_coroutine == qemu_coroutine_self()); - if (nbd_receive_request(client->ioc, request, errp) < 0) { - return -EIO; + ret = nbd_receive_request(client, request, errp); + if (ret < 0) { + return ret; } trace_nbd_co_receive_request_decode_type(request->handle, request->type, @@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque) return; } + if (client->quiescing) { + /* + * We're switching between AIO contexts. Don't attempt to receive a new + * request and kick the main context which may be waiting for us. + */ + nbd_client_put(client); + client->recv_coroutine = NULL; + aio_wait_kick(); + return; + } + req = nbd_request_get(client); ret = nbd_co_receive_request(req, &request, &local_err); client->recv_coroutine = NULL; @@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque) goto done; } + if (ret == -EAGAIN) { + assert(client->quiescing); + goto done; + } + nbd_client_receive_next_request(client); if (ret == -EIO) { goto disconnect; @@ -2565,7 +2656,8 @@ disconnect: static void nbd_client_receive_next_request(NBDClient *client) { - if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) { + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && + !client->quiescing) { nbd_client_get(client); client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); aio_co_schedule(client->exp->common.ctx, client->recv_coroutine); |