diff options
-rw-r--r-- | block/nbd.c | 375 | ||||
-rw-r--r-- | nbd/client.c | 2 |
2 files changed, 100 insertions, 277 deletions
diff --git a/block/nbd.c b/block/nbd.c index 709c249..8ff6daf 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -57,7 +57,7 @@ typedef struct { Coroutine *coroutine; uint64_t offset; /* original offset of the request */ - bool receiving; /* waiting for connection_co? */ + bool receiving; /* sleeping in the yield in nbd_receive_replies */ } NBDClientRequest; typedef enum NBDClientState { @@ -73,14 +73,10 @@ typedef struct BDRVNBDState { CoMutex send_mutex; CoQueue free_sema; - Coroutine *connection_co; - Coroutine *teardown_co; - QemuCoSleep reconnect_sleep; - bool drained; - bool wait_drained_end; + + CoMutex receive_mutex; int in_flight; NBDClientState state; - bool wait_in_flight; QEMUTimer *reconnect_delay_timer; @@ -163,6 +159,8 @@ static void nbd_channel_error(BDRVNBDState *s, int ret) } else { s->state = NBD_CLIENT_QUIT; } + + nbd_recv_coroutines_wake(s, true); } static void reconnect_delay_timer_del(BDRVNBDState *s) @@ -179,6 +177,7 @@ static void reconnect_delay_timer_cb(void *opaque) if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) { s->state = NBD_CLIENT_CONNECTING_NOWAIT; + nbd_co_establish_connection_cancel(s->conn); while (qemu_co_enter_next(&s->free_sema, NULL)) { /* Resume all queued requests */ } @@ -201,113 +200,21 @@ static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns) timer_mod(s->reconnect_delay_timer, expire_time_ns); } -static void nbd_client_detach_aio_context(BlockDriverState *bs) -{ - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - /* Timer is deleted in nbd_client_co_drain_begin() */ - assert(!s->reconnect_delay_timer); - /* - * If reconnect is in progress we may have no ->ioc. It will be - * re-instantiated in the proper aio context once the connection is - * reestablished. - */ - if (s->ioc) { - qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc)); - } -} - -static void nbd_client_attach_aio_context_bh(void *opaque) -{ - BlockDriverState *bs = opaque; - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - if (s->connection_co) { - /* - * The node is still drained, so we know the coroutine has yielded in - * nbd_read_eof(), the only place where bs->in_flight can reach 0, or - * it is entered for the first time. Both places are safe for entering - * the coroutine. - */ - qemu_aio_coroutine_enter(bs->aio_context, s->connection_co); - } - bdrv_dec_in_flight(bs); -} - -static void nbd_client_attach_aio_context(BlockDriverState *bs, - AioContext *new_context) -{ - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - /* - * s->connection_co is either yielded from nbd_receive_reply or from - * nbd_co_reconnect_loop() - */ - if (nbd_client_connected(s)) { - qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context); - } - - bdrv_inc_in_flight(bs); - - /* - * Need to wait here for the BH to run because the BH must run while the - * node is still drained. - */ - aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs); -} - -static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs) -{ - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - s->drained = true; - qemu_co_sleep_wake(&s->reconnect_sleep); - - nbd_co_establish_connection_cancel(s->conn); - - reconnect_delay_timer_del(s); - - if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) { - s->state = NBD_CLIENT_CONNECTING_NOWAIT; - qemu_co_queue_restart_all(&s->free_sema); - } -} - -static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs) -{ - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - s->drained = false; - if (s->wait_drained_end) { - s->wait_drained_end = false; - aio_co_wake(s->connection_co); - } -} - - static void nbd_teardown_connection(BlockDriverState *bs) { BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + assert(!s->in_flight); + if (s->ioc) { - /* finish any pending coroutines */ qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), + nbd_yank, s->bs); + object_unref(OBJECT(s->ioc)); + s->ioc = NULL; } s->state = NBD_CLIENT_QUIT; - if (s->connection_co) { - qemu_co_sleep_wake(&s->reconnect_sleep); - nbd_co_establish_connection_cancel(s->conn); - } - if (qemu_in_coroutine()) { - s->teardown_co = qemu_coroutine_self(); - /* connection_co resumes us when it terminates */ - qemu_coroutine_yield(); - s->teardown_co = NULL; - } else { - BDRV_POLL_WHILE(bs, s->connection_co); - } - assert(!s->connection_co); } static bool nbd_client_connecting(BDRVNBDState *s) @@ -372,10 +279,11 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs, { BDRVNBDState *s = (BDRVNBDState *)bs->opaque; int ret; + bool blocking = nbd_client_connecting_wait(s); assert(!s->ioc); - s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp); + s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp); if (!s->ioc) { return -ECONNREFUSED; } @@ -411,29 +319,22 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs, return 0; } +/* called under s->send_mutex */ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) { - if (!nbd_client_connecting(s)) { - return; - } + assert(nbd_client_connecting(s)); + assert(s->in_flight == 0); - /* Wait for completion of all in-flight requests */ - - qemu_co_mutex_lock(&s->send_mutex); - - while (s->in_flight > 0) { - qemu_co_mutex_unlock(&s->send_mutex); - nbd_recv_coroutines_wake(s, true); - s->wait_in_flight = true; - qemu_coroutine_yield(); - s->wait_in_flight = false; - qemu_co_mutex_lock(&s->send_mutex); - } - - qemu_co_mutex_unlock(&s->send_mutex); - - if (!nbd_client_connecting(s)) { - return; + if (nbd_client_connecting_wait(s) && s->reconnect_delay && + !s->reconnect_delay_timer) + { + /* + * It's first reconnect attempt after switching to + * NBD_CLIENT_CONNECTING_WAIT + */ + reconnect_delay_timer_init(s, + qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + + s->reconnect_delay * NANOSECONDS_PER_SECOND); } /* @@ -453,135 +354,80 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) nbd_co_do_establish_connection(s->bs, NULL); } -static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s) +static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle) { - uint64_t timeout = 1 * NANOSECONDS_PER_SECOND; - uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND; + int ret; + uint64_t ind = HANDLE_TO_INDEX(s, handle), ind2; + QEMU_LOCK_GUARD(&s->receive_mutex); - if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) { - reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + - s->reconnect_delay * NANOSECONDS_PER_SECOND); - } - - nbd_reconnect_attempt(s); - - while (nbd_client_connecting(s)) { - if (s->drained) { - bdrv_dec_in_flight(s->bs); - s->wait_drained_end = true; - while (s->drained) { - /* - * We may be entered once from nbd_client_attach_aio_context_bh - * and then from nbd_client_co_drain_end. So here is a loop. - */ - qemu_coroutine_yield(); - } - bdrv_inc_in_flight(s->bs); - } else { - qemu_co_sleep_ns_wakeable(&s->reconnect_sleep, - QEMU_CLOCK_REALTIME, timeout); - if (s->drained) { - continue; - } - if (timeout < max_timeout) { - timeout *= 2; - } + while (true) { + if (s->reply.handle == handle) { + /* We are done */ + return 0; } - nbd_reconnect_attempt(s); - } - - reconnect_delay_timer_del(s); -} + if (!nbd_client_connected(s)) { + return -EIO; + } -static coroutine_fn void nbd_connection_entry(void *opaque) -{ - BDRVNBDState *s = opaque; - uint64_t i; - int ret = 0; - Error *local_err = NULL; + if (s->reply.handle != 0) { + /* + * Some other request is being handled now. It should already be + * woken by whoever set s->reply.handle (or never wait in this + * yield). So, we should not wake it here. + */ + ind2 = HANDLE_TO_INDEX(s, s->reply.handle); + assert(!s->requests[ind2].receiving); - while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) { - /* - * The NBD client can only really be considered idle when it has - * yielded from qio_channel_readv_all_eof(), waiting for data. This is - * the point where the additional scheduled coroutine entry happens - * after nbd_client_attach_aio_context(). - * - * Therefore we keep an additional in_flight reference all the time and - * only drop it temporarily here. - */ + s->requests[ind].receiving = true; + qemu_co_mutex_unlock(&s->receive_mutex); - if (nbd_client_connecting(s)) { - nbd_co_reconnect_loop(s); - } + qemu_coroutine_yield(); + /* + * We may be woken for 3 reasons: + * 1. From this function, executing in parallel coroutine, when our + * handle is received. + * 2. From nbd_channel_error(), when connection is lost. + * 3. From nbd_co_receive_one_chunk(), when previous request is + * finished and s->reply.handle set to 0. + * Anyway, it's OK to lock the mutex and go to the next iteration. + */ - if (!nbd_client_connected(s)) { + qemu_co_mutex_lock(&s->receive_mutex); + assert(!s->requests[ind].receiving); continue; } + /* We are under mutex and handle is 0. We have to do the dirty work. */ assert(s->reply.handle == 0); - ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err); - - if (local_err) { - trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err)); - error_free(local_err); - local_err = NULL; - } + ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL); if (ret <= 0) { - nbd_channel_error(s, ret ? ret : -EIO); - continue; + ret = ret ? ret : -EIO; + nbd_channel_error(s, ret); + return ret; } - - /* - * There's no need for a mutex on the receive side, because the - * handler acts as a synchronization point and ensures that only - * one coroutine is called until the reply finishes. - */ - i = HANDLE_TO_INDEX(s, s->reply.handle); - if (i >= MAX_NBD_REQUESTS || - !s->requests[i].coroutine || - !s->requests[i].receiving || - (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply)) - { + if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) { nbd_channel_error(s, -EINVAL); - continue; + return -EINVAL; } - - /* - * We're woken up again by the request itself. Note that there - * is no race between yielding and reentering connection_co. This - * is because: - * - * - if the request runs on the same AioContext, it is only - * entered after we yield - * - * - if the request runs on a different AioContext, reentering - * connection_co happens through a bottom half, which can only - * run after we yield. - */ - s->requests[i].receiving = false; - aio_co_wake(s->requests[i].coroutine); - qemu_coroutine_yield(); - } - - qemu_co_queue_restart_all(&s->free_sema); - nbd_recv_coroutines_wake(s, true); - bdrv_dec_in_flight(s->bs); - - s->connection_co = NULL; - if (s->ioc) { - qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc)); - yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), - nbd_yank, s->bs); - object_unref(OBJECT(s->ioc)); - s->ioc = NULL; - } - - if (s->teardown_co) { - aio_co_wake(s->teardown_co); + if (s->reply.handle == handle) { + /* We are done */ + return 0; + } + ind2 = HANDLE_TO_INDEX(s, s->reply.handle); + if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) { + /* + * We only check that ind2 request exists. But don't check + * whether it is now waiting for the reply header or + * not. We can't just check s->requests[ind2].receiving: + * ind2 request may wait in trying to lock + * receive_mutex. So that's a TODO. + */ + nbd_channel_error(s, -EINVAL); + return -EINVAL; + } + nbd_recv_coroutine_wake_one(&s->requests[ind2]); } - aio_wait_kick(); } static int nbd_co_send_request(BlockDriverState *bs, @@ -592,10 +438,17 @@ static int nbd_co_send_request(BlockDriverState *bs, int rc, i = -1; qemu_co_mutex_lock(&s->send_mutex); - while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) { + + while (s->in_flight == MAX_NBD_REQUESTS || + (!nbd_client_connected(s) && s->in_flight > 0)) + { qemu_co_queue_wait(&s->free_sema, &s->send_mutex); } + if (nbd_client_connecting(s)) { + nbd_reconnect_attempt(s); + } + if (!nbd_client_connected(s)) { rc = -EIO; goto err; @@ -642,10 +495,6 @@ err: if (i != -1) { s->requests[i].coroutine = NULL; s->in_flight--; - } - if (s->in_flight == 0 && s->wait_in_flight) { - aio_co_wake(s->connection_co); - } else { qemu_co_queue_next(&s->free_sema); } } @@ -944,10 +793,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk( } *request_ret = 0; - /* Wait until we're woken up by nbd_connection_entry. */ - s->requests[i].receiving = true; - qemu_coroutine_yield(); - assert(!s->requests[i].receiving); + nbd_receive_replies(s, handle); if (!nbd_client_connected(s)) { error_setg(errp, "Connection closed"); return -EIO; @@ -1040,14 +886,7 @@ static coroutine_fn int nbd_co_receive_one_chunk( } s->reply.handle = 0; - if (s->connection_co && !s->wait_in_flight) { - /* - * We must check s->wait_in_flight, because we may entered by - * nbd_recv_coroutines_wake(), in this case we should not - * wake connection_co here, it will woken by last request. - */ - aio_co_wake(s->connection_co); - } + nbd_recv_coroutines_wake(s, false); return ret; } @@ -1158,11 +997,7 @@ break_loop: qemu_co_mutex_lock(&s->send_mutex); s->in_flight--; - if (s->in_flight == 0 && s->wait_in_flight) { - aio_co_wake(s->connection_co); - } else { - qemu_co_queue_next(&s->free_sema); - } + qemu_co_queue_next(&s->free_sema); qemu_co_mutex_unlock(&s->send_mutex); return false; @@ -1984,6 +1819,7 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, s->bs = bs; qemu_co_mutex_init(&s->send_mutex); qemu_co_queue_init(&s->free_sema); + qemu_co_mutex_init(&s->receive_mutex); if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) { return -EEXIST; @@ -1998,14 +1834,13 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, s->x_dirty_bitmap, s->tlscreds); /* TODO: Configurable retry-until-timeout behaviour. */ + s->state = NBD_CLIENT_CONNECTING_WAIT; ret = nbd_do_establish_connection(bs, errp); if (ret < 0) { goto fail; } - s->connection_co = qemu_coroutine_create(nbd_connection_entry, s); - bdrv_inc_in_flight(bs); - aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co); + nbd_client_connection_enable_retry(s->conn); return 0; @@ -2159,6 +1994,8 @@ static void nbd_cancel_in_flight(BlockDriverState *bs) s->state = NBD_CLIENT_CONNECTING_NOWAIT; qemu_co_queue_restart_all(&s->free_sema); } + + nbd_co_establish_connection_cancel(s->conn); } static BlockDriver bdrv_nbd = { @@ -2179,10 +2016,6 @@ static BlockDriver bdrv_nbd = { .bdrv_refresh_limits = nbd_refresh_limits, .bdrv_co_truncate = nbd_co_truncate, .bdrv_getlength = nbd_getlength, - .bdrv_detach_aio_context = nbd_client_detach_aio_context, - .bdrv_attach_aio_context = nbd_client_attach_aio_context, - .bdrv_co_drain_begin = nbd_client_co_drain_begin, - .bdrv_co_drain_end = nbd_client_co_drain_end, .bdrv_refresh_filename = nbd_refresh_filename, .bdrv_co_block_status = nbd_client_co_block_status, .bdrv_dirname = nbd_dirname, @@ -2208,10 +2041,6 @@ static BlockDriver bdrv_nbd_tcp = { .bdrv_refresh_limits = nbd_refresh_limits, .bdrv_co_truncate = nbd_co_truncate, .bdrv_getlength = nbd_getlength, - .bdrv_detach_aio_context = nbd_client_detach_aio_context, - .bdrv_attach_aio_context = nbd_client_attach_aio_context, - .bdrv_co_drain_begin = nbd_client_co_drain_begin, - .bdrv_co_drain_end = nbd_client_co_drain_end, .bdrv_refresh_filename = nbd_refresh_filename, .bdrv_co_block_status = nbd_client_co_block_status, .bdrv_dirname = nbd_dirname, @@ -2237,10 +2066,6 @@ static BlockDriver bdrv_nbd_unix = { .bdrv_refresh_limits = nbd_refresh_limits, .bdrv_co_truncate = nbd_co_truncate, .bdrv_getlength = nbd_getlength, - .bdrv_detach_aio_context = nbd_client_detach_aio_context, - .bdrv_attach_aio_context = nbd_client_attach_aio_context, - .bdrv_co_drain_begin = nbd_client_co_drain_begin, - .bdrv_co_drain_end = nbd_client_co_drain_end, .bdrv_refresh_filename = nbd_refresh_filename, .bdrv_co_block_status = nbd_client_co_block_status, .bdrv_dirname = nbd_dirname, diff --git a/nbd/client.c b/nbd/client.c index 0c2db4b..30d5383 100644 --- a/nbd/client.c +++ b/nbd/client.c @@ -1434,9 +1434,7 @@ nbd_read_eof(BlockDriverState *bs, QIOChannel *ioc, void *buffer, size_t size, len = qio_channel_readv(ioc, &iov, 1, errp); if (len == QIO_CHANNEL_ERR_BLOCK) { - bdrv_dec_in_flight(bs); qio_channel_yield(ioc, G_IO_IN); - bdrv_inc_in_flight(bs); continue; } else if (len < 0) { return -EIO; |