aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2021-06-20 21:20:13 +0100
committerPeter Maydell <peter.maydell@linaro.org>2021-06-20 21:20:13 +0100
commite4bfa6cd68e0b19f42c0c4ef26c024d39ebab044 (patch)
tree66eeefcbfadeda43b6946930d135edcd4d141643
parent8f521741e1280f0957ac1b873292c19219e1fb9a (diff)
parentbbfb7c2f350262f893642433dea66352fc168295 (diff)
downloadqemu-e4bfa6cd68e0b19f42c0c4ef26c024d39ebab044.zip
qemu-e4bfa6cd68e0b19f42c0c4ef26c024d39ebab044.tar.gz
qemu-e4bfa6cd68e0b19f42c0c4ef26c024d39ebab044.tar.bz2
Merge remote-tracking branch 'remotes/ericb/tags/pull-nbd-2021-06-15-v2' into staging
nbd patches for 2021-06-15 - bug fixes in coroutine aio context handling - rework NBD client connection logic to perform more work in coroutine # gpg: Signature made Fri 18 Jun 2021 18:29:39 BST # gpg: using RSA key 71C2CC22B1C4602927D2F3AAA7A16B4A2527436A # gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full] # gpg: aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full] # gpg: aka "[jpeg image of size 6874]" [full] # Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2 F3AA A7A1 6B4A 2527 436A * remotes/ericb/tags/pull-nbd-2021-06-15-v2: (34 commits) block/nbd: safer transition to receiving request block/nbd: add nbd_client_connected() helper block/nbd: reuse nbd_co_do_establish_connection() in nbd_open() nbd/client-connection: add option for non-blocking connection attempt block/nbd: split nbd_co_do_establish_connection out of nbd_reconnect_attempt block-coroutine-wrapper: allow non bdrv_ prefix nbd/client-connection: return only one io channel block/nbd: drop BDRVNBDState::sioc block/nbd: don't touch s->sioc in nbd_teardown_connection() block/nbd: use negotiation of NBDClientConnection block/nbd: split nbd_handle_updated_info out of nbd_client_handshake() nbd/client-connection: shutdown connection on release nbd/client-connection: implement connection retry nbd/client-connection: add possibility of negotiation nbd/client-connection: use QEMU_LOCK_GUARD nbd: move connection code from block/nbd to nbd/client-connection block/nbd: introduce nbd_client_connection_release() block/nbd: introduce nbd_client_connection_new() block/nbd: rename NBDConnectThread to NBDClientConnection block/nbd: make nbd_co_establish_connection_cancel() bs-independent ... Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--block/coroutines.h6
-rw-r--r--block/nbd.c553
-rw-r--r--include/block/aio.h5
-rw-r--r--include/block/nbd.h18
-rw-r--r--include/qemu/coroutine.h6
-rw-r--r--include/qemu/sockets.h11
-rw-r--r--iothread.c9
-rw-r--r--nbd/client-connection.c388
-rw-r--r--nbd/meson.build1
-rw-r--r--scripts/block-coroutine-wrapper.py7
-rw-r--r--stubs/iothread-lock.c2
-rw-r--r--stubs/iothread.c8
-rw-r--r--stubs/meson.build1
-rw-r--r--tests/unit/iothread.c9
-rw-r--r--tests/unit/test-aio.c37
-rw-r--r--util/async.c20
-rw-r--r--util/main-loop.c1
-rw-r--r--util/qemu-sockets.c19
18 files changed, 626 insertions, 475 deletions
diff --git a/block/coroutines.h b/block/coroutines.h
index 4cfb494..514d169 100644
--- a/block/coroutines.h
+++ b/block/coroutines.h
@@ -66,4 +66,10 @@ int coroutine_fn bdrv_co_readv_vmstate(BlockDriverState *bs,
int coroutine_fn bdrv_co_writev_vmstate(BlockDriverState *bs,
QEMUIOVector *qiov, int64_t pos);
+int generated_co_wrapper
+nbd_do_establish_connection(BlockDriverState *bs, Error **errp);
+int coroutine_fn
+nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp);
+
+
#endif /* BLOCK_COROUTINES_INT_H */
diff --git a/block/nbd.c b/block/nbd.c
index 616f9ae..3cbee76 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -44,6 +44,7 @@
#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"
+#include "block/coroutines.h"
#include "qemu/yank.h"
@@ -66,50 +67,8 @@ typedef enum NBDClientState {
NBD_CLIENT_QUIT
} NBDClientState;
-typedef enum NBDConnectThreadState {
- /* No thread, no pending results */
- CONNECT_THREAD_NONE,
-
- /* Thread is running, no results for now */
- CONNECT_THREAD_RUNNING,
-
- /*
- * Thread is running, but requestor exited. Thread should close
- * the new socket and free the connect state on exit.
- */
- CONNECT_THREAD_RUNNING_DETACHED,
-
- /* Thread finished, results are stored in a state */
- CONNECT_THREAD_FAIL,
- CONNECT_THREAD_SUCCESS
-} NBDConnectThreadState;
-
-typedef struct NBDConnectThread {
- /* Initialization constants */
- SocketAddress *saddr; /* address to connect to */
- /*
- * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
- * NULL
- */
- QEMUBHFunc *bh_func;
- void *bh_opaque;
-
- /*
- * Result of last attempt. Valid in FAIL and SUCCESS states.
- * If you want to steal error, don't forget to set pointer to NULL.
- */
- QIOChannelSocket *sioc;
- Error *err;
-
- /* state and bh_ctx are protected by mutex */
- QemuMutex mutex;
- NBDConnectThreadState state; /* current state of the thread */
- AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
-} NBDConnectThread;
-
typedef struct BDRVNBDState {
- QIOChannelSocket *sioc; /* The master data channel */
- QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
+ QIOChannel *ioc; /* The current I/O channel */
NBDExportInfo info;
CoMutex send_mutex;
@@ -121,8 +80,6 @@ typedef struct BDRVNBDState {
bool wait_drained_end;
int in_flight;
NBDClientState state;
- int connect_status;
- Error *connect_err;
bool wait_in_flight;
QEMUTimer *reconnect_delay_timer;
@@ -140,20 +97,20 @@ typedef struct BDRVNBDState {
char *x_dirty_bitmap;
bool alloc_depth;
- bool wait_connect;
- NBDConnectThread *connect_thread;
+ NBDClientConnection *conn;
} BDRVNBDState;
-static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
- Error **errp);
-static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
-static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
- bool detach);
-static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
static void nbd_yank(void *opaque);
-static void nbd_clear_bdrvstate(BDRVNBDState *s)
+static void nbd_clear_bdrvstate(BlockDriverState *bs)
{
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+ nbd_client_connection_release(s->conn);
+ s->conn = NULL;
+
+ yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
+
object_unref(OBJECT(s->tlscreds));
qapi_free_SocketAddress(s->saddr);
s->saddr = NULL;
@@ -165,15 +122,20 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s)
s->x_dirty_bitmap = NULL;
}
+static bool nbd_client_connected(BDRVNBDState *s)
+{
+ return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED;
+}
+
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
if (ret == -EIO) {
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
+ if (nbd_client_connected(s)) {
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
NBD_CLIENT_CONNECTING_NOWAIT;
}
} else {
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
+ if (nbd_client_connected(s)) {
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
s->state = NBD_CLIENT_QUIT;
@@ -188,6 +150,7 @@ static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
NBDClientRequest *req = &s->requests[i];
if (req->coroutine && req->receiving) {
+ req->receiving = false;
aio_co_wake(req->coroutine);
}
}
@@ -271,7 +234,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
* s->connection_co is either yielded from nbd_receive_reply or from
* nbd_co_reconnect_loop()
*/
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
+ if (nbd_client_connected(s)) {
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
}
@@ -291,7 +254,7 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
s->drained = true;
qemu_co_sleep_wake(&s->reconnect_sleep);
- nbd_co_establish_connection_cancel(bs, false);
+ nbd_co_establish_connection_cancel(s->conn);
reconnect_delay_timer_del(s);
@@ -320,16 +283,12 @@ static void nbd_teardown_connection(BlockDriverState *bs)
if (s->ioc) {
/* finish any pending coroutines */
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
- } else if (s->sioc) {
- /* abort negotiation */
- qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH,
- NULL);
}
s->state = NBD_CLIENT_QUIT;
if (s->connection_co) {
qemu_co_sleep_wake(&s->reconnect_sleep);
- nbd_co_establish_connection_cancel(bs, true);
+ nbd_co_establish_connection_cancel(s->conn);
}
if (qemu_in_coroutine()) {
s->teardown_co = qemu_coroutine_self();
@@ -354,239 +313,95 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}
-static void connect_bh(void *opaque)
-{
- BDRVNBDState *state = opaque;
-
- assert(state->wait_connect);
- state->wait_connect = false;
- aio_co_wake(state->connection_co);
-}
-
-static void nbd_init_connect_thread(BDRVNBDState *s)
-{
- s->connect_thread = g_new(NBDConnectThread, 1);
-
- *s->connect_thread = (NBDConnectThread) {
- .saddr = QAPI_CLONE(SocketAddress, s->saddr),
- .state = CONNECT_THREAD_NONE,
- .bh_func = connect_bh,
- .bh_opaque = s,
- };
-
- qemu_mutex_init(&s->connect_thread->mutex);
-}
-
-static void nbd_free_connect_thread(NBDConnectThread *thr)
-{
- if (thr->sioc) {
- qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
- }
- error_free(thr->err);
- qapi_free_SocketAddress(thr->saddr);
- g_free(thr);
-}
-
-static void *connect_thread_func(void *opaque)
+/*
+ * Update @bs with information learned during a completed negotiation process.
+ * Return failure if the server's advertised options are incompatible with the
+ * client's needs.
+ */
+static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
{
- NBDConnectThread *thr = opaque;
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int ret;
- bool do_free = false;
-
- thr->sioc = qio_channel_socket_new();
- error_free(thr->err);
- thr->err = NULL;
- ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
- if (ret < 0) {
- object_unref(OBJECT(thr->sioc));
- thr->sioc = NULL;
+ if (s->x_dirty_bitmap) {
+ if (!s->info.base_allocation) {
+ error_setg(errp, "requested x-dirty-bitmap %s not found",
+ s->x_dirty_bitmap);
+ return -EINVAL;
+ }
+ if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
+ s->alloc_depth = true;
+ }
}
- qemu_mutex_lock(&thr->mutex);
-
- switch (thr->state) {
- case CONNECT_THREAD_RUNNING:
- thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
- if (thr->bh_ctx) {
- aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
-
- /* play safe, don't reuse bh_ctx on further connection attempts */
- thr->bh_ctx = NULL;
+ if (s->info.flags & NBD_FLAG_READ_ONLY) {
+ ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
+ if (ret < 0) {
+ return ret;
}
- break;
- case CONNECT_THREAD_RUNNING_DETACHED:
- do_free = true;
- break;
- default:
- abort();
}
- qemu_mutex_unlock(&thr->mutex);
+ if (s->info.flags & NBD_FLAG_SEND_FUA) {
+ bs->supported_write_flags = BDRV_REQ_FUA;
+ bs->supported_zero_flags |= BDRV_REQ_FUA;
+ }
- if (do_free) {
- nbd_free_connect_thread(thr);
+ if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
+ bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
+ if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
+ bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
+ }
}
- return NULL;
+ trace_nbd_client_handshake_success(s->export);
+
+ return 0;
}
-static int coroutine_fn
-nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
+int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
+ Error **errp)
{
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int ret;
- QemuThread thread;
- BDRVNBDState *s = bs->opaque;
- NBDConnectThread *thr = s->connect_thread;
-
- if (!thr) {
- /* detached */
- return -1;
- }
-
- qemu_mutex_lock(&thr->mutex);
-
- switch (thr->state) {
- case CONNECT_THREAD_FAIL:
- case CONNECT_THREAD_NONE:
- error_free(thr->err);
- thr->err = NULL;
- thr->state = CONNECT_THREAD_RUNNING;
- qemu_thread_create(&thread, "nbd-connect",
- connect_thread_func, thr, QEMU_THREAD_DETACHED);
- break;
- case CONNECT_THREAD_SUCCESS:
- /* Previous attempt finally succeeded in background */
- thr->state = CONNECT_THREAD_NONE;
- s->sioc = thr->sioc;
- thr->sioc = NULL;
- yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
- nbd_yank, bs);
- qemu_mutex_unlock(&thr->mutex);
- return 0;
- case CONNECT_THREAD_RUNNING:
- /* Already running, will wait */
- break;
- default:
- abort();
- }
-
- thr->bh_ctx = qemu_get_current_aio_context();
-
- qemu_mutex_unlock(&thr->mutex);
+ assert(!s->ioc);
- /*
- * We are going to wait for connect-thread finish, but
- * nbd_client_co_drain_begin() can interrupt.
- *
- * Note that wait_connect variable is not visible for connect-thread. It
- * doesn't need mutex protection, it used only inside home aio context of
- * bs.
- */
- s->wait_connect = true;
- qemu_coroutine_yield();
-
- if (!s->connect_thread) {
- /* detached */
- return -1;
- }
- assert(thr == s->connect_thread);
-
- qemu_mutex_lock(&thr->mutex);
-
- switch (thr->state) {
- case CONNECT_THREAD_SUCCESS:
- case CONNECT_THREAD_FAIL:
- thr->state = CONNECT_THREAD_NONE;
- error_propagate(errp, thr->err);
- thr->err = NULL;
- s->sioc = thr->sioc;
- thr->sioc = NULL;
- if (s->sioc) {
- yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
- nbd_yank, bs);
- }
- ret = (s->sioc ? 0 : -1);
- break;
- case CONNECT_THREAD_RUNNING:
- case CONNECT_THREAD_RUNNING_DETACHED:
- /*
- * Obviously, drained section wants to start. Report the attempt as
- * failed. Still connect thread is executing in background, and its
- * result may be used for next connection attempt.
- */
- ret = -1;
- error_setg(errp, "Connection attempt cancelled by other operation");
- break;
+ s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+ if (!s->ioc) {
+ return -ECONNREFUSED;
+ }
- case CONNECT_THREAD_NONE:
+ ret = nbd_handle_updated_info(s->bs, NULL);
+ if (ret < 0) {
/*
- * Impossible. We've seen this thread running. So it should be
- * running or at least give some results.
+ * We have connected, but must fail for other reasons.
+ * Send NBD_CMD_DISC as a courtesy to the server.
*/
- abort();
-
- default:
- abort();
- }
+ NBDRequest request = { .type = NBD_CMD_DISC };
- qemu_mutex_unlock(&thr->mutex);
+ nbd_send_request(s->ioc, &request);
- return ret;
-}
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
-/*
- * nbd_co_establish_connection_cancel
- * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
- * allow drained section to begin.
- *
- * If detach is true, also cleanup the state (or if thread is running, move it
- * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
- * detach is true.
- */
-static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
- bool detach)
-{
- BDRVNBDState *s = bs->opaque;
- NBDConnectThread *thr = s->connect_thread;
- bool wake = false;
- bool do_free = false;
-
- qemu_mutex_lock(&thr->mutex);
-
- if (thr->state == CONNECT_THREAD_RUNNING) {
- /* We can cancel only in running state, when bh is not yet scheduled */
- thr->bh_ctx = NULL;
- if (s->wait_connect) {
- s->wait_connect = false;
- wake = true;
- }
- if (detach) {
- thr->state = CONNECT_THREAD_RUNNING_DETACHED;
- s->connect_thread = NULL;
- }
- } else if (detach) {
- do_free = true;
+ return ret;
}
- qemu_mutex_unlock(&thr->mutex);
+ qio_channel_set_blocking(s->ioc, false, NULL);
+ qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
- if (do_free) {
- nbd_free_connect_thread(thr);
- s->connect_thread = NULL;
- }
+ yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
+ bs);
- if (wake) {
- aio_co_wake(s->connection_co);
- }
+ /* successfully connected */
+ s->state = NBD_CLIENT_CONNECTED;
+ qemu_co_queue_restart_all(&s->free_sema);
+
+ return 0;
}
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
- int ret;
- Error *local_err = NULL;
-
if (!nbd_client_connecting(s)) {
return;
}
@@ -620,44 +435,11 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
- if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
- ret = -ECONNREFUSED;
- goto out;
- }
-
- bdrv_dec_in_flight(s->bs);
-
- ret = nbd_client_handshake(s->bs, &local_err);
-
- if (s->drained) {
- s->wait_drained_end = true;
- while (s->drained) {
- /*
- * We may be entered once from nbd_client_attach_aio_context_bh
- * and then from nbd_client_co_drain_end. So here is a loop.
- */
- qemu_coroutine_yield();
- }
- }
- bdrv_inc_in_flight(s->bs);
-
-out:
- s->connect_status = ret;
- error_free(s->connect_err);
- s->connect_err = NULL;
- error_propagate(&s->connect_err, local_err);
-
- if (ret >= 0) {
- /* successfully connected */
- s->state = NBD_CLIENT_CONNECTED;
- qemu_co_queue_restart_all(&s->free_sema);
- }
+ nbd_co_do_establish_connection(s->bs, NULL);
}
static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
@@ -723,7 +505,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
nbd_co_reconnect_loop(s);
}
- if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+ if (!nbd_client_connected(s)) {
continue;
}
@@ -767,6 +549,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
* connection_co happens through a bottom half, which can only
* run after we yield.
*/
+ s->requests[i].receiving = false;
aio_co_wake(s->requests[i].coroutine);
qemu_coroutine_yield();
}
@@ -780,8 +563,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
}
@@ -804,7 +585,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
}
- if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+ if (!nbd_client_connected(s)) {
rc = -EIO;
goto err;
}
@@ -831,8 +612,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
- if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
- rc >= 0) {
+ if (nbd_client_connected(s) && rc >= 0) {
if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
NULL) < 0) {
rc = -EIO;
@@ -1156,8 +936,8 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
/* Wait until we're woken up by nbd_connection_entry. */
s->requests[i].receiving = true;
qemu_coroutine_yield();
- s->requests[i].receiving = false;
- if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+ assert(!s->requests[i].receiving);
+ if (!nbd_client_connected(s)) {
error_setg(errp, "Connection closed");
return -EIO;
}
@@ -1316,7 +1096,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
NBDReply local_reply;
NBDStructuredReplyChunk *chunk;
Error *local_err = NULL;
- if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+ if (!nbd_client_connected(s)) {
error_setg(&local_err, "Connection closed");
nbd_iter_channel_error(iter, -EIO, &local_err);
goto break_loop;
@@ -1341,8 +1121,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
}
/* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
- if (nbd_reply_is_simple(reply) ||
- qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
+ if (nbd_reply_is_simple(reply) || !nbd_client_connected(s)) {
goto break_loop;
}
@@ -1780,7 +1559,7 @@ static void nbd_yank(void *opaque)
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
- qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ qio_channel_shutdown(QIO_CHANNEL(s->ioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
static void nbd_client_close(BlockDriverState *bs)
@@ -1795,111 +1574,6 @@ static void nbd_client_close(BlockDriverState *bs)
nbd_teardown_connection(bs);
}
-static int nbd_establish_connection(BlockDriverState *bs,
- SocketAddress *saddr,
- Error **errp)
-{
- ERRP_GUARD();
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
- s->sioc = qio_channel_socket_new();
- qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");
-
- qio_channel_socket_connect_sync(s->sioc, saddr, errp);
- if (*errp) {
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
- return -1;
- }
-
- yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs);
- qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);
-
- return 0;
-}
-
-/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */
-static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
-{
- BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- AioContext *aio_context = bdrv_get_aio_context(bs);
- int ret;
-
- trace_nbd_client_handshake(s->export);
- qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
- qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);
-
- s->info.request_sizes = true;
- s->info.structured_reply = true;
- s->info.base_allocation = true;
- s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
- s->info.name = g_strdup(s->export ?: "");
- ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
- s->hostname, &s->ioc, &s->info, errp);
- g_free(s->info.x_dirty_bitmap);
- g_free(s->info.name);
- if (ret < 0) {
- yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
- nbd_yank, bs);
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
- return ret;
- }
- if (s->x_dirty_bitmap) {
- if (!s->info.base_allocation) {
- error_setg(errp, "requested x-dirty-bitmap %s not found",
- s->x_dirty_bitmap);
- ret = -EINVAL;
- goto fail;
- }
- if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
- s->alloc_depth = true;
- }
- }
- if (s->info.flags & NBD_FLAG_READ_ONLY) {
- ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
- if (ret < 0) {
- goto fail;
- }
- }
- if (s->info.flags & NBD_FLAG_SEND_FUA) {
- bs->supported_write_flags = BDRV_REQ_FUA;
- bs->supported_zero_flags |= BDRV_REQ_FUA;
- }
- if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
- bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
- if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
- bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
- }
- }
-
- if (!s->ioc) {
- s->ioc = QIO_CHANNEL(s->sioc);
- object_ref(OBJECT(s->ioc));
- }
-
- trace_nbd_client_handshake_success(s->export);
-
- return 0;
-
- fail:
- /*
- * We have connected, but must fail for other reasons.
- * Send NBD_CMD_DISC as a courtesy to the server.
- */
- {
- NBDRequest request = { .type = NBD_CMD_DISC };
-
- nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);
-
- yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
- nbd_yank, bs);
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
-
- return ret;
- }
-}
/*
* Parse nbd_open options
@@ -2133,6 +1807,12 @@ static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
goto done;
}
+ if (socket_address_parse_named_fd(saddr, errp) < 0) {
+ qapi_free_SocketAddress(saddr);
+ saddr = NULL;
+ goto done;
+ }
+
done:
qobject_unref(addr);
visit_free(iv);
@@ -2274,9 +1954,6 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
ret = 0;
error:
- if (ret < 0) {
- nbd_clear_bdrvstate(s);
- }
qemu_opts_del(opts);
return ret;
}
@@ -2287,11 +1964,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
int ret;
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- ret = nbd_process_options(bs, options, errp);
- if (ret < 0) {
- return ret;
- }
-
s->bs = bs;
qemu_co_mutex_init(&s->send_mutex);
qemu_co_queue_init(&s->free_sema);
@@ -2300,31 +1972,29 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
return -EEXIST;
}
- /*
- * establish TCP connection, return error if it fails
- * TODO: Configurable retry-until-timeout behaviour.
- */
- if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
- yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
- return -ECONNREFUSED;
+ ret = nbd_process_options(bs, options, errp);
+ if (ret < 0) {
+ goto fail;
}
- ret = nbd_client_handshake(bs, errp);
+ s->conn = nbd_client_connection_new(s->saddr, true, s->export,
+ s->x_dirty_bitmap, s->tlscreds);
+
+ /* TODO: Configurable retry-until-timeout behaviour. */
+ ret = nbd_do_establish_connection(bs, errp);
if (ret < 0) {
- yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
- nbd_clear_bdrvstate(s);
- return ret;
+ goto fail;
}
- /* successfully connected */
- s->state = NBD_CLIENT_CONNECTED;
-
- nbd_init_connect_thread(s);
s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
bdrv_inc_in_flight(bs);
aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
return 0;
+
+fail:
+ nbd_clear_bdrvstate(bs);
+ return ret;
}
static int nbd_co_flush(BlockDriverState *bs)
@@ -2368,11 +2038,8 @@ static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
static void nbd_close(BlockDriverState *bs)
{
- BDRVNBDState *s = bs->opaque;
-
nbd_client_close(bs);
- yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
- nbd_clear_bdrvstate(s);
+ nbd_clear_bdrvstate(bs);
}
/*
diff --git a/include/block/aio.h b/include/block/aio.h
index 5f34226..10fcae1 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
* Return the AioContext whose event loop runs in the current thread.
*
* If called from an IOThread this will be the IOThread's AioContext. If
- * called from another thread it will be the main loop AioContext.
+ * called from the main thread or with the "big QEMU lock" taken it
+ * will be the main loop AioContext.
*/
AioContext *qemu_get_current_aio_context(void);
+void qemu_set_current_aio_context(AioContext *ctx);
+
/**
* aio_context_setup:
* @ctx: the aio context
diff --git a/include/block/nbd.h b/include/block/nbd.h
index 5f34d23..78d101b 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -406,4 +406,22 @@ const char *nbd_info_lookup(uint16_t info);
const char *nbd_cmd_lookup(uint16_t info);
const char *nbd_err_lookup(int err);
+/* nbd/client-connection.c */
+typedef struct NBDClientConnection NBDClientConnection;
+
+void nbd_client_connection_enable_retry(NBDClientConnection *conn);
+
+NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr,
+ bool do_negotiation,
+ const char *export_name,
+ const char *x_dirty_bitmap,
+ QCryptoTLSCreds *tlscreds);
+void nbd_client_connection_release(NBDClientConnection *conn);
+
+QIOChannel *coroutine_fn
+nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
+ bool blocking, Error **errp);
+
+void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
+
#endif
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 292e61a..4829ff3 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -210,13 +210,15 @@ void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock);
/**
* Removes the next coroutine from the CoQueue, and wake it up.
* Returns true if a coroutine was removed, false if the queue is empty.
+ * OK to run from coroutine and non-coroutine context.
*/
-bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
+bool qemu_co_queue_next(CoQueue *queue);
/**
* Empties the CoQueue; all coroutines are woken up.
+ * OK to run from coroutine and non-coroutine context.
*/
-void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
+void qemu_co_queue_restart_all(CoQueue *queue);
/**
* Removes the next coroutine from the CoQueue, and wake it up. Unlike
diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h
index 7d1f813..0c34bf2 100644
--- a/include/qemu/sockets.h
+++ b/include/qemu/sockets.h
@@ -111,4 +111,15 @@ SocketAddress *socket_remote_address(int fd, Error **errp);
*/
SocketAddress *socket_address_flatten(SocketAddressLegacy *addr);
+/**
+ * socket_address_parse_named_fd:
+ *
+ * Modify @addr, replacing a named fd by its corresponding number.
+ * Needed for callers that plan to pass @addr to a context where the
+ * current monitor is not available.
+ *
+ * Return 0 on success.
+ */
+int socket_address_parse_named_fd(SocketAddress *addr, Error **errp);
+
#endif /* QEMU_SOCKETS_H */
diff --git a/iothread.c b/iothread.c
index 7f08638..2c5ccd7 100644
--- a/iothread.c
+++ b/iothread.c
@@ -39,13 +39,6 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
#define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
#endif
-static __thread IOThread *my_iothread;
-
-AioContext *qemu_get_current_aio_context(void)
-{
- return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
-}
-
static void *iothread_run(void *opaque)
{
IOThread *iothread = opaque;
@@ -56,7 +49,7 @@ static void *iothread_run(void *opaque)
* in this new thread uses glib.
*/
g_main_context_push_thread_default(iothread->worker_context);
- my_iothread = iothread;
+ qemu_set_current_aio_context(iothread->ctx);
iothread->thread_id = qemu_get_thread_id();
qemu_sem_post(&iothread->init_done_sem);
diff --git a/nbd/client-connection.c b/nbd/client-connection.c
new file mode 100644
index 0000000..7123b1e
--- /dev/null
+++ b/nbd/client-connection.c
@@ -0,0 +1,388 @@
+/*
+ * QEMU Block driver for NBD
+ *
+ * Copyright (c) 2021 Virtuozzo International GmbH.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+
+#include "block/nbd.h"
+
+#include "qapi/qapi-visit-sockets.h"
+#include "qapi/clone-visitor.h"
+
+struct NBDClientConnection {
+ /* Initialization constants, never change */
+ SocketAddress *saddr; /* address to connect to */
+ QCryptoTLSCreds *tlscreds;
+ NBDExportInfo initial_info;
+ bool do_negotiation;
+ bool do_retry;
+
+ QemuMutex mutex;
+
+ /*
+ * @sioc and @err represent a connection attempt. While running
+ * is true, they are only used by the connection thread, and mutex
+ * locking is not needed. Once the thread finishes,
+ * nbd_co_establish_connection then steals these pointers while
+ * under the mutex.
+ */
+ NBDExportInfo updated_info;
+ QIOChannelSocket *sioc;
+ QIOChannel *ioc;
+ Error *err;
+
+ /* All further fields are accessed only under mutex */
+ bool running; /* thread is running now */
+ bool detached; /* thread is detached and should cleanup the state */
+
+ /*
+ * wait_co: if non-NULL, which coroutine to wake in
+ * nbd_co_establish_connection() after yield()
+ */
+ Coroutine *wait_co;
+};
+
+/*
+ * The function isn't protected by any mutex, only call it when the client
+ * connection attempt has not yet started.
+ */
+void nbd_client_connection_enable_retry(NBDClientConnection *conn)
+{
+ conn->do_retry = true;
+}
+
+NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr,
+ bool do_negotiation,
+ const char *export_name,
+ const char *x_dirty_bitmap,
+ QCryptoTLSCreds *tlscreds)
+{
+ NBDClientConnection *conn = g_new(NBDClientConnection, 1);
+
+ object_ref(OBJECT(tlscreds));
+ *conn = (NBDClientConnection) {
+ .saddr = QAPI_CLONE(SocketAddress, saddr),
+ .tlscreds = tlscreds,
+ .do_negotiation = do_negotiation,
+
+ .initial_info.request_sizes = true,
+ .initial_info.structured_reply = true,
+ .initial_info.base_allocation = true,
+ .initial_info.x_dirty_bitmap = g_strdup(x_dirty_bitmap),
+ .initial_info.name = g_strdup(export_name ?: "")
+ };
+
+ qemu_mutex_init(&conn->mutex);
+
+ return conn;
+}
+
+static void nbd_client_connection_do_free(NBDClientConnection *conn)
+{
+ if (conn->sioc) {
+ qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
+ object_unref(OBJECT(conn->sioc));
+ }
+ error_free(conn->err);
+ qapi_free_SocketAddress(conn->saddr);
+ object_unref(OBJECT(conn->tlscreds));
+ g_free(conn->initial_info.x_dirty_bitmap);
+ g_free(conn->initial_info.name);
+ g_free(conn);
+}
+
+/*
+ * Connect to @addr and do NBD negotiation if @info is not null. If @tlscreds
+ * are given @outioc is returned. @outioc is provided only on success. The call
+ * may be cancelled from other thread by simply qio_channel_shutdown(sioc).
+ */
+static int nbd_connect(QIOChannelSocket *sioc, SocketAddress *addr,
+ NBDExportInfo *info, QCryptoTLSCreds *tlscreds,
+ QIOChannel **outioc, Error **errp)
+{
+ int ret;
+
+ if (outioc) {
+ *outioc = NULL;
+ }
+
+ ret = qio_channel_socket_connect_sync(sioc, addr, errp);
+ if (ret < 0) {
+ return ret;
+ }
+
+ qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+
+ if (!info) {
+ return 0;
+ }
+
+ ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), tlscreds,
+ tlscreds ? addr->u.inet.host : NULL,
+ outioc, info, errp);
+ if (ret < 0) {
+ /*
+ * nbd_receive_negotiate() may setup tls ioc and return it even on
+ * failure path. In this case we should use it instead of original
+ * channel.
+ */
+ if (outioc && *outioc) {
+ qio_channel_close(QIO_CHANNEL(*outioc), NULL);
+ object_unref(OBJECT(*outioc));
+ *outioc = NULL;
+ } else {
+ qio_channel_close(QIO_CHANNEL(sioc), NULL);
+ }
+
+ return ret;
+ }
+
+ return 0;
+}
+
+static void *connect_thread_func(void *opaque)
+{
+ NBDClientConnection *conn = opaque;
+ int ret;
+ bool do_free;
+ uint64_t timeout = 1;
+ uint64_t max_timeout = 16;
+
+ qemu_mutex_lock(&conn->mutex);
+ while (!conn->detached) {
+ assert(!conn->sioc);
+ conn->sioc = qio_channel_socket_new();
+
+ qemu_mutex_unlock(&conn->mutex);
+
+ error_free(conn->err);
+ conn->err = NULL;
+ conn->updated_info = conn->initial_info;
+
+ ret = nbd_connect(conn->sioc, conn->saddr,
+ conn->do_negotiation ? &conn->updated_info : NULL,
+ conn->tlscreds, &conn->ioc, &conn->err);
+
+ /*
+ * conn->updated_info will finally be returned to the user. Clear the
+ * pointers to our internally allocated strings, which are IN parameters
+ * of nbd_receive_negotiate() and therefore nbd_connect(). Caller
+ * shoudn't be interested in these fields.
+ */
+ conn->updated_info.x_dirty_bitmap = NULL;
+ conn->updated_info.name = NULL;
+
+ qemu_mutex_lock(&conn->mutex);
+
+ if (ret < 0) {
+ object_unref(OBJECT(conn->sioc));
+ conn->sioc = NULL;
+ if (conn->do_retry && !conn->detached) {
+ qemu_mutex_unlock(&conn->mutex);
+
+ sleep(timeout);
+ if (timeout < max_timeout) {
+ timeout *= 2;
+ }
+
+ qemu_mutex_lock(&conn->mutex);
+ continue;
+ }
+ }
+
+ break;
+ }
+
+ /* mutex is locked */
+
+ assert(conn->running);
+ conn->running = false;
+ if (conn->wait_co) {
+ aio_co_wake(conn->wait_co);
+ conn->wait_co = NULL;
+ }
+ do_free = conn->detached;
+
+ qemu_mutex_unlock(&conn->mutex);
+
+ if (do_free) {
+ nbd_client_connection_do_free(conn);
+ }
+
+ return NULL;
+}
+
+void nbd_client_connection_release(NBDClientConnection *conn)
+{
+ bool do_free = false;
+
+ if (!conn) {
+ return;
+ }
+
+ WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+ assert(!conn->detached);
+ if (conn->running) {
+ conn->detached = true;
+ } else {
+ do_free = true;
+ }
+ if (conn->sioc) {
+ qio_channel_shutdown(QIO_CHANNEL(conn->sioc),
+ QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ }
+ }
+
+ if (do_free) {
+ nbd_client_connection_do_free(conn);
+ }
+}
+
+/*
+ * Get a new connection in context of @conn:
+ * if the thread is running, wait for completion
+ * if the thread already succeeded in the background, and user didn't get the
+ * result, just return it now
+ * otherwise the thread is not running, so start a thread and wait for
+ * completion
+ *
+ * If @blocking is false, don't wait for the thread, return immediately.
+ *
+ * If @info is not NULL, also do nbd-negotiation after successful connection.
+ * In this case info is used only as out parameter, and is fully initialized by
+ * nbd_co_establish_connection(). "IN" fields of info as well as related only to
+ * nbd_receive_export_list() would be zero (see description of NBDExportInfo in
+ * include/block/nbd.h).
+ */
+QIOChannel *coroutine_fn
+nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
+ bool blocking, Error **errp)
+{
+ QemuThread thread;
+
+ if (conn->do_negotiation) {
+ assert(info);
+ }
+
+ WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+ /*
+ * Don't call nbd_co_establish_connection() in several coroutines in
+ * parallel. Only one call at once is supported.
+ */
+ assert(!conn->wait_co);
+
+ if (!conn->running) {
+ if (conn->sioc) {
+ /* Previous attempt finally succeeded in background */
+ if (conn->do_negotiation) {
+ memcpy(info, &conn->updated_info, sizeof(*info));
+ if (conn->ioc) {
+ /* TLS channel now has own reference to parent */
+ object_unref(OBJECT(conn->sioc));
+ conn->sioc = NULL;
+
+ return g_steal_pointer(&conn->ioc);
+ }
+ }
+
+ assert(!conn->ioc);
+
+ return QIO_CHANNEL(g_steal_pointer(&conn->sioc));
+ }
+
+ conn->running = true;
+ error_free(conn->err);
+ conn->err = NULL;
+ qemu_thread_create(&thread, "nbd-connect",
+ connect_thread_func, conn, QEMU_THREAD_DETACHED);
+ }
+
+ if (!blocking) {
+ return NULL;
+ }
+
+ conn->wait_co = qemu_coroutine_self();
+ }
+
+ /*
+ * We are going to wait for connect-thread finish, but
+ * nbd_co_establish_connection_cancel() can interrupt.
+ */
+ qemu_coroutine_yield();
+
+ WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+ if (conn->running) {
+ /*
+ * The connection attempt was canceled and the coroutine resumed
+ * before the connection thread finished its job. Report the
+ * attempt as failed, but leave the connection thread running,
+ * to reuse it for the next connection attempt.
+ */
+ error_setg(errp, "Connection attempt cancelled by other operation");
+ return NULL;
+ } else {
+ error_propagate(errp, conn->err);
+ conn->err = NULL;
+ if (!conn->sioc) {
+ return NULL;
+ }
+ if (conn->do_negotiation) {
+ memcpy(info, &conn->updated_info, sizeof(*info));
+ if (conn->ioc) {
+ /* TLS channel now has own reference to parent */
+ object_unref(OBJECT(conn->sioc));
+ conn->sioc = NULL;
+
+ return g_steal_pointer(&conn->ioc);
+ }
+ }
+
+ assert(!conn->ioc);
+
+ return QIO_CHANNEL(g_steal_pointer(&conn->sioc));
+ }
+ }
+
+ abort(); /* unreachable */
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection() asynchronously.
+ *
+ * Note that this function neither directly stops the thread nor closes the
+ * socket, but rather safely wakes nbd_co_establish_connection() which is
+ * sleeping in yield()
+ */
+void nbd_co_establish_connection_cancel(NBDClientConnection *conn)
+{
+ Coroutine *wait_co;
+
+ WITH_QEMU_LOCK_GUARD(&conn->mutex) {
+ wait_co = g_steal_pointer(&conn->wait_co);
+ }
+
+ if (wait_co) {
+ aio_co_wake(wait_co);
+ }
+}
diff --git a/nbd/meson.build b/nbd/meson.build
index 2baaa36..b26d705 100644
--- a/nbd/meson.build
+++ b/nbd/meson.build
@@ -1,5 +1,6 @@
block_ss.add(files(
'client.c',
+ 'client-connection.c',
'common.c',
))
blockdev_ss.add(files(
diff --git a/scripts/block-coroutine-wrapper.py b/scripts/block-coroutine-wrapper.py
index 0461fd1..85dbeb9 100644
--- a/scripts/block-coroutine-wrapper.py
+++ b/scripts/block-coroutine-wrapper.py
@@ -98,12 +98,13 @@ def snake_to_camel(func_name: str) -> str:
def gen_wrapper(func: FuncDecl) -> str:
- assert func.name.startswith('bdrv_')
- assert not func.name.startswith('bdrv_co_')
+ assert not '_co_' in func.name
assert func.return_type == 'int'
assert func.args[0].type in ['BlockDriverState *', 'BdrvChild *']
- name = 'bdrv_co_' + func.name[5:]
+ subsystem, subname = func.name.split('_', 1)
+
+ name = f'{subsystem}_co_{subname}'
bs = 'bs' if func.args[0].type == 'BlockDriverState *' else 'child->bs'
struct_name = snake_to_camel(name)
diff --git a/stubs/iothread-lock.c b/stubs/iothread-lock.c
index 2a6efad..5b45b7f 100644
--- a/stubs/iothread-lock.c
+++ b/stubs/iothread-lock.c
@@ -3,7 +3,7 @@
bool qemu_mutex_iothread_locked(void)
{
- return true;
+ return false;
}
void qemu_mutex_lock_iothread_impl(const char *file, int line)
diff --git a/stubs/iothread.c b/stubs/iothread.c
deleted file mode 100644
index 8cc9e28..0000000
--- a/stubs/iothread.c
+++ /dev/null
@@ -1,8 +0,0 @@
-#include "qemu/osdep.h"
-#include "block/aio.h"
-#include "qemu/main-loop.h"
-
-AioContext *qemu_get_current_aio_context(void)
-{
- return qemu_get_aio_context();
-}
diff --git a/stubs/meson.build b/stubs/meson.build
index d4e9549..2e79ff9 100644
--- a/stubs/meson.build
+++ b/stubs/meson.build
@@ -16,7 +16,6 @@ stub_ss.add(files('fw_cfg.c'))
stub_ss.add(files('gdbstub.c'))
stub_ss.add(files('get-vm-name.c'))
stub_ss.add(when: 'CONFIG_LINUX_IO_URING', if_true: files('io_uring.c'))
-stub_ss.add(files('iothread.c'))
stub_ss.add(files('iothread-lock.c'))
stub_ss.add(files('isa-bus.c'))
stub_ss.add(files('is-daemonized.c'))
diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
index afde12b..f9b0791 100644
--- a/tests/unit/iothread.c
+++ b/tests/unit/iothread.c
@@ -30,13 +30,6 @@ struct IOThread {
bool stopping;
};
-static __thread IOThread *my_iothread;
-
-AioContext *qemu_get_current_aio_context(void)
-{
- return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
-}
-
static void iothread_init_gcontext(IOThread *iothread)
{
GSource *source;
@@ -54,9 +47,9 @@ static void *iothread_run(void *opaque)
rcu_register_thread();
- my_iothread = iothread;
qemu_mutex_lock(&iothread->init_done_lock);
iothread->ctx = aio_context_new(&error_abort);
+ qemu_set_current_aio_context(iothread->ctx);
/*
* We must connect the ctx to a GMainContext, because in older versions
diff --git a/tests/unit/test-aio.c b/tests/unit/test-aio.c
index 8a46078..6feeb9a 100644
--- a/tests/unit/test-aio.c
+++ b/tests/unit/test-aio.c
@@ -877,6 +877,42 @@ static void test_queue_chaining(void)
g_assert_cmpint(data_b.i, ==, data_b.max);
}
+static void co_check_current_thread(void *opaque)
+{
+ QemuThread *main_thread = opaque;
+ assert(qemu_thread_is_self(main_thread));
+}
+
+static void *test_aio_co_enter(void *co)
+{
+ /*
+ * qemu_get_current_aio_context() should not to be the main thread
+ * AioContext, because this is a worker thread that has not taken
+ * the BQL. So aio_co_enter will schedule the coroutine in the
+ * main thread AioContext.
+ */
+ aio_co_enter(qemu_get_aio_context(), co);
+ return NULL;
+}
+
+static void test_worker_thread_co_enter(void)
+{
+ QemuThread this_thread, worker_thread;
+ Coroutine *co;
+
+ qemu_thread_get_self(&this_thread);
+ co = qemu_coroutine_create(co_check_current_thread, &this_thread);
+
+ qemu_thread_create(&worker_thread, "test_acquire_thread",
+ test_aio_co_enter,
+ co, QEMU_THREAD_JOINABLE);
+
+ /* Test aio_co_enter from a worker thread. */
+ qemu_thread_join(&worker_thread);
+ g_assert(aio_poll(ctx, true));
+ g_assert(!aio_poll(ctx, false));
+}
+
/* End of tests. */
int main(int argc, char **argv)
@@ -903,6 +939,7 @@ int main(int argc, char **argv)
g_test_add_func("/aio/timer/schedule", test_timer_schedule);
g_test_add_func("/aio/coroutine/queue-chaining", test_queue_chaining);
+ g_test_add_func("/aio/coroutine/worker-thread-co-enter", test_worker_thread_co_enter);
g_test_add_func("/aio-gsource/flush", test_source_flush);
g_test_add_func("/aio-gsource/bh/schedule", test_source_bh_schedule);
diff --git a/util/async.c b/util/async.c
index 674dbef..5d9b7cc 100644
--- a/util/async.c
+++ b/util/async.c
@@ -649,3 +649,23 @@ void aio_context_release(AioContext *ctx)
{
qemu_rec_mutex_unlock(&ctx->lock);
}
+
+static __thread AioContext *my_aiocontext;
+
+AioContext *qemu_get_current_aio_context(void)
+{
+ if (my_aiocontext) {
+ return my_aiocontext;
+ }
+ if (qemu_mutex_iothread_locked()) {
+ /* Possibly in a vCPU thread. */
+ return qemu_get_aio_context();
+ }
+ return NULL;
+}
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+ assert(!my_aiocontext);
+ my_aiocontext = ctx;
+}
diff --git a/util/main-loop.c b/util/main-loop.c
index d9c55df..4ae5b23 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
if (!qemu_aio_context) {
return -EMFILE;
}
+ qemu_set_current_aio_context(qemu_aio_context);
qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
src = aio_get_g_source(qemu_aio_context);
diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c
index c415c34..080a240 100644
--- a/util/qemu-sockets.c
+++ b/util/qemu-sockets.c
@@ -1164,6 +1164,25 @@ static int socket_get_fd(const char *fdstr, Error **errp)
return fd;
}
+int socket_address_parse_named_fd(SocketAddress *addr, Error **errp)
+{
+ int fd;
+
+ if (addr->type != SOCKET_ADDRESS_TYPE_FD) {
+ return 0;
+ }
+
+ fd = socket_get_fd(addr->u.fd.str, errp);
+ if (fd < 0) {
+ return fd;
+ }
+
+ g_free(addr->u.fd.str);
+ addr->u.fd.str = g_strdup_printf("%d", fd);
+
+ return 0;
+}
+
int socket_connect(SocketAddress *addr, Error **errp)
{
int fd;