aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/io/channel.h10
-rw-r--r--io/channel.c33
-rw-r--r--nbd/server.c3
3 files changed, 38 insertions, 8 deletions
diff --git a/include/io/channel.h b/include/io/channel.h
index 446a566..229bf36 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -758,6 +758,16 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition);
/**
+ * qio_channel_wake_read:
+ * @ioc: the channel object
+ *
+ * If qio_channel_yield() is currently waiting for the channel to become
+ * readable, interrupt it and reenter immediately. This function is safe to call
+ * from any thread.
+ */
+void qio_channel_wake_read(QIOChannel *ioc);
+
+/**
* qio_channel_wait:
* @ioc: the channel object
* @condition: the I/O condition to wait for
diff --git a/io/channel.c b/io/channel.c
index 375a130..72f0066 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -19,6 +19,7 @@
*/
#include "qemu/osdep.h"
+#include "block/aio-wait.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
@@ -514,7 +515,11 @@ int qio_channel_flush(QIOChannel *ioc,
static void qio_channel_restart_read(void *opaque)
{
QIOChannel *ioc = opaque;
- Coroutine *co = ioc->read_coroutine;
+ Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
+
+ if (!co) {
+ return;
+ }
/* Assert that aio_co_wake() reenters the coroutine directly */
assert(qemu_get_current_aio_context() ==
@@ -525,7 +530,11 @@ static void qio_channel_restart_read(void *opaque)
static void qio_channel_restart_write(void *opaque)
{
QIOChannel *ioc = opaque;
- Coroutine *co = ioc->write_coroutine;
+ Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);
+
+ if (!co) {
+ return;
+ }
/* Assert that aio_co_wake() reenters the coroutine directly */
assert(qemu_get_current_aio_context() ==
@@ -568,7 +577,11 @@ void qio_channel_detach_aio_context(QIOChannel *ioc)
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition)
{
+ AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
+
assert(qemu_in_coroutine());
+ assert(in_aio_context_home_thread(ioc_ctx));
+
if (condition == G_IO_IN) {
assert(!ioc->read_coroutine);
ioc->read_coroutine = qemu_coroutine_self();
@@ -580,18 +593,26 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
}
qio_channel_set_aio_fd_handlers(ioc);
qemu_coroutine_yield();
+ assert(in_aio_context_home_thread(ioc_ctx));
/* Allow interrupting the operation by reentering the coroutine other than
* through the aio_fd_handlers. */
- if (condition == G_IO_IN && ioc->read_coroutine) {
- ioc->read_coroutine = NULL;
+ if (condition == G_IO_IN) {
+ assert(ioc->read_coroutine == NULL);
qio_channel_set_aio_fd_handlers(ioc);
- } else if (condition == G_IO_OUT && ioc->write_coroutine) {
- ioc->write_coroutine = NULL;
+ } else if (condition == G_IO_OUT) {
+ assert(ioc->write_coroutine == NULL);
qio_channel_set_aio_fd_handlers(ioc);
}
}
+void qio_channel_wake_read(QIOChannel *ioc)
+{
+ Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
+ if (co) {
+ aio_co_wake(co);
+ }
+}
static gboolean qio_channel_wait_complete(QIOChannel *ioc,
GIOCondition condition,
diff --git a/nbd/server.c b/nbd/server.c
index e239c28..2664d43 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -1599,8 +1599,7 @@ static bool nbd_drained_poll(void *opaque)
* enter it here so we don't depend on the client to wake it up.
*/
if (client->recv_coroutine != NULL && client->read_yielding) {
- qemu_aio_coroutine_enter(exp->common.ctx,
- client->recv_coroutine);
+ qio_channel_wake_read(client->ioc);
}
return true;