aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- block/nbd.c 375
-rw-r--r-- nbd/client.c 2
2 files changed, 100 insertions, 277 deletions
diff --git a/block/nbd.c b/block/nbd.c
index 709c249..8ff6daf 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -57,7 +57,7 @@
typedef struct {
Coroutine *coroutine;
uint64_t offset; /* original offset of the request */
- bool receiving; /* waiting for connection_co? */
+ bool receiving; /* sleeping in the yield in nbd_receive_replies */
} NBDClientRequest;
typedef enum NBDClientState {
@@ -73,14 +73,10 @@ typedef struct BDRVNBDState {
CoMutex send_mutex;
CoQueue free_sema;
- Coroutine *connection_co;
- Coroutine *teardown_co;
- QemuCoSleep reconnect_sleep;
- bool drained;
- bool wait_drained_end;
+
+ CoMutex receive_mutex;
int in_flight;
NBDClientState state;
- bool wait_in_flight;
QEMUTimer *reconnect_delay_timer;
@@ -163,6 +159,8 @@ static void nbd_channel_error(BDRVNBDState *s, int ret)
} else {
s->state = NBD_CLIENT_QUIT;
}
+
+ nbd_recv_coroutines_wake(s, true);
}
static void reconnect_delay_timer_del(BDRVNBDState *s)
@@ -179,6 +177,7 @@ static void reconnect_delay_timer_cb(void *opaque)
if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+ nbd_co_establish_connection_cancel(s->conn);
while (qemu_co_enter_next(&s->free_sema, NULL)) {
/* Resume all queued requests */
}
@@ -201,113 +200,21 @@ static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
timer_mod(s->reconnect_delay_timer, expire_time_ns);
}
-static void nbd_client_detach_aio_context(BlockDriverState *bs)
-{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- /* Timer is deleted in nbd_client_co_drain_begin() */
- assert(!s->reconnect_delay_timer);
- /*
- * If reconnect is in progress we may have no ->ioc. It will be
- * re-instantiated in the proper aio context once the connection is
- * reestablished.
- */
- if (s->ioc) {
- qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
- }
-}
-
-static void nbd_client_attach_aio_context_bh(void *opaque)
-{
- BlockDriverState *bs = opaque;
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- if (s->connection_co) {
- /*
- * The node is still drained, so we know the coroutine has yielded in
- * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
- * it is entered for the first time. Both places are safe for entering
- * the coroutine.
- */
- qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
- }
- bdrv_dec_in_flight(bs);
-}
-
-static void nbd_client_attach_aio_context(BlockDriverState *bs,
- AioContext *new_context)
-{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- /*
- * s->connection_co is either yielded from nbd_receive_reply or from
- * nbd_co_reconnect_loop()
- */
- if (nbd_client_connected(s)) {
- qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
- }
-
- bdrv_inc_in_flight(bs);
-
- /*
- * Need to wait here for the BH to run because the BH must run while the
- * node is still drained.
- */
- aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
-}
-
-static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
-{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- s->drained = true;
- qemu_co_sleep_wake(&s->reconnect_sleep);
-
- nbd_co_establish_connection_cancel(s->conn);
-
- reconnect_delay_timer_del(s);
-
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
- s->state = NBD_CLIENT_CONNECTING_NOWAIT;
- qemu_co_queue_restart_all(&s->free_sema);
- }
-}
-
-static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
-{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- s->drained = false;
- if (s->wait_drained_end) {
- s->wait_drained_end = false;
- aio_co_wake(s->connection_co);
- }
-}
-
-
static void nbd_teardown_connection(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+ assert(!s->in_flight);
+
if (s->ioc) {
- /* finish any pending coroutines */
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+ nbd_yank, s->bs);
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
}
s->state = NBD_CLIENT_QUIT;
- if (s->connection_co) {
- qemu_co_sleep_wake(&s->reconnect_sleep);
- nbd_co_establish_connection_cancel(s->conn);
- }
- if (qemu_in_coroutine()) {
- s->teardown_co = qemu_coroutine_self();
- /* connection_co resumes us when it terminates */
- qemu_coroutine_yield();
- s->teardown_co = NULL;
- } else {
- BDRV_POLL_WHILE(bs, s->connection_co);
- }
- assert(!s->connection_co);
}
static bool nbd_client_connecting(BDRVNBDState *s)
@@ -372,10 +279,11 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int ret;
+ bool blocking = nbd_client_connecting_wait(s);
assert(!s->ioc);
- s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+ s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
if (!s->ioc) {
return -ECONNREFUSED;
}
@@ -411,29 +319,22 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
return 0;
}
+/* called under s->send_mutex */
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
- if (!nbd_client_connecting(s)) {
- return;
- }
+ assert(nbd_client_connecting(s));
+ assert(s->in_flight == 0);
- /* Wait for completion of all in-flight requests */
-
- qemu_co_mutex_lock(&s->send_mutex);
-
- while (s->in_flight > 0) {
- qemu_co_mutex_unlock(&s->send_mutex);
- nbd_recv_coroutines_wake(s, true);
- s->wait_in_flight = true;
- qemu_coroutine_yield();
- s->wait_in_flight = false;
- qemu_co_mutex_lock(&s->send_mutex);
- }
-
- qemu_co_mutex_unlock(&s->send_mutex);
-
- if (!nbd_client_connecting(s)) {
- return;
+ if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
+ !s->reconnect_delay_timer)
+ {
+ /*
+ * It's the first reconnect attempt after switching to
+ * NBD_CLIENT_CONNECTING_WAIT
+ */
+ reconnect_delay_timer_init(s,
+ qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+ s->reconnect_delay * NANOSECONDS_PER_SECOND);
}
/*
@@ -453,135 +354,80 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
nbd_co_do_establish_connection(s->bs, NULL);
}
-static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
+static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
{
- uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
- uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
+ int ret;
+ uint64_t ind = HANDLE_TO_INDEX(s, handle), ind2;
+ QEMU_LOCK_GUARD(&s->receive_mutex);
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
- reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
- s->reconnect_delay * NANOSECONDS_PER_SECOND);
- }
-
- nbd_reconnect_attempt(s);
-
- while (nbd_client_connecting(s)) {
- if (s->drained) {
- bdrv_dec_in_flight(s->bs);
- s->wait_drained_end = true;
- while (s->drained) {
- /*
- * We may be entered once from nbd_client_attach_aio_context_bh
- * and then from nbd_client_co_drain_end. So here is a loop.
- */
- qemu_coroutine_yield();
- }
- bdrv_inc_in_flight(s->bs);
- } else {
- qemu_co_sleep_ns_wakeable(&s->reconnect_sleep,
- QEMU_CLOCK_REALTIME, timeout);
- if (s->drained) {
- continue;
- }
- if (timeout < max_timeout) {
- timeout *= 2;
- }
+ while (true) {
+ if (s->reply.handle == handle) {
+ /* We are done */
+ return 0;
}
- nbd_reconnect_attempt(s);
- }
-
- reconnect_delay_timer_del(s);
-}
+ if (!nbd_client_connected(s)) {
+ return -EIO;
+ }
-static coroutine_fn void nbd_connection_entry(void *opaque)
-{
- BDRVNBDState *s = opaque;
- uint64_t i;
- int ret = 0;
- Error *local_err = NULL;
+ if (s->reply.handle != 0) {
+ /*
+ * Some other request is being handled now. It should already have
+ * been woken by whoever set s->reply.handle (or it never waited in
+ * this yield), so we should not wake it here.
+ */
+ ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+ assert(!s->requests[ind2].receiving);
- while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
- /*
- * The NBD client can only really be considered idle when it has
- * yielded from qio_channel_readv_all_eof(), waiting for data. This is
- * the point where the additional scheduled coroutine entry happens
- * after nbd_client_attach_aio_context().
- *
- * Therefore we keep an additional in_flight reference all the time and
- * only drop it temporarily here.
- */
+ s->requests[ind].receiving = true;
+ qemu_co_mutex_unlock(&s->receive_mutex);
- if (nbd_client_connecting(s)) {
- nbd_co_reconnect_loop(s);
- }
+ qemu_coroutine_yield();
+ /*
+ * We may be woken for 3 reasons:
+ * 1. From this function, executing in a parallel coroutine, when our
+ * handle is received.
+ * 2. From nbd_channel_error(), when the connection is lost.
+ * 3. From nbd_co_receive_one_chunk(), when the previous request is
+ * finished and s->reply.handle is set to 0.
+ * Anyway, it's OK to lock the mutex and go to the next iteration.
+ */
- if (!nbd_client_connected(s)) {
+ qemu_co_mutex_lock(&s->receive_mutex);
+ assert(!s->requests[ind].receiving);
continue;
}
+ /* We are under mutex and handle is 0. We have to do the dirty work. */
assert(s->reply.handle == 0);
- ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
-
- if (local_err) {
- trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
- error_free(local_err);
- local_err = NULL;
- }
+ ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);
if (ret <= 0) {
- nbd_channel_error(s, ret ? ret : -EIO);
- continue;
+ ret = ret ? ret : -EIO;
+ nbd_channel_error(s, ret);
+ return ret;
}
-
- /*
- * There's no need for a mutex on the receive side, because the
- * handler acts as a synchronization point and ensures that only
- * one coroutine is called until the reply finishes.
- */
- i = HANDLE_TO_INDEX(s, s->reply.handle);
- if (i >= MAX_NBD_REQUESTS ||
- !s->requests[i].coroutine ||
- !s->requests[i].receiving ||
- (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
- {
+ if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) {
nbd_channel_error(s, -EINVAL);
- continue;
+ return -EINVAL;
}
-
- /*
- * We're woken up again by the request itself. Note that there
- * is no race between yielding and reentering connection_co. This
- * is because:
- *
- * - if the request runs on the same AioContext, it is only
- * entered after we yield
- *
- * - if the request runs on a different AioContext, reentering
- * connection_co happens through a bottom half, which can only
- * run after we yield.
- */
- s->requests[i].receiving = false;
- aio_co_wake(s->requests[i].coroutine);
- qemu_coroutine_yield();
- }
-
- qemu_co_queue_restart_all(&s->free_sema);
- nbd_recv_coroutines_wake(s, true);
- bdrv_dec_in_flight(s->bs);
-
- s->connection_co = NULL;
- if (s->ioc) {
- qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
- yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
- nbd_yank, s->bs);
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
- }
-
- if (s->teardown_co) {
- aio_co_wake(s->teardown_co);
+ if (s->reply.handle == handle) {
+ /* We are done */
+ return 0;
+ }
+ ind2 = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
+ /*
+ * We only check that the ind2 request exists. But we don't check
+ * whether it is now waiting for the reply header or
+ * not. We can't just check s->requests[ind2].receiving:
+ * the ind2 request may be waiting to lock
+ * receive_mutex. So that's a TODO.
+ */
+ nbd_channel_error(s, -EINVAL);
+ return -EINVAL;
+ }
+ nbd_recv_coroutine_wake_one(&s->requests[ind2]);
}
- aio_wait_kick();
}
static int nbd_co_send_request(BlockDriverState *bs,
@@ -592,10 +438,17 @@ static int nbd_co_send_request(BlockDriverState *bs,
int rc, i = -1;
qemu_co_mutex_lock(&s->send_mutex);
- while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
+
+ while (s->in_flight == MAX_NBD_REQUESTS ||
+ (!nbd_client_connected(s) && s->in_flight > 0))
+ {
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
}
+ if (nbd_client_connecting(s)) {
+ nbd_reconnect_attempt(s);
+ }
+
if (!nbd_client_connected(s)) {
rc = -EIO;
goto err;
@@ -642,10 +495,6 @@ err:
if (i != -1) {
s->requests[i].coroutine = NULL;
s->in_flight--;
- }
- if (s->in_flight == 0 && s->wait_in_flight) {
- aio_co_wake(s->connection_co);
- } else {
qemu_co_queue_next(&s->free_sema);
}
}
@@ -944,10 +793,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
}
*request_ret = 0;
- /* Wait until we're woken up by nbd_connection_entry. */
- s->requests[i].receiving = true;
- qemu_coroutine_yield();
- assert(!s->requests[i].receiving);
+ nbd_receive_replies(s, handle);
if (!nbd_client_connected(s)) {
error_setg(errp, "Connection closed");
return -EIO;
@@ -1040,14 +886,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
}
s->reply.handle = 0;
- if (s->connection_co && !s->wait_in_flight) {
- /*
- * We must check s->wait_in_flight, because we may entered by
- * nbd_recv_coroutines_wake(), in this case we should not
- * wake connection_co here, it will woken by last request.
- */
- aio_co_wake(s->connection_co);
- }
+ nbd_recv_coroutines_wake(s, false);
return ret;
}
@@ -1158,11 +997,7 @@ break_loop:
qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--;
- if (s->in_flight == 0 && s->wait_in_flight) {
- aio_co_wake(s->connection_co);
- } else {
- qemu_co_queue_next(&s->free_sema);
- }
+ qemu_co_queue_next(&s->free_sema);
qemu_co_mutex_unlock(&s->send_mutex);
return false;
@@ -1984,6 +1819,7 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
s->bs = bs;
qemu_co_mutex_init(&s->send_mutex);
qemu_co_queue_init(&s->free_sema);
+ qemu_co_mutex_init(&s->receive_mutex);
if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
return -EEXIST;
@@ -1998,14 +1834,13 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
s->x_dirty_bitmap, s->tlscreds);
/* TODO: Configurable retry-until-timeout behaviour. */
+ s->state = NBD_CLIENT_CONNECTING_WAIT;
ret = nbd_do_establish_connection(bs, errp);
if (ret < 0) {
goto fail;
}
- s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
- bdrv_inc_in_flight(bs);
- aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
+ nbd_client_connection_enable_retry(s->conn);
return 0;
@@ -2159,6 +1994,8 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
qemu_co_queue_restart_all(&s->free_sema);
}
+
+ nbd_co_establish_connection_cancel(s->conn);
}
static BlockDriver bdrv_nbd = {
@@ -2179,10 +2016,6 @@ static BlockDriver bdrv_nbd = {
.bdrv_refresh_limits = nbd_refresh_limits,
.bdrv_co_truncate = nbd_co_truncate,
.bdrv_getlength = nbd_getlength,
- .bdrv_detach_aio_context = nbd_client_detach_aio_context,
- .bdrv_attach_aio_context = nbd_client_attach_aio_context,
- .bdrv_co_drain_begin = nbd_client_co_drain_begin,
- .bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
@@ -2208,10 +2041,6 @@ static BlockDriver bdrv_nbd_tcp = {
.bdrv_refresh_limits = nbd_refresh_limits,
.bdrv_co_truncate = nbd_co_truncate,
.bdrv_getlength = nbd_getlength,
- .bdrv_detach_aio_context = nbd_client_detach_aio_context,
- .bdrv_attach_aio_context = nbd_client_attach_aio_context,
- .bdrv_co_drain_begin = nbd_client_co_drain_begin,
- .bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
@@ -2237,10 +2066,6 @@ static BlockDriver bdrv_nbd_unix = {
.bdrv_refresh_limits = nbd_refresh_limits,
.bdrv_co_truncate = nbd_co_truncate,
.bdrv_getlength = nbd_getlength,
- .bdrv_detach_aio_context = nbd_client_detach_aio_context,
- .bdrv_attach_aio_context = nbd_client_attach_aio_context,
- .bdrv_co_drain_begin = nbd_client_co_drain_begin,
- .bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
diff --git a/nbd/client.c b/nbd/client.c
index 0c2db4b..30d5383 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -1434,9 +1434,7 @@ nbd_read_eof(BlockDriverState *bs, QIOChannel *ioc, void *buffer, size_t size,
len = qio_channel_readv(ioc, &iov, 1, errp);
if (len == QIO_CHANNEL_ERR_BLOCK) {
- bdrv_dec_in_flight(bs);
qio_channel_yield(ioc, G_IO_IN);
- bdrv_inc_in_flight(bs);
continue;
} else if (len < 0) {
return -EIO;