-rw-r--r--  Makefile.objs | 4
-rw-r--r--  block/backup.c | 2
-rw-r--r--  block/blkdebug.c | 9
-rwxr-xr-x  block/blkreplay.c | 2
-rw-r--r--  block/block-backend.c | 13
-rw-r--r--  block/curl.c | 44
-rw-r--r--  block/gluster.c | 9
-rw-r--r--  block/io.c | 42
-rw-r--r--  block/iscsi.c | 15
-rw-r--r--  block/linux-aio.c | 10
-rw-r--r--  block/mirror.c | 12
-rw-r--r--  block/nbd-client.c | 119
-rw-r--r--  block/nbd-client.h | 2
-rw-r--r--  block/nfs.c | 9
-rw-r--r--  block/qcow2-cluster.c | 4
-rw-r--r--  block/qed-cluster.c | 2
-rw-r--r--  block/qed-table.c | 12
-rw-r--r--  block/qed.c | 58
-rw-r--r--  block/qed.h | 3
-rw-r--r--  block/sheepdog.c | 31
-rw-r--r--  block/ssh.c | 29
-rw-r--r--  block/throttle-groups.c | 4
-rw-r--r--  block/win32-aio.c | 9
-rw-r--r--  dma-helpers.c | 2
-rw-r--r--  hw/9pfs/9p.c | 2
-rw-r--r--  hw/block/virtio-blk.c | 19
-rw-r--r--  hw/scsi/scsi-bus.c | 2
-rw-r--r--  hw/scsi/scsi-disk.c | 15
-rw-r--r--  hw/scsi/scsi-generic.c | 20
-rw-r--r--  hw/scsi/virtio-scsi.c | 7
-rw-r--r--  include/block/aio.h | 38
-rw-r--r--  include/block/block_int.h | 64
-rw-r--r--  include/io/channel.h | 72
-rw-r--r--  include/qemu/coroutine.h | 84
-rw-r--r--  include/qemu/coroutine_int.h | 11
-rw-r--r--  include/sysemu/block-backend.h | 14
-rw-r--r--  io/channel-command.c | 13
-rw-r--r--  io/channel-file.c | 11
-rw-r--r--  io/channel-socket.c | 16
-rw-r--r--  io/channel-tls.c | 12
-rw-r--r--  io/channel-watch.c | 6
-rw-r--r--  io/channel.c | 97
-rw-r--r--  nbd/client.c | 2
-rw-r--r--  nbd/common.c | 9
-rw-r--r--  nbd/server.c | 94
-rw-r--r--  stubs/Makefile.objs | 1
-rw-r--r--  stubs/linux-aio.c | 32
-rw-r--r--  stubs/set-fd-handler.c | 11
-rw-r--r--  tests/Makefile.include | 19
-rw-r--r--  tests/iothread.c | 91
-rw-r--r--  tests/iothread.h | 25
-rw-r--r--  tests/test-aio-multithread.c | 463
-rw-r--r--  tests/test-thread-pool.c | 12
-rw-r--r--  trace-events | 11
-rw-r--r--  util/Makefile.objs | 6
-rw-r--r--  util/aio-posix.c (renamed from aio-posix.c) | 62
-rw-r--r--  util/aio-win32.c (renamed from aio-win32.c) | 30
-rw-r--r--  util/aiocb.c | 55
-rw-r--r--  util/async.c (renamed from async.c) | 84
-rw-r--r--  util/iohandler.c (renamed from iohandler.c) | 0
-rw-r--r--  util/main-loop.c (renamed from main-loop.c) | 0
-rw-r--r--  util/qemu-coroutine-lock.c | 252
-rw-r--r--  util/qemu-coroutine-sleep.c | 2
-rw-r--r--  util/qemu-coroutine.c | 8
-rw-r--r--  util/qemu-timer.c (renamed from qemu-timer.c) | 0
-rw-r--r--  util/thread-pool.c (renamed from thread-pool.c) | 8
-rw-r--r--  util/trace-events | 17
67 files changed, 1711 insertions, 532 deletions
diff --git a/Makefile.objs b/Makefile.objs
index e4bfc16..e740500 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,12 +9,8 @@ chardev-obj-y = chardev/
#######################################################################
# block-obj-y is code used by both qemu system emulation and qemu-img
-block-obj-y = async.o thread-pool.o
block-obj-y += nbd/
block-obj-y += block.o blockjob.o
-block-obj-y += main-loop.o iohandler.o qemu-timer.o
-block-obj-$(CONFIG_POSIX) += aio-posix.o
-block-obj-$(CONFIG_WIN32) += aio-win32.o
block-obj-y += block/
block-obj-y += qemu-io-cmds.o
block-obj-$(CONFIG_REPLICATION) += replication.o
diff --git a/block/backup.c b/block/backup.c
index ea38733..fe010e7 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -64,7 +64,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
retry = false;
QLIST_FOREACH(req, &job->inflight_reqs, list) {
if (end > req->start && start < req->end) {
- qemu_co_queue_wait(&req->wait_queue);
+ qemu_co_queue_wait(&req->wait_queue, NULL);
retry = true;
break;
}
diff --git a/block/blkdebug.c b/block/blkdebug.c
index acccf85..d8eee1b 100644
--- a/block/blkdebug.c
+++ b/block/blkdebug.c
@@ -405,12 +405,6 @@ out:
return ret;
}
-static void error_callback_bh(void *opaque)
-{
- Coroutine *co = opaque;
- qemu_coroutine_enter(co);
-}
-
static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
{
BDRVBlkdebugState *s = bs->opaque;
@@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
}
if (!immediately) {
- aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh,
- qemu_coroutine_self());
+ aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self());
qemu_coroutine_yield();
}
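
The blkdebug hunk above shows the self-rescheduling idiom used throughout this series: instead of bouncing through a one-shot bottom half that calls qemu_coroutine_enter(), a coroutine can schedule itself on its own AioContext and yield. A minimal sketch of the pattern, not taken from the patch (delay_one_iteration is a hypothetical helper):

    /* Defer the rest of the current coroutine by one event-loop iteration.
     * aio_co_schedule() queues the coroutine on the context's co_schedule_bh,
     * so execution resumes only after we have yielded. */
    static void coroutine_fn delay_one_iteration(void)
    {
        aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self());
        qemu_coroutine_yield();
        /* Resumed here on the next iteration of the same AioContext. */
    }
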
diff --git a/block/blkreplay.c b/block/blkreplay.c
index a741654..cfc8c5b 100755
--- a/block/blkreplay.c
+++ b/block/blkreplay.c
@@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs)
static void blkreplay_bh_cb(void *opaque)
{
Request *req = opaque;
- qemu_coroutine_enter(req->co);
+ aio_co_wake(req->co);
qemu_bh_delete(req->bh);
g_free(req);
}
diff --git a/block/block-backend.c b/block/block-backend.c
index efbf398..819f272 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
{
QEMUIOVector qiov;
struct iovec iov;
- Coroutine *co;
BlkRwCo rwco;
iov = (struct iovec) {
@@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
.ret = NOT_DONE,
};
- co = qemu_coroutine_create(co_entry, &rwco);
- qemu_coroutine_enter(co);
- BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+ if (qemu_in_coroutine()) {
+ /* Fast-path if already in coroutine context */
+ co_entry(&rwco);
+ } else {
+ Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
+ qemu_coroutine_enter(co);
+ BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+ }
return rwco.ret;
}
@@ -979,7 +983,6 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
static void blk_aio_complete_bh(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
-
assert(acb->has_returned);
blk_aio_complete(acb);
}
diff --git a/block/curl.c b/block/curl.c
index 792fef8..2939cc7 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
}
}
-static void curl_multi_do(void *arg)
+static void curl_multi_do_locked(CURLState *s)
{
- CURLState *s = (CURLState *)arg;
CURLSocket *socket, *next_socket;
int running;
int r;
@@ -406,12 +405,23 @@ static void curl_multi_do(void *arg)
}
}
+static void curl_multi_do(void *arg)
+{
+ CURLState *s = (CURLState *)arg;
+
+ aio_context_acquire(s->s->aio_context);
+ curl_multi_do_locked(s);
+ aio_context_release(s->s->aio_context);
+}
+
static void curl_multi_read(void *arg)
{
CURLState *s = (CURLState *)arg;
- curl_multi_do(arg);
+ aio_context_acquire(s->s->aio_context);
+ curl_multi_do_locked(s);
curl_multi_check_completion(s->s);
+ aio_context_release(s->s->aio_context);
}
static void curl_multi_timeout_do(void *arg)
@@ -424,9 +434,11 @@ static void curl_multi_timeout_do(void *arg)
return;
}
+ aio_context_acquire(s->aio_context);
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
curl_multi_check_completion(s);
+ aio_context_release(s->aio_context);
#else
abort();
#endif
@@ -784,13 +796,18 @@ static void curl_readv_bh_cb(void *p)
{
CURLState *state;
int running;
+ int ret = -EINPROGRESS;
CURLAIOCB *acb = p;
- BDRVCURLState *s = acb->common.bs->opaque;
+ BlockDriverState *bs = acb->common.bs;
+ BDRVCURLState *s = bs->opaque;
+ AioContext *ctx = bdrv_get_aio_context(bs);
size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
size_t end;
+ aio_context_acquire(ctx);
+
// In case we have the requested data already (e.g. read-ahead),
// we can just call the callback and be done.
switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
@@ -798,7 +815,7 @@ static void curl_readv_bh_cb(void *p)
qemu_aio_unref(acb);
// fall through
case FIND_RET_WAIT:
- return;
+ goto out;
default:
break;
}
@@ -806,9 +823,8 @@ static void curl_readv_bh_cb(void *p)
// No cache found, so let's start a new request
state = curl_init_state(acb->common.bs, s);
if (!state) {
- acb->common.cb(acb->common.opaque, -EIO);
- qemu_aio_unref(acb);
- return;
+ ret = -EIO;
+ goto out;
}
acb->start = 0;
@@ -822,9 +838,8 @@ static void curl_readv_bh_cb(void *p)
state->orig_buf = g_try_malloc(state->buf_len);
if (state->buf_len && state->orig_buf == NULL) {
curl_clean_state(state);
- acb->common.cb(acb->common.opaque, -ENOMEM);
- qemu_aio_unref(acb);
- return;
+ ret = -ENOMEM;
+ goto out;
}
state->acb[0] = acb;
@@ -837,6 +852,13 @@ static void curl_readv_bh_cb(void *p)
/* Tell curl it needs to kick things off */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+ aio_context_release(ctx);
+ if (ret != -EINPROGRESS) {
+ acb->common.cb(acb->common.opaque, ret);
+ qemu_aio_unref(acb);
+ }
}
static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
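
The curl changes illustrate a pattern repeated by most drivers in this series: file-descriptor and timer callbacks that used to run only under the global mutex now take the driver's AioContext lock explicitly around a "_locked" worker. A minimal sketch of that shape, with hypothetical MyDriverState and my_do_work_locked names:

    typedef struct MyDriverState {
        AioContext *aio_context;   /* context this driver is attached to */
        /* ... driver state protected by the AioContext lock ... */
    } MyDriverState;

    static void my_do_work_locked(MyDriverState *s)
    {
        /* Touches driver state; the caller holds s->aio_context. */
    }

    /* Callback registered with aio_set_fd_handler() on s->aio_context. */
    static void my_fd_read(void *opaque)
    {
        MyDriverState *s = opaque;

        aio_context_acquire(s->aio_context);
        my_do_work_locked(s);
        aio_context_release(s->aio_context);
    }
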
diff --git a/block/gluster.c b/block/gluster.c
index 1a22f29..56b4abe 100644
--- a/block/gluster.c
+++ b/block/gluster.c
@@ -698,13 +698,6 @@ static struct glfs *qemu_gluster_init(BlockdevOptionsGluster *gconf,
return qemu_gluster_glfs_init(gconf, errp);
}
-static void qemu_gluster_complete_aio(void *opaque)
-{
- GlusterAIOCB *acb = (GlusterAIOCB *)opaque;
-
- qemu_coroutine_enter(acb->coroutine);
-}
-
/*
* AIO callback routine called from GlusterFS thread.
*/
@@ -720,7 +713,7 @@ static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
acb->ret = -EIO; /* Partial read/write - fail it */
}
- aio_bh_schedule_oneshot(acb->aio_context, qemu_gluster_complete_aio, acb);
+ aio_co_schedule(acb->aio_context, acb->coroutine);
}
static void qemu_gluster_parse_flags(int bdrv_flags, int *open_flags)
diff --git a/block/io.c b/block/io.c
index c42b34a..d5c4544 100644
--- a/block/io.c
+++ b/block/io.c
@@ -189,7 +189,7 @@ static void bdrv_co_drain_bh_cb(void *opaque)
bdrv_dec_in_flight(bs);
bdrv_drained_begin(bs);
data->done = true;
- qemu_coroutine_enter(co);
+ aio_co_wake(co);
}
static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
@@ -539,7 +539,7 @@ static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
* (instead of producing a deadlock in the former case). */
if (!req->waiting_for) {
self->waiting_for = req;
- qemu_co_queue_wait(&req->wait_queue);
+ qemu_co_queue_wait(&req->wait_queue, NULL);
self->waiting_for = NULL;
retry = true;
waited = true;
@@ -813,7 +813,7 @@ static void bdrv_co_io_em_complete(void *opaque, int ret)
CoroutineIOCompletion *co = opaque;
co->ret = ret;
- qemu_coroutine_enter(co->coroutine);
+ aio_co_wake(co->coroutine);
}
static int coroutine_fn bdrv_driver_preadv(BlockDriverState *bs,
@@ -2080,6 +2080,11 @@ void bdrv_aio_cancel(BlockAIOCB *acb)
if (acb->aiocb_info->get_aio_context) {
aio_poll(acb->aiocb_info->get_aio_context(acb), true);
} else if (acb->bs) {
+ /* qemu_aio_ref and qemu_aio_unref are not thread-safe, so
+ * assert that we're not using an I/O thread. Thread-safe
+ * code should use bdrv_aio_cancel_async exclusively.
+ */
+ assert(bdrv_get_aio_context(acb->bs) == qemu_get_aio_context());
aio_poll(bdrv_get_aio_context(acb->bs), true);
} else {
abort();
@@ -2239,35 +2244,6 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
return &acb->common;
}
-void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
- BlockCompletionFunc *cb, void *opaque)
-{
- BlockAIOCB *acb;
-
- acb = g_malloc(aiocb_info->aiocb_size);
- acb->aiocb_info = aiocb_info;
- acb->bs = bs;
- acb->cb = cb;
- acb->opaque = opaque;
- acb->refcnt = 1;
- return acb;
-}
-
-void qemu_aio_ref(void *p)
-{
- BlockAIOCB *acb = p;
- acb->refcnt++;
-}
-
-void qemu_aio_unref(void *p)
-{
- BlockAIOCB *acb = p;
- assert(acb->refcnt > 0);
- if (--acb->refcnt == 0) {
- g_free(acb);
- }
-}
-
/**************************************************************/
/* Coroutine block device emulation */
@@ -2299,7 +2275,7 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
/* Wait until any previous flushes are completed */
while (bs->active_flush_req) {
- qemu_co_queue_wait(&bs->flush_queue);
+ qemu_co_queue_wait(&bs->flush_queue, NULL);
}
bs->active_flush_req = true;
diff --git a/block/iscsi.c b/block/iscsi.c
index 1860f1b..2561be9 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -165,8 +165,9 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
static void iscsi_co_generic_bh_cb(void *opaque)
{
struct IscsiTask *iTask = opaque;
+
iTask->complete = 1;
- qemu_coroutine_enter(iTask->co);
+ aio_co_wake(iTask->co);
}
static void iscsi_retry_timer_expired(void *opaque)
@@ -174,7 +175,7 @@ static void iscsi_retry_timer_expired(void *opaque)
struct IscsiTask *iTask = opaque;
iTask->complete = 1;
if (iTask->co) {
- qemu_coroutine_enter(iTask->co);
+ aio_co_wake(iTask->co);
}
}
@@ -394,8 +395,10 @@ iscsi_process_read(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;
+ aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLIN);
iscsi_set_events(iscsilun);
+ aio_context_release(iscsilun->aio_context);
}
static void
@@ -404,8 +407,10 @@ iscsi_process_write(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;
+ aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLOUT);
iscsi_set_events(iscsilun);
+ aio_context_release(iscsilun->aio_context);
}
static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
@@ -1392,16 +1397,20 @@ static void iscsi_nop_timed_event(void *opaque)
{
IscsiLun *iscsilun = opaque;
+ aio_context_acquire(iscsilun->aio_context);
if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
error_report("iSCSI: NOP timeout. Reconnecting...");
iscsilun->request_timed_out = true;
} else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) {
error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages.");
- return;
+ goto out;
}
timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
iscsi_set_events(iscsilun);
+
+out:
+ aio_context_release(iscsilun->aio_context);
}
static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 03ab741..88b8d55 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -54,10 +54,10 @@ struct LinuxAioState {
io_context_t ctx;
EventNotifier e;
- /* io queue for submit at batch */
+ /* io queue for submit at batch. Protected by AioContext lock. */
LaioQueue io_q;
- /* I/O completion processing */
+ /* I/O completion processing. Only runs in I/O thread. */
QEMUBH *completion_bh;
int event_idx;
int event_max;
@@ -100,7 +100,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
* that!
*/
if (!qemu_coroutine_entered(laiocb->co)) {
- qemu_coroutine_enter(laiocb->co);
+ aio_co_wake(laiocb->co);
}
} else {
laiocb->common.cb(laiocb->common.opaque, ret);
@@ -234,9 +234,12 @@ static void qemu_laio_process_completions(LinuxAioState *s)
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
qemu_laio_process_completions(s);
+
+ aio_context_acquire(s->aio_context);
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
}
+ aio_context_release(s->aio_context);
}
static void qemu_laio_completion_bh(void *opaque)
@@ -455,6 +458,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
qemu_bh_delete(s->completion_bh);
+ s->aio_context = NULL;
}
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
diff --git a/block/mirror.c b/block/mirror.c
index 301ba92..698a54e 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret)
{
MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
+
+ aio_context_acquire(blk_get_aio_context(s->common.blk));
if (ret < 0) {
BlockErrorAction action;
@@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret)
}
}
mirror_iteration_done(op, ret);
+ aio_context_release(blk_get_aio_context(s->common.blk));
}
static void mirror_read_complete(void *opaque, int ret)
{
MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
+
+ aio_context_acquire(blk_get_aio_context(s->common.blk));
if (ret < 0) {
BlockErrorAction action;
@@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret)
}
mirror_iteration_done(op, ret);
- return;
+ } else {
+ blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
+ 0, mirror_write_complete, op);
}
- blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
- 0, mirror_write_complete, op);
+ aio_context_release(blk_get_aio_context(s->common.blk));
}
static inline void mirror_clip_sectors(MirrorBlockJob *s,
diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532..0dc12c2 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs))
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i;
for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
qemu_coroutine_enter(s->recv_coroutine[i]);
}
}
+ BDRV_POLL_WHILE(bs, s->read_reply_co);
}
static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
qio_channel_shutdown(client->ioc,
QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
- nbd_recv_coroutines_enter_all(client);
+ nbd_recv_coroutines_enter_all(bs);
nbd_client_detach_aio_context(bs);
object_unref(OBJECT(client->sioc));
@@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs)
client->ioc = NULL;
}
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
{
- BlockDriverState *bs = opaque;
- NBDClientSession *s = nbd_get_client_session(bs);
+ NBDClientSession *s = opaque;
uint64_t i;
int ret;
- if (!s->ioc) { /* Already closed */
- return;
- }
-
- if (s->reply.handle == 0) {
- /* No reply already in flight. Fetch a header. It is possible
- * that another thread has done the same thing in parallel, so
- * the socket is not readable anymore.
- */
+ for (;;) {
+ assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->ioc, &s->reply);
- if (ret == -EAGAIN) {
- return;
- }
if (ret < 0) {
- s->reply.handle = 0;
- goto fail;
+ break;
}
- }
- /* There's no need for a mutex on the receive side, because the
- * handler acts as a synchronization point and ensures that only
- * one coroutine is called until the reply finishes. */
- i = HANDLE_TO_INDEX(s, s->reply.handle);
- if (i >= MAX_NBD_REQUESTS) {
- goto fail;
- }
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes.
+ */
+ i = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+ break;
+ }
- if (s->recv_coroutine[i]) {
- qemu_coroutine_enter(s->recv_coroutine[i]);
- return;
+ /* We're woken up by the recv_coroutine itself. Note that there
+ * is no race between yielding and reentering read_reply_co. This
+ * is because:
+ *
+ * - if recv_coroutine[i] runs on the same AioContext, it is only
+ * entered after we yield
+ *
+ * - if recv_coroutine[i] runs on a different AioContext, reentering
+ * read_reply_co happens through a bottom half, which can only
+ * run after we yield.
+ */
+ aio_co_wake(s->recv_coroutine[i]);
+ qemu_coroutine_yield();
}
-
-fail:
- nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
- BlockDriverState *bs = opaque;
-
- qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+ s->read_reply_co = NULL;
}
static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
QEMUIOVector *qiov)
{
NBDClientSession *s = nbd_get_client_session(bs);
- AioContext *aio_context;
int rc, ret, i;
qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
return -EPIPE;
}
- s->send_coroutine = qemu_coroutine_self();
- aio_context = bdrv_get_aio_context(bs);
-
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, nbd_restart_write, NULL, bs);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
@@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, NULL, NULL, bs);
- s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
{
int ret;
- /* Wait until we're woken up by the read handler. TODO: perhaps
- * peek at the next reply and avoid yielding if it's ours? */
+ /* Wait until we're woken up by nbd_read_reply_entry. */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle ||
@@ -201,7 +182,7 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* Poor man semaphore. The free_sema is locked when no other request
* can be accepted, and unlocked after receiving one reply. */
if (s->in_flight == MAX_NBD_REQUESTS) {
- qemu_co_queue_wait(&s->free_sema);
+ qemu_co_queue_wait(&s->free_sema, NULL);
assert(s->in_flight < MAX_NBD_REQUESTS);
}
s->in_flight++;
@@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* s->recv_coroutine[i] is set as soon as we get the send_lock. */
}
-static void nbd_coroutine_end(NBDClientSession *s,
+static void nbd_coroutine_end(BlockDriverState *bs,
NBDRequest *request)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i = HANDLE_TO_INDEX(s, request->handle);
+
s->recv_coroutine[i] = NULL;
- if (s->in_flight-- == MAX_NBD_REQUESTS) {
- qemu_co_queue_next(&s->free_sema);
+ s->in_flight--;
+ qemu_co_queue_next(&s->free_sema);
+
+ /* Kick the read_reply_co to get the next reply. */
+ if (s->read_reply_co) {
+ aio_co_wake(s->read_reply_co);
}
}
@@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, qiov);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
- aio_set_fd_handler(bdrv_get_aio_context(bs),
- nbd_get_client_session(bs)->sioc->fd,
- false, NULL, NULL, NULL, NULL);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
- aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
- false, nbd_reply_ready, NULL, NULL, bs);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+ aio_co_schedule(new_context, client->read_reply_co);
}
void nbd_client_close(BlockDriverState *bs)
@@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+ client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
logout("Established connection with NBD server\n");
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f8d6006..8cdfc92 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
CoMutex send_mutex;
CoQueue free_sema;
- Coroutine *send_coroutine;
+ Coroutine *read_reply_co;
int in_flight;
Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
diff --git a/block/nfs.c b/block/nfs.c
index 689eaa7..08b43dd 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -208,15 +208,21 @@ static void nfs_set_events(NFSClient *client)
static void nfs_process_read(void *arg)
{
NFSClient *client = arg;
+
+ aio_context_acquire(client->aio_context);
nfs_service(client->context, POLLIN);
nfs_set_events(client);
+ aio_context_release(client->aio_context);
}
static void nfs_process_write(void *arg)
{
NFSClient *client = arg;
+
+ aio_context_acquire(client->aio_context);
nfs_service(client->context, POLLOUT);
nfs_set_events(client);
+ aio_context_release(client->aio_context);
}
static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
@@ -231,8 +237,9 @@ static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
static void nfs_co_generic_bh_cb(void *opaque)
{
NFSRPC *task = opaque;
+
task->complete = 1;
- qemu_coroutine_enter(task->co);
+ aio_co_wake(task->co);
}
static void
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 928c1e2..78c11d4 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -932,9 +932,7 @@ static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
if (bytes == 0) {
/* Wait for the dependency to complete. We need to recheck
* the free/allocated clusters when we continue. */
- qemu_co_mutex_unlock(&s->lock);
- qemu_co_queue_wait(&old_alloc->dependent_requests);
- qemu_co_mutex_lock(&s->lock);
+ qemu_co_queue_wait(&old_alloc->dependent_requests, &s->lock);
return -EAGAIN;
}
}
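
qemu_co_queue_wait() now takes an optional CoMutex that is dropped while the coroutine sleeps and reacquired before the call returns, which is what lets the qcow2 hunk above delete the manual unlock/wait/lock sequence. A short sketch of the calling convention (wait_for_dependency is hypothetical):

    static coroutine_fn int wait_for_dependency(CoQueue *queue, CoMutex *lock)
    {
        /* Caller holds *lock.  The wait atomically releases it, sleeps until
         * qemu_co_queue_next() wakes us, and takes the lock again before
         * returning, so the caller can simply recheck its state. */
        qemu_co_queue_wait(queue, lock);
        return -EAGAIN;
    }
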
diff --git a/block/qed-cluster.c b/block/qed-cluster.c
index c24e756..8f5da74 100644
--- a/block/qed-cluster.c
+++ b/block/qed-cluster.c
@@ -83,6 +83,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
unsigned int index;
unsigned int n;
+ qed_acquire(s);
if (ret) {
goto out;
}
@@ -109,6 +110,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
out:
find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
+ qed_release(s);
g_free(find_cluster_cb);
}
diff --git a/block/qed-table.c b/block/qed-table.c
index ed443e2..b12c298 100644
--- a/block/qed-table.c
+++ b/block/qed-table.c
@@ -31,6 +31,7 @@ static void qed_read_table_cb(void *opaque, int ret)
{
QEDReadTableCB *read_table_cb = opaque;
QEDTable *table = read_table_cb->table;
+ BDRVQEDState *s = read_table_cb->s;
int noffsets = read_table_cb->qiov.size / sizeof(uint64_t);
int i;
@@ -40,13 +41,15 @@ static void qed_read_table_cb(void *opaque, int ret)
}
/* Byteswap offsets */
+ qed_acquire(s);
for (i = 0; i < noffsets; i++) {
table->offsets[i] = le64_to_cpu(table->offsets[i]);
}
+ qed_release(s);
out:
/* Completion */
- trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
+ trace_qed_read_table_cb(s, read_table_cb->table, ret);
gencb_complete(&read_table_cb->gencb, ret);
}
@@ -84,8 +87,9 @@ typedef struct {
static void qed_write_table_cb(void *opaque, int ret)
{
QEDWriteTableCB *write_table_cb = opaque;
+ BDRVQEDState *s = write_table_cb->s;
- trace_qed_write_table_cb(write_table_cb->s,
+ trace_qed_write_table_cb(s,
write_table_cb->orig_table,
write_table_cb->flush,
ret);
@@ -97,8 +101,10 @@ static void qed_write_table_cb(void *opaque, int ret)
if (write_table_cb->flush) {
/* We still need to flush first */
write_table_cb->flush = false;
+ qed_acquire(s);
bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
write_table_cb);
+ qed_release(s);
return;
}
@@ -213,6 +219,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
CachedL2Table *l2_table = request->l2_table;
uint64_t l2_offset = read_l2_table_cb->l2_offset;
+ qed_acquire(s);
if (ret) {
/* can't trust loaded L2 table anymore */
qed_unref_l2_cache_entry(l2_table);
@@ -228,6 +235,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
assert(request->l2_table != NULL);
}
+ qed_release(s);
gencb_complete(&read_l2_table_cb->gencb, ret);
}
diff --git a/block/qed.c b/block/qed.c
index 1a7ef0a..0b62c77 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -273,7 +273,19 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
return l2_table;
}
-static void qed_aio_next_io(void *opaque, int ret);
+static void qed_aio_next_io(QEDAIOCB *acb, int ret);
+
+static void qed_aio_start_io(QEDAIOCB *acb)
+{
+ qed_aio_next_io(acb, 0);
+}
+
+static void qed_aio_next_io_cb(void *opaque, int ret)
+{
+ QEDAIOCB *acb = opaque;
+
+ qed_aio_next_io(acb, ret);
+}
static void qed_plug_allocating_write_reqs(BDRVQEDState *s)
{
@@ -292,7 +304,7 @@ static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
if (acb) {
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
}
}
@@ -333,10 +345,22 @@ static void qed_need_check_timer_cb(void *opaque)
trace_qed_need_check_timer_cb(s);
+ qed_acquire(s);
qed_plug_allocating_write_reqs(s);
/* Ensure writes are on disk before clearing flag */
bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
+ qed_release(s);
+}
+
+void qed_acquire(BDRVQEDState *s)
+{
+ aio_context_acquire(bdrv_get_aio_context(s->bs));
+}
+
+void qed_release(BDRVQEDState *s)
+{
+ aio_context_release(bdrv_get_aio_context(s->bs));
}
static void qed_start_need_check_timer(BDRVQEDState *s)
@@ -721,7 +745,7 @@ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t l
}
if (cb->co) {
- qemu_coroutine_enter(cb->co);
+ aio_co_wake(cb->co);
}
}
@@ -918,6 +942,7 @@ static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index,
static void qed_aio_complete_bh(void *opaque)
{
QEDAIOCB *acb = opaque;
+ BDRVQEDState *s = acb_to_s(acb);
BlockCompletionFunc *cb = acb->common.cb;
void *user_opaque = acb->common.opaque;
int ret = acb->bh_ret;
@@ -925,7 +950,9 @@ static void qed_aio_complete_bh(void *opaque)
qemu_aio_unref(acb);
/* Invoke callback */
+ qed_acquire(s);
cb(user_opaque, ret);
+ qed_release(s);
}
static void qed_aio_complete(QEDAIOCB *acb, int ret)
@@ -959,7 +986,7 @@ static void qed_aio_complete(QEDAIOCB *acb, int ret)
QSIMPLEQ_REMOVE_HEAD(&s->allocating_write_reqs, next);
acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
if (acb) {
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
} else if (s->header.features & QED_F_NEED_CHECK) {
qed_start_need_check_timer(s);
}
@@ -984,7 +1011,7 @@ static void qed_commit_l2_update(void *opaque, int ret)
acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
assert(acb->request.l2_table != NULL);
- qed_aio_next_io(opaque, ret);
+ qed_aio_next_io(acb, ret);
}
/**
@@ -1032,11 +1059,11 @@ static void qed_aio_write_l2_update(QEDAIOCB *acb, int ret, uint64_t offset)
if (need_alloc) {
/* Write out the whole new L2 table */
qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true,
- qed_aio_write_l1_update, acb);
+ qed_aio_write_l1_update, acb);
} else {
/* Write out only the updated part of the L2 table */
qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false,
- qed_aio_next_io, acb);
+ qed_aio_next_io_cb, acb);
}
return;
@@ -1088,7 +1115,7 @@ static void qed_aio_write_main(void *opaque, int ret)
}
if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
- next_fn = qed_aio_next_io;
+ next_fn = qed_aio_next_io_cb;
} else {
if (s->bs->backing) {
next_fn = qed_aio_write_flush_before_l2_update;
@@ -1201,7 +1228,7 @@ static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len)
if (acb->flags & QED_AIOCB_ZERO) {
/* Skip ahead if the clusters are already zero */
if (acb->find_cluster_ret == QED_CLUSTER_ZERO) {
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
return;
}
@@ -1321,18 +1348,18 @@ static void qed_aio_read_data(void *opaque, int ret,
/* Handle zero cluster and backing file reads */
if (ret == QED_CLUSTER_ZERO) {
qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size);
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
return;
} else if (ret != QED_CLUSTER_FOUND) {
qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov,
- &acb->backing_qiov, qed_aio_next_io, acb);
+ &acb->backing_qiov, qed_aio_next_io_cb, acb);
return;
}
BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE,
&acb->cur_qiov, acb->cur_qiov.size / BDRV_SECTOR_SIZE,
- qed_aio_next_io, acb);
+ qed_aio_next_io_cb, acb);
return;
err:
@@ -1342,9 +1369,8 @@ err:
/**
* Begin next I/O or complete the request
*/
-static void qed_aio_next_io(void *opaque, int ret)
+static void qed_aio_next_io(QEDAIOCB *acb, int ret)
{
- QEDAIOCB *acb = opaque;
BDRVQEDState *s = acb_to_s(acb);
QEDFindClusterFunc *io_fn = (acb->flags & QED_AIOCB_WRITE) ?
qed_aio_write_data : qed_aio_read_data;
@@ -1400,7 +1426,7 @@ static BlockAIOCB *qed_aio_setup(BlockDriverState *bs,
qemu_iovec_init(&acb->cur_qiov, qiov->niov);
/* Start request */
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
return &acb->common;
}
@@ -1436,7 +1462,7 @@ static void coroutine_fn qed_co_pwrite_zeroes_cb(void *opaque, int ret)
cb->done = true;
cb->ret = ret;
if (cb->co) {
- qemu_coroutine_enter(cb->co);
+ aio_co_wake(cb->co);
}
}
diff --git a/block/qed.h b/block/qed.h
index 9676ab9..ce8c314 100644
--- a/block/qed.h
+++ b/block/qed.h
@@ -198,6 +198,9 @@ enum {
*/
typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len);
+void qed_acquire(BDRVQEDState *s);
+void qed_release(BDRVQEDState *s);
+
/**
* Generic callback for chaining async callbacks
*/
diff --git a/block/sheepdog.c b/block/sheepdog.c
index f757157..860ba61 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -486,7 +486,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
retry:
QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
if (AIOCBOverlapping(acb, cb)) {
- qemu_co_queue_wait(&s->overlapping_queue);
+ qemu_co_queue_wait(&s->overlapping_queue, NULL);
goto retry;
}
}
@@ -575,13 +575,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
return ret;
}
-static void restart_co_req(void *opaque)
-{
- Coroutine *co = opaque;
-
- qemu_coroutine_enter(co);
-}
-
typedef struct SheepdogReqCo {
int sockfd;
BlockDriverState *bs;
@@ -592,12 +585,19 @@ typedef struct SheepdogReqCo {
unsigned int *rlen;
int ret;
bool finished;
+ Coroutine *co;
} SheepdogReqCo;
+static void restart_co_req(void *opaque)
+{
+ SheepdogReqCo *srco = opaque;
+
+ aio_co_wake(srco->co);
+}
+
static coroutine_fn void do_co_req(void *opaque)
{
int ret;
- Coroutine *co;
SheepdogReqCo *srco = opaque;
int sockfd = srco->sockfd;
SheepdogReq *hdr = srco->hdr;
@@ -605,9 +605,9 @@ static coroutine_fn void do_co_req(void *opaque)
unsigned int *wlen = srco->wlen;
unsigned int *rlen = srco->rlen;
- co = qemu_coroutine_self();
+ srco->co = qemu_coroutine_self();
aio_set_fd_handler(srco->aio_context, sockfd, false,
- NULL, restart_co_req, NULL, co);
+ NULL, restart_co_req, NULL, srco);
ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) {
@@ -615,7 +615,7 @@ static coroutine_fn void do_co_req(void *opaque)
}
aio_set_fd_handler(srco->aio_context, sockfd, false,
- restart_co_req, NULL, NULL, co);
+ restart_co_req, NULL, NULL, srco);
ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret != sizeof(*hdr)) {
@@ -643,6 +643,7 @@ out:
aio_set_fd_handler(srco->aio_context, sockfd, false,
NULL, NULL, NULL, NULL);
+ srco->co = NULL;
srco->ret = ret;
srco->finished = true;
if (srco->bs) {
@@ -866,7 +867,7 @@ static void coroutine_fn aio_read_response(void *opaque)
* We've finished all requests which belong to the AIOCB, so
* we can switch back to sd_co_readv/writev now.
*/
- qemu_coroutine_enter(acb->coroutine);
+ aio_co_wake(acb->coroutine);
}
return;
@@ -883,14 +884,14 @@ static void co_read_response(void *opaque)
s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
}
- qemu_coroutine_enter(s->co_recv);
+ aio_co_wake(s->co_recv);
}
static void co_write_request(void *opaque)
{
BDRVSheepdogState *s = opaque;
- qemu_coroutine_enter(s->co_send);
+ aio_co_wake(s->co_send);
}
/*
diff --git a/block/ssh.c b/block/ssh.c
index e0edf20..835932e 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -889,10 +889,14 @@ static void restart_coroutine(void *opaque)
DPRINTF("co=%p", co);
- qemu_coroutine_enter(co);
+ aio_co_wake(co);
}
-static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
+/* A non-blocking call returned EAGAIN, so yield, ensuring the
+ * handlers are set up so that we'll be rescheduled when there is an
+ * interesting event on the socket.
+ */
+static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
{
int r;
IOHandler *rd_handler = NULL, *wr_handler = NULL;
@@ -912,25 +916,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, rd_handler, wr_handler, NULL, co);
-}
-
-static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
- BlockDriverState *bs)
-{
- DPRINTF("s->sock=%d", s->sock);
- aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
- false, NULL, NULL, NULL, NULL);
-}
-
-/* A non-blocking call returned EAGAIN, so yield, ensuring the
- * handlers are set up so that we'll be rescheduled when there is an
- * interesting event on the socket.
- */
-static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
-{
- set_fd_handler(s, bs);
qemu_coroutine_yield();
- clear_fd_handler(s, bs);
+ DPRINTF("s->sock=%d - back", s->sock);
+ aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false,
+ NULL, NULL, NULL, NULL);
}
/* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 17b2efb..b73e7a8 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -326,7 +326,7 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk,
if (must_wait || blkp->pending_reqs[is_write]) {
blkp->pending_reqs[is_write]++;
qemu_mutex_unlock(&tg->lock);
- qemu_co_queue_wait(&blkp->throttled_reqs[is_write]);
+ qemu_co_queue_wait(&blkp->throttled_reqs[is_write], NULL);
qemu_mutex_lock(&tg->lock);
blkp->pending_reqs[is_write]--;
}
@@ -416,7 +416,9 @@ static void timer_cb(BlockBackend *blk, bool is_write)
qemu_mutex_unlock(&tg->lock);
/* Run the request that was waiting for this timer */
+ aio_context_acquire(blk_get_aio_context(blk));
empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]);
+ aio_context_release(blk_get_aio_context(blk));
/* If the request queue was empty then we have to take care of
* scheduling the next one */
diff --git a/block/win32-aio.c b/block/win32-aio.c
index 8cdf73b..3be8f45 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -41,7 +41,7 @@ struct QEMUWin32AIOState {
HANDLE hIOCP;
EventNotifier e;
int count;
- bool is_aio_context_attached;
+ AioContext *aio_ctx;
};
typedef struct QEMUWin32AIOCB {
@@ -87,7 +87,6 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
qemu_vfree(waiocb->buf);
}
-
waiocb->common.cb(waiocb->common.opaque, ret);
qemu_aio_unref(waiocb);
}
@@ -176,13 +175,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
AioContext *old_context)
{
aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL);
- aio->is_aio_context_attached = false;
+ aio->aio_ctx = NULL;
}
void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
AioContext *new_context)
{
- aio->is_aio_context_attached = true;
+ aio->aio_ctx = new_context;
aio_set_event_notifier(new_context, &aio->e, false,
win32_aio_completion_cb, NULL);
}
@@ -212,7 +211,7 @@ out_free_state:
void win32_aio_cleanup(QEMUWin32AIOState *aio)
{
- assert(!aio->is_aio_context_attached);
+ assert(!aio->aio_ctx);
CloseHandle(aio->hIOCP);
event_notifier_cleanup(&aio->e);
g_free(aio);
diff --git a/dma-helpers.c b/dma-helpers.c
index 97157cc..2d7e02d 100644
--- a/dma-helpers.c
+++ b/dma-helpers.c
@@ -166,8 +166,10 @@ static void dma_blk_cb(void *opaque, int ret)
QEMU_ALIGN_DOWN(dbs->iov.size, dbs->align));
}
+ aio_context_acquire(dbs->ctx);
dbs->acb = dbs->io_func(dbs->offset, &dbs->iov,
dma_blk_cb, dbs, dbs->io_func_opaque);
+ aio_context_release(dbs->ctx);
assert(dbs->acb);
}
diff --git a/hw/9pfs/9p.c b/hw/9pfs/9p.c
index 99e9472..3af1c93 100644
--- a/hw/9pfs/9p.c
+++ b/hw/9pfs/9p.c
@@ -2374,7 +2374,7 @@ static void coroutine_fn v9fs_flush(void *opaque)
/*
* Wait for pdu to complete.
*/
- qemu_co_queue_wait(&cancel_pdu->complete);
+ qemu_co_queue_wait(&cancel_pdu->complete, NULL);
cancel_pdu->cancelled = 0;
pdu_free(cancel_pdu);
}
diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c
index baaa195..843bd2f 100644
--- a/hw/block/virtio-blk.c
+++ b/hw/block/virtio-blk.c
@@ -89,7 +89,9 @@ static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
static void virtio_blk_rw_complete(void *opaque, int ret)
{
VirtIOBlockReq *next = opaque;
+ VirtIOBlock *s = next->dev;
+ aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
while (next) {
VirtIOBlockReq *req = next;
next = req->mr_next;
@@ -122,21 +124,27 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
virtio_blk_free_request(req);
}
+ aio_context_release(blk_get_aio_context(s->conf.conf.blk));
}
static void virtio_blk_flush_complete(void *opaque, int ret)
{
VirtIOBlockReq *req = opaque;
+ VirtIOBlock *s = req->dev;
+ aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
if (ret) {
if (virtio_blk_handle_rw_error(req, -ret, 0)) {
- return;
+ goto out;
}
}
virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
virtio_blk_free_request(req);
+
+out:
+ aio_context_release(blk_get_aio_context(s->conf.conf.blk));
}
#ifdef __linux__
@@ -150,7 +158,8 @@ static void virtio_blk_ioctl_complete(void *opaque, int status)
{
VirtIOBlockIoctlReq *ioctl_req = opaque;
VirtIOBlockReq *req = ioctl_req->req;
- VirtIODevice *vdev = VIRTIO_DEVICE(req->dev);
+ VirtIOBlock *s = req->dev;
+ VirtIODevice *vdev = VIRTIO_DEVICE(s);
struct virtio_scsi_inhdr *scsi;
struct sg_io_hdr *hdr;
@@ -182,8 +191,10 @@ static void virtio_blk_ioctl_complete(void *opaque, int status)
virtio_stl_p(vdev, &scsi->data_len, hdr->dxfer_len);
out:
+ aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
virtio_blk_req_complete(req, status);
virtio_blk_free_request(req);
+ aio_context_release(blk_get_aio_context(s->conf.conf.blk));
g_free(ioctl_req);
}
@@ -587,6 +598,7 @@ bool virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
MultiReqBuffer mrb = {};
bool progress = false;
+ aio_context_acquire(blk_get_aio_context(s->blk));
blk_io_plug(s->blk);
do {
@@ -609,6 +621,7 @@ bool virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
}
blk_io_unplug(s->blk);
+ aio_context_release(blk_get_aio_context(s->blk));
return progress;
}
@@ -644,6 +657,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
s->rq = NULL;
+ aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
while (req) {
VirtIOBlockReq *next = req->next;
if (virtio_blk_handle_request(req, &mrb)) {
@@ -664,6 +678,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
if (mrb.num_reqs) {
virtio_blk_submit_multireq(s->blk, &mrb);
}
+ aio_context_release(blk_get_aio_context(s->conf.conf.blk));
}
static void virtio_blk_dma_restart_cb(void *opaque, int running,
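
virtio-blk shows the device-model side of the same locking convention: completion callbacks and virtqueue handlers may now run in an IOThread, so every early return in a completion path becomes a goto that still releases the BlockBackend's AioContext lock. A condensed sketch of that restructuring, with hypothetical MyReq, handle_error() and complete_request():

    static void my_flush_complete(void *opaque, int ret)
    {
        MyReq *req = opaque;
        AioContext *ctx = blk_get_aio_context(req->dev->blk);

        aio_context_acquire(ctx);
        if (ret && handle_error(req, -ret)) {
            goto out;           /* error path must still drop the lock */
        }
        complete_request(req);
    out:
        aio_context_release(ctx);
    }
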
diff --git a/hw/scsi/scsi-bus.c b/hw/scsi/scsi-bus.c
index 5940cb1..c9f0ac0 100644
--- a/hw/scsi/scsi-bus.c
+++ b/hw/scsi/scsi-bus.c
@@ -105,6 +105,7 @@ static void scsi_dma_restart_bh(void *opaque)
qemu_bh_delete(s->bh);
s->bh = NULL;
+ aio_context_acquire(blk_get_aio_context(s->conf.blk));
QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) {
scsi_req_ref(req);
if (req->retry) {
@@ -122,6 +123,7 @@ static void scsi_dma_restart_bh(void *opaque)
}
scsi_req_unref(req);
}
+ aio_context_release(blk_get_aio_context(s->conf.blk));
}
void scsi_req_retry(SCSIRequest *req)
diff --git a/hw/scsi/scsi-disk.c b/hw/scsi/scsi-disk.c
index cc06fe5..bbfb5dc 100644
--- a/hw/scsi/scsi-disk.c
+++ b/hw/scsi/scsi-disk.c
@@ -207,6 +207,7 @@ static void scsi_aio_complete(void *opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (scsi_disk_req_check_error(r, ret, true)) {
goto done;
}
@@ -215,6 +216,7 @@ static void scsi_aio_complete(void *opaque, int ret)
scsi_req_complete(&r->req, GOOD);
done:
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
scsi_req_unref(&r->req);
}
@@ -290,12 +292,14 @@ static void scsi_dma_complete(void *opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (ret < 0) {
block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
} else {
block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
}
scsi_dma_complete_noio(r, ret);
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_read_complete(void * opaque, int ret)
@@ -306,6 +310,7 @@ static void scsi_read_complete(void * opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (scsi_disk_req_check_error(r, ret, true)) {
goto done;
}
@@ -320,6 +325,7 @@ static void scsi_read_complete(void * opaque, int ret)
done:
scsi_req_unref(&r->req);
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
/* Actually issue a read to the block device. */
@@ -364,12 +370,14 @@ static void scsi_do_read_cb(void *opaque, int ret)
assert (r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (ret < 0) {
block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
} else {
block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
}
scsi_do_read(opaque, ret);
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
/* Read more data from scsi device into buffer. */
@@ -489,12 +497,14 @@ static void scsi_write_complete(void * opaque, int ret)
assert (r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (ret < 0) {
block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
} else {
block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
}
scsi_write_complete_noio(r, ret);
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_write_data(SCSIRequest *req)
@@ -1625,11 +1635,14 @@ static void scsi_unmap_complete(void *opaque, int ret)
{
UnmapCBData *data = opaque;
SCSIDiskReq *r = data->r;
+ SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
scsi_unmap_complete_noio(data, ret);
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_disk_emulate_unmap(SCSIDiskReq *r, uint8_t *inbuf)
@@ -1696,6 +1709,7 @@ static void scsi_write_same_complete(void *opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (scsi_disk_req_check_error(r, ret, true)) {
goto done;
}
@@ -1724,6 +1738,7 @@ done:
scsi_req_unref(&r->req);
qemu_vfree(data->iov.iov_base);
g_free(data);
+ aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_disk_emulate_write_same(SCSIDiskReq *r, uint8_t *inbuf)
diff --git a/hw/scsi/scsi-generic.c b/hw/scsi/scsi-generic.c
index 92f091a..2933119 100644
--- a/hw/scsi/scsi-generic.c
+++ b/hw/scsi/scsi-generic.c
@@ -143,10 +143,14 @@ done:
static void scsi_command_complete(void *opaque, int ret)
{
SCSIGenericReq *r = (SCSIGenericReq *)opaque;
+ SCSIDevice *s = r->req.dev;
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+
+ aio_context_acquire(blk_get_aio_context(s->conf.blk));
scsi_command_complete_noio(r, ret);
+ aio_context_release(blk_get_aio_context(s->conf.blk));
}
static int execute_command(BlockBackend *blk,
@@ -182,9 +186,11 @@ static void scsi_read_complete(void * opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->conf.blk));
+
if (ret || r->req.io_canceled) {
scsi_command_complete_noio(r, ret);
- return;
+ goto done;
}
len = r->io_header.dxfer_len - r->io_header.resid;
@@ -193,7 +199,7 @@ static void scsi_read_complete(void * opaque, int ret)
r->len = -1;
if (len == 0) {
scsi_command_complete_noio(r, 0);
- return;
+ goto done;
}
/* Snoop READ CAPACITY output to set the blocksize. */
@@ -237,6 +243,9 @@ static void scsi_read_complete(void * opaque, int ret)
}
scsi_req_data(&r->req, len);
scsi_req_unref(&r->req);
+
+done:
+ aio_context_release(blk_get_aio_context(s->conf.blk));
}
/* Read more data from scsi device into buffer. */
@@ -272,9 +281,11 @@ static void scsi_write_complete(void * opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
+ aio_context_acquire(blk_get_aio_context(s->conf.blk));
+
if (ret || r->req.io_canceled) {
scsi_command_complete_noio(r, ret);
- return;
+ goto done;
}
if (r->req.cmd.buf[0] == MODE_SELECT && r->req.cmd.buf[4] == 12 &&
@@ -284,6 +295,9 @@ static void scsi_write_complete(void * opaque, int ret)
}
scsi_command_complete_noio(r, ret);
+
+done:
+ aio_context_release(blk_get_aio_context(s->conf.blk));
}
/* Write data to a scsi device. Returns nonzero on failure.
diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c
index b01030b..9e6f0e8 100644
--- a/hw/scsi/virtio-scsi.c
+++ b/hw/scsi/virtio-scsi.c
@@ -441,10 +441,12 @@ bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
VirtIOSCSIReq *req;
bool progress = false;
+ virtio_scsi_acquire(s);
while ((req = virtio_scsi_pop_req(s, vq))) {
progress = true;
virtio_scsi_handle_ctrl_req(s, req);
}
+ virtio_scsi_release(s);
return progress;
}
@@ -602,6 +604,7 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
+ virtio_scsi_acquire(s);
do {
virtio_queue_set_notification(vq, 0);
@@ -629,6 +632,7 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
virtio_scsi_handle_cmd_req_submit(s, req);
}
+ virtio_scsi_release(s);
return progress;
}
@@ -760,10 +764,13 @@ out:
bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
{
+ virtio_scsi_acquire(s);
if (s->events_dropped) {
virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
+ virtio_scsi_release(s);
return true;
}
+ virtio_scsi_release(s);
return false;
}
diff --git a/include/block/aio.h b/include/block/aio.h
index 7df271d..677b6ff 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque);
typedef bool AioPollFn(void *opaque);
typedef void IOHandler(void *opaque);
+struct Coroutine;
struct ThreadPool;
struct LinuxAioState;
@@ -108,6 +109,9 @@ struct AioContext {
bool notified;
EventNotifier notifier;
+ QSLIST_HEAD(, Coroutine) scheduled_coroutines;
+ QEMUBH *co_schedule_bh;
+
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
@@ -306,12 +310,8 @@ bool aio_pending(AioContext *ctx);
/* Dispatch any pending callbacks from the GSource attached to the AioContext.
*
* This is used internally in the implementation of the GSource.
- *
- * @dispatch_fds: true to process fds, false to skip them
- * (can be used as an optimization by callers that know there
- * are no fds ready)
*/
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds);
+void aio_dispatch(AioContext *ctx);
/* Progress in completing AIO work to occur. This can issue new pending
* aio as a result of executing I/O completion or bh callbacks.
@@ -483,6 +483,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
}
/**
+ * aio_co_schedule:
+ * @ctx: the aio context
+ * @co: the coroutine
+ *
+ * Start a coroutine on a remote AioContext.
+ *
+ * The coroutine must not be entered by anyone else while aio_co_schedule()
+ * is active. In addition the coroutine must have yielded unless ctx
+ * is the context in which the coroutine is running (i.e. the value of
+ * qemu_get_current_aio_context() from the coroutine itself).
+ */
+void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
+
+/**
+ * aio_co_wake:
+ * @co: the coroutine
+ *
+ * Restart a coroutine on the AioContext where it was running last, thus
+ * preventing coroutines from jumping from one context to another when they
+ * go to sleep.
+ *
+ * aio_co_wake may be executed either in coroutine or non-coroutine
+ * context. The coroutine must not be entered by anyone else while
+ * aio_co_wake() is active.
+ */
+void aio_co_wake(struct Coroutine *co);
+
+/**
* Return the AioContext whose event loop runs in the current thread.
*
* If called from an IOThread this will be the IOThread's AioContext. If
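
Taken together, the two new primitives cover the common wake-up cases: aio_co_schedule() when the caller knows which AioContext should run the coroutine, aio_co_wake() when the coroutine should simply resume wherever it last yielded. A usage sketch, not from the patch, in which MyCompletion and start_async_operation() are hypothetical:

    typedef struct MyCompletion {
        Coroutine *co;
        int ret;
    } MyCompletion;

    /* Completion callback, possibly invoked from another AioContext. */
    static void my_completion_cb(void *opaque, int ret)
    {
        MyCompletion *c = opaque;

        c->ret = ret;
        /* Resume the coroutine in the context where it yielded; if that is
         * not the current context, this goes through a bottom half. */
        aio_co_wake(c->co);
    }

    /* Caller side, running in coroutine context. */
    static coroutine_fn int my_wait_for_completion(MyCompletion *c)
    {
        c->co = qemu_coroutine_self();
        start_async_operation(c, my_completion_cb);
        qemu_coroutine_yield();     /* woken by my_completion_cb() */
        return c->ret;
    }
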
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 2d92d7e..1670941 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -430,8 +430,9 @@ struct BdrvChild {
* copied as well.
*/
struct BlockDriverState {
- int64_t total_sectors; /* if we are reading a disk image, give its
- size in sectors */
+ /* Protected by big QEMU lock or read-only after opening. No special
+ * locking needed during I/O...
+ */
int open_flags; /* flags used to open the file, re-used for re-open */
bool read_only; /* if true, the media is read only */
bool encrypted; /* if true, the media is encrypted */
@@ -439,14 +440,6 @@ struct BlockDriverState {
bool sg; /* if true, the device is a /dev/sg* */
bool probed; /* if true, format was probed rather than specified */
- int copy_on_read; /* if nonzero, copy read backing sectors into image.
- note this is a reference count */
-
- CoQueue flush_queue; /* Serializing flush queue */
- bool active_flush_req; /* Flush request in flight? */
- unsigned int write_gen; /* Current data generation */
- unsigned int flushed_gen; /* Flushed write generation */
-
BlockDriver *drv; /* NULL means no media */
void *opaque;
@@ -468,18 +461,6 @@ struct BlockDriverState {
BdrvChild *backing;
BdrvChild *file;
- /* Callback before write request is processed */
- NotifierWithReturnList before_write_notifiers;
-
- /* number of in-flight requests; overall and serialising */
- unsigned int in_flight;
- unsigned int serialising_in_flight;
-
- bool wakeup;
-
- /* Offset after the highest byte written to */
- uint64_t wr_highest_offset;
-
/* I/O Limits */
BlockLimits bl;
@@ -497,11 +478,8 @@ struct BlockDriverState {
QTAILQ_ENTRY(BlockDriverState) bs_list;
/* element of the list of monitor-owned BDS */
QTAILQ_ENTRY(BlockDriverState) monitor_list;
- QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
int refcnt;
- QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
-
/* operation blockers */
QLIST_HEAD(, BdrvOpBlocker) op_blockers[BLOCK_OP_TYPE_MAX];
@@ -522,6 +500,31 @@ struct BlockDriverState {
/* The error object in use for blocking operations on backing_hd */
Error *backing_blocker;
+ /* Protected by AioContext lock */
+
+ /* If true, copy read backing sectors into image. Can be >1 if more
+ * than one client has requested copy-on-read.
+ */
+ int copy_on_read;
+
+ /* If we are reading a disk image, give its size in sectors.
+ * Generally read-only; it is written to by load_vmstate and save_vmstate,
+ * but the block layer is quiescent during those.
+ */
+ int64_t total_sectors;
+
+ /* Callback before write request is processed */
+ NotifierWithReturnList before_write_notifiers;
+
+ /* number of in-flight requests; overall and serialising */
+ unsigned int in_flight;
+ unsigned int serialising_in_flight;
+
+ bool wakeup;
+
+ /* Offset after the highest byte written to */
+ uint64_t wr_highest_offset;
+
/* threshold limit for writes, in bytes. "High water mark". */
uint64_t write_threshold_offset;
NotifierWithReturn write_threshold_notifier;
@@ -529,6 +532,17 @@ struct BlockDriverState {
/* counter for nested bdrv_io_plug */
unsigned io_plugged;
+ QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
+ CoQueue flush_queue; /* Serializing flush queue */
+ bool active_flush_req; /* Flush request in flight? */
+ unsigned int write_gen; /* Current data generation */
+ unsigned int flushed_gen; /* Flushed write generation */
+
+ QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
+
+ /* do we need to tell the guest if we have a volatile write cache? */
+ int enable_write_cache;
+
int quiesce_counter;
};
diff --git a/include/io/channel.h b/include/io/channel.h
index 32a9470..5d48906 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -23,6 +23,8 @@
#include "qemu-common.h"
#include "qom/object.h"
+#include "qemu/coroutine.h"
+#include "block/aio.h"
#define TYPE_QIO_CHANNEL "qio-channel"
#define QIO_CHANNEL(obj) \
@@ -80,6 +82,9 @@ struct QIOChannel {
Object parent;
unsigned int features; /* bitmask of QIOChannelFeatures */
char *name;
+ AioContext *ctx;
+ Coroutine *read_coroutine;
+ Coroutine *write_coroutine;
#ifdef _WIN32
HANDLE event; /* For use with GSource on Win32 */
#endif
@@ -132,6 +137,11 @@ struct QIOChannelClass {
off_t offset,
int whence,
Error **errp);
+ void (*io_set_aio_fd_handler)(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque);
};
/* General I/O handling functions */
@@ -497,13 +507,50 @@ guint qio_channel_add_watch(QIOChannel *ioc,
/**
+ * qio_channel_attach_aio_context:
+ * @ioc: the channel object
+ * @ctx: the #AioContext to set the handlers on
+ *
+ * Request that qio_channel_yield() sets I/O handlers on
+ * the given #AioContext. If @ctx is %NULL, qio_channel_yield()
+ * uses QEMU's main thread event loop.
+ *
+ * You can move a #QIOChannel from one #AioContext to another even if
+ * I/O handlers are set for a coroutine. However, #QIOChannel provides
+ * no synchronization between the calls to qio_channel_yield() and
+ * qio_channel_attach_aio_context().
+ *
+ * Therefore you should first call qio_channel_detach_aio_context()
+ * to ensure that the coroutine is not entered concurrently. Then,
+ * while the coroutine has yielded, call qio_channel_attach_aio_context(),
+ * and then aio_co_schedule() to place the coroutine on the new
+ * #AioContext. The calls to qio_channel_detach_aio_context()
+ * and qio_channel_attach_aio_context() should be protected with
+ * aio_context_acquire() and aio_context_release().
+ */
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+ AioContext *ctx);
+
+/**
+ * qio_channel_detach_aio_context:
+ * @ioc: the channel object
+ *
+ * Disable any I/O handlers set by qio_channel_yield(). With the
+ * help of aio_co_schedule(), this allows moving a coroutine that was
+ * paused by qio_channel_yield() to another context.
+ */
+void qio_channel_detach_aio_context(QIOChannel *ioc);
+
+/**
* qio_channel_yield:
* @ioc: the channel object
* @condition: the I/O condition to wait for
*
- * Yields execution from the current coroutine until
- * the condition indicated by @condition becomes
- * available.
+ * Yields execution from the current coroutine until the condition
+ * indicated by @condition becomes available. @condition must
+ * be either %G_IO_IN or %G_IO_OUT; it cannot contain both. In
+ * addition, no two coroutines can be waiting on the same condition
+ * and channel at the same time.
*
* This must only be called from coroutine context
*/
@@ -525,4 +572,23 @@ void qio_channel_yield(QIOChannel *ioc,
void qio_channel_wait(QIOChannel *ioc,
GIOCondition condition);
+/**
+ * qio_channel_set_aio_fd_handler:
+ * @ioc: the channel object
+ * @ctx: the AioContext to set the handlers on
+ * @io_read: the read handler
+ * @io_write: the write handler
+ * @opaque: the opaque value passed to the handler
+ *
+ * This is used internally by qio_channel_yield(). It can
+ * be used by channel implementations to forward the handlers
+ * to another channel (e.g. from #QIOChannelTLS to the
+ * underlying socket).
+ */
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque);
+
#endif /* QIO_CHANNEL_H */
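
The attach/detach documentation above prescribes a specific hand-over sequence when a channel, and the coroutine parked in qio_channel_yield(), migrates between event loops. A hedged sketch of that sequence; only the qio_channel_*() and aio_*() calls come from this header, the surrounding function is hypothetical:

static void example_move_channel(QIOChannel *ioc, Coroutine *co,
                                 AioContext *old_ctx, AioContext *new_ctx)
{
    /* Stop the old context from entering the coroutine... */
    aio_context_acquire(old_ctx);
    qio_channel_detach_aio_context(ioc);
    aio_context_release(old_ctx);

    /* ...re-arm the handlers on the new context... */
    aio_context_acquire(new_ctx);
    qio_channel_attach_aio_context(ioc, new_ctx);
    aio_context_release(new_ctx);

    /* ...and let the new context resume the yielded coroutine. */
    aio_co_schedule(new_ctx, co);
}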
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 12584ed..e60beaf 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -112,11 +112,56 @@ bool qemu_in_coroutine(void);
*/
bool qemu_coroutine_entered(Coroutine *co);
+/**
+ * Provides a mutex that can be used to synchronise coroutines
+ */
+struct CoWaitRecord;
+typedef struct CoMutex {
+ /* Count of pending lockers; 0 for a free mutex, 1 for an
+ * uncontended mutex.
+ */
+ unsigned locked;
+
+ /* Context that is holding the lock. Useful to avoid spinning
+ * when two coroutines on the same AioContext try to get the lock.
+ */
+ AioContext *ctx;
+
+ /* A queue of waiters. Elements are added atomically in front of
+ * from_push. to_pop is only populated, and popped from, by whoever
+ * is in charge of the next wakeup. This can be an unlocker or,
+ * through the handoff protocol, a locker that is about to go to sleep.
+ */
+ QSLIST_HEAD(, CoWaitRecord) from_push, to_pop;
+
+ unsigned handoff, sequence;
+
+ Coroutine *holder;
+} CoMutex;
+
+/**
+ * Initialises a CoMutex. This must be called before any other operation is used
+ * on the CoMutex.
+ */
+void qemu_co_mutex_init(CoMutex *mutex);
+
+/**
+ * Locks the mutex. If the lock cannot be taken immediately, control is
+ * transferred to the caller of the current coroutine.
+ */
+void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
+
+/**
+ * Unlocks the mutex and schedules the next coroutine that was waiting for this
+ * lock to be run.
+ */
+void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
+
/**
* CoQueues are a mechanism to queue coroutines in order to continue executing
- * them later. They provide the fundamental primitives on which coroutine locks
- * are built.
+ * them later. They are similar to condition variables, but they need help
+ * from an external mutex in order to maintain thread-safety.
*/
typedef struct CoQueue {
QSIMPLEQ_HEAD(, Coroutine) entries;
@@ -130,9 +175,10 @@ void qemu_co_queue_init(CoQueue *queue);
/**
* Adds the current coroutine to the CoQueue and transfers control to the
- * caller of the coroutine.
+ * caller of the coroutine. The mutex is unlocked during the wait and
+ * locked again afterwards.
*/
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
+void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex);
/**
* Restarts the next coroutine in the CoQueue and removes it from the queue.
@@ -157,36 +203,10 @@ bool qemu_co_enter_next(CoQueue *queue);
bool qemu_co_queue_empty(CoQueue *queue);
-/**
- * Provides a mutex that can be used to synchronise coroutines
- */
-typedef struct CoMutex {
- bool locked;
- Coroutine *holder;
- CoQueue queue;
-} CoMutex;
-
-/**
- * Initialises a CoMutex. This must be called before any other operation is used
- * on the CoMutex.
- */
-void qemu_co_mutex_init(CoMutex *mutex);
-
-/**
- * Locks the mutex. If the lock cannot be taken immediately, control is
- * transferred to the caller of the current coroutine.
- */
-void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
-
-/**
- * Unlocks the mutex and schedules the next coroutine that was waiting for this
- * lock to be run.
- */
-void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
-
typedef struct CoRwlock {
- bool writer;
+ int pending_writer;
int reader;
+ CoMutex mutex;
CoQueue queue;
} CoRwlock;
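
With the new qemu_co_queue_wait(queue, mutex) signature, a CoQueue behaves like a condition variable paired with a CoMutex. A minimal sketch of the intended producer/consumer pattern; the names are illustrative, and qemu_co_mutex_init()/qemu_co_queue_init() are assumed to have been called:

static CoMutex example_lock;
static CoQueue example_queue;
static int example_items;

static void coroutine_fn example_consume(void)
{
    qemu_co_mutex_lock(&example_lock);
    while (example_items == 0) {
        /* Drops the mutex while sleeping, re-takes it on wakeup. */
        qemu_co_queue_wait(&example_queue, &example_lock);
    }
    example_items--;
    qemu_co_mutex_unlock(&example_lock);
}

static void coroutine_fn example_produce(void)
{
    qemu_co_mutex_lock(&example_lock);
    example_items++;
    qemu_co_queue_next(&example_queue);
    qemu_co_mutex_unlock(&example_lock);
}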
diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
index 14d4f1d..cb98892 100644
--- a/include/qemu/coroutine_int.h
+++ b/include/qemu/coroutine_int.h
@@ -40,12 +40,21 @@ struct Coroutine {
CoroutineEntry *entry;
void *entry_arg;
Coroutine *caller;
+
+ /* Only used when the coroutine has terminated. */
QSLIST_ENTRY(Coroutine) pool_next;
+
size_t locks_held;
- /* Coroutines that should be woken up when we yield or terminate */
+ /* Coroutines that should be woken up when we yield or terminate.
+ * Only used when the coroutine is running.
+ */
QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
+
+ /* Only used when the coroutine has yielded. */
+ AioContext *ctx;
QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
+ QSLIST_ENTRY(Coroutine) co_scheduled_next;
};
Coroutine *qemu_coroutine_new(void);
diff --git a/include/sysemu/block-backend.h b/include/sysemu/block-backend.h
index 6444e41..f365a51 100644
--- a/include/sysemu/block-backend.h
+++ b/include/sysemu/block-backend.h
@@ -64,14 +64,20 @@ typedef struct BlockDevOps {
* fields that must be public. This is in particular for QLIST_ENTRY() and
* friends so that BlockBackends can be kept in lists outside block-backend.c */
typedef struct BlockBackendPublic {
- /* I/O throttling.
- * throttle_state tells us if this BlockBackend has I/O limits configured.
- * io_limits_disabled tells us if they are currently being enforced */
+ /* I/O throttling has its own locking, but some fields are also
+ * protected by the AioContext lock.
+ */
+
+ /* Protected by AioContext lock. */
CoQueue throttled_reqs[2];
+
+ /* Nonzero if the I/O limits are currently being ignored; generally
+ * it is zero. */
unsigned int io_limits_disabled;
/* The following fields are protected by the ThrottleGroup lock.
- * See the ThrottleGroup documentation for details. */
+ * See the ThrottleGroup documentation for details.
+ * throttle_state tells us if I/O limits are configured. */
ThrottleState *throttle_state;
ThrottleTimers throttle_timers;
unsigned pending_reqs[2];
diff --git a/io/channel-command.c b/io/channel-command.c
index ad25313..319c5ed 100644
--- a/io/channel-command.c
+++ b/io/channel-command.c
@@ -328,6 +328,18 @@ static int qio_channel_command_close(QIOChannel *ioc,
}
+static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
+ aio_set_fd_handler(ctx, cioc->readfd, false, io_read, NULL, NULL, opaque);
+ aio_set_fd_handler(ctx, cioc->writefd, false, NULL, io_write, NULL, opaque);
+}
+
+
static GSource *qio_channel_command_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
@@ -349,6 +361,7 @@ static void qio_channel_command_class_init(ObjectClass *klass,
ioc_klass->io_set_blocking = qio_channel_command_set_blocking;
ioc_klass->io_close = qio_channel_command_close;
ioc_klass->io_create_watch = qio_channel_command_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_command_set_aio_fd_handler;
}
static const TypeInfo qio_channel_command_info = {
diff --git a/io/channel-file.c b/io/channel-file.c
index e1da243..b383273 100644
--- a/io/channel-file.c
+++ b/io/channel-file.c
@@ -186,6 +186,16 @@ static int qio_channel_file_close(QIOChannel *ioc,
}
+static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
+ aio_set_fd_handler(ctx, fioc->fd, false, io_read, io_write, NULL, opaque);
+}
+
static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
@@ -206,6 +216,7 @@ static void qio_channel_file_class_init(ObjectClass *klass,
ioc_klass->io_seek = qio_channel_file_seek;
ioc_klass->io_close = qio_channel_file_close;
ioc_klass->io_create_watch = qio_channel_file_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_file_set_aio_fd_handler;
}
static const TypeInfo qio_channel_file_info = {
diff --git a/io/channel-socket.c b/io/channel-socket.c
index f385233..f546c68 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -649,11 +649,6 @@ qio_channel_socket_set_blocking(QIOChannel *ioc,
qemu_set_block(sioc->fd);
} else {
qemu_set_nonblock(sioc->fd);
-#ifdef WIN32
- WSAEventSelect(sioc->fd, ioc->event,
- FD_READ | FD_ACCEPT | FD_CLOSE |
- FD_CONNECT | FD_WRITE | FD_OOB);
-#endif
}
return 0;
}
@@ -733,6 +728,16 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
return 0;
}
+static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ aio_set_fd_handler(ctx, sioc->fd, false, io_read, io_write, NULL, opaque);
+}
+
static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
@@ -755,6 +760,7 @@ static void qio_channel_socket_class_init(ObjectClass *klass,
ioc_klass->io_set_cork = qio_channel_socket_set_cork;
ioc_klass->io_set_delay = qio_channel_socket_set_delay;
ioc_klass->io_create_watch = qio_channel_socket_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
}
static const TypeInfo qio_channel_socket_info = {
diff --git a/io/channel-tls.c b/io/channel-tls.c
index f25ab0a..6182702 100644
--- a/io/channel-tls.c
+++ b/io/channel-tls.c
@@ -345,6 +345,17 @@ static int qio_channel_tls_close(QIOChannel *ioc,
return qio_channel_close(tioc->master, errp);
}
+static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
+
+ qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
+}
+
static GSource *qio_channel_tls_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
@@ -372,6 +383,7 @@ static void qio_channel_tls_class_init(ObjectClass *klass,
ioc_klass->io_close = qio_channel_tls_close;
ioc_klass->io_shutdown = qio_channel_tls_shutdown;
ioc_klass->io_create_watch = qio_channel_tls_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_tls_set_aio_fd_handler;
}
static const TypeInfo qio_channel_tls_info = {
diff --git a/io/channel-watch.c b/io/channel-watch.c
index cf1cdff..8640d1c 100644
--- a/io/channel-watch.c
+++ b/io/channel-watch.c
@@ -285,6 +285,12 @@ GSource *qio_channel_create_socket_watch(QIOChannel *ioc,
GSource *source;
QIOChannelSocketSource *ssource;
+#ifdef WIN32
+ WSAEventSelect(socket, ioc->event,
+ FD_READ | FD_ACCEPT | FD_CLOSE |
+ FD_CONNECT | FD_WRITE | FD_OOB);
+#endif
+
source = g_source_new(&qio_channel_socket_source_funcs,
sizeof(QIOChannelSocketSource));
ssource = (QIOChannelSocketSource *)source;
diff --git a/io/channel.c b/io/channel.c
index 80924c1..cdf7454 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -21,7 +21,7 @@
#include "qemu/osdep.h"
#include "io/channel.h"
#include "qapi/error.h"
-#include "qemu/coroutine.h"
+#include "qemu/main-loop.h"
bool qio_channel_has_feature(QIOChannel *ioc,
QIOChannelFeature feature)
@@ -154,6 +154,17 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
}
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+ klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+}
+
guint qio_channel_add_watch(QIOChannel *ioc,
GIOCondition condition,
QIOChannelFunc func,
@@ -227,36 +238,80 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
}
-typedef struct QIOChannelYieldData QIOChannelYieldData;
-struct QIOChannelYieldData {
- QIOChannel *ioc;
- Coroutine *co;
-};
+static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc);
+
+static void qio_channel_restart_read(void *opaque)
+{
+ QIOChannel *ioc = opaque;
+ Coroutine *co = ioc->read_coroutine;
+ ioc->read_coroutine = NULL;
+ qio_channel_set_aio_fd_handlers(ioc);
+ aio_co_wake(co);
+}
-static gboolean qio_channel_yield_enter(QIOChannel *ioc,
- GIOCondition condition,
- gpointer opaque)
+static void qio_channel_restart_write(void *opaque)
{
- QIOChannelYieldData *data = opaque;
- qemu_coroutine_enter(data->co);
- return FALSE;
+ QIOChannel *ioc = opaque;
+ Coroutine *co = ioc->write_coroutine;
+
+ ioc->write_coroutine = NULL;
+ qio_channel_set_aio_fd_handlers(ioc);
+ aio_co_wake(co);
+}
+
+static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+{
+ IOHandler *rd_handler = NULL, *wr_handler = NULL;
+ AioContext *ctx;
+
+ if (ioc->read_coroutine) {
+ rd_handler = qio_channel_restart_read;
+ }
+ if (ioc->write_coroutine) {
+ wr_handler = qio_channel_restart_write;
+ }
+
+ ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
+ qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
+}
+
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+ AioContext *ctx)
+{
+ AioContext *old_ctx;
+ if (ioc->ctx == ctx) {
+ return;
+ }
+
+ old_ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
+ qio_channel_set_aio_fd_handler(ioc, old_ctx, NULL, NULL, NULL);
+ ioc->ctx = ctx;
+ qio_channel_set_aio_fd_handlers(ioc);
}
+void qio_channel_detach_aio_context(QIOChannel *ioc)
+{
+ ioc->read_coroutine = NULL;
+ ioc->write_coroutine = NULL;
+ qio_channel_set_aio_fd_handlers(ioc);
+ ioc->ctx = NULL;
+}
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition)
{
- QIOChannelYieldData data;
-
assert(qemu_in_coroutine());
- data.ioc = ioc;
- data.co = qemu_coroutine_self();
- qio_channel_add_watch(ioc,
- condition,
- qio_channel_yield_enter,
- &data,
- NULL);
+ if (condition == G_IO_IN) {
+ assert(!ioc->read_coroutine);
+ ioc->read_coroutine = qemu_coroutine_self();
+ } else if (condition == G_IO_OUT) {
+ assert(!ioc->write_coroutine);
+ ioc->write_coroutine = qemu_coroutine_self();
+ } else {
+ abort();
+ }
+ qio_channel_set_aio_fd_handlers(ioc);
qemu_coroutine_yield();
}
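
The reworked qio_channel_yield() registers a per-direction AioContext fd handler and parks the current coroutine until it fires. A sketch of how a caller is expected to drive a non-blocking channel from coroutine context, mirroring the nbd_wr_syncv() change below (the helper itself is hypothetical):

static ssize_t coroutine_fn example_co_read(QIOChannel *ioc,
                                            char *buf, size_t len)
{
    ssize_t ret;

    for (;;) {
        ret = qio_channel_read(ioc, buf, len, NULL);
        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            /* Arms the read handler on ioc->ctx (or the main loop's
             * iohandler context) and yields until it fires. */
            qio_channel_yield(ioc, G_IO_IN);
            continue;
        }
        return ret;
    }
}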
diff --git a/nbd/client.c b/nbd/client.c
index ffb0743..5c9dee3 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply)
ssize_t ret;
ret = read_sync(ioc, buf, sizeof(buf));
- if (ret < 0) {
+ if (ret <= 0) {
return ret;
}
diff --git a/nbd/common.c b/nbd/common.c
index a5f39ea..dccbb8e 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
}
if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
- /* XXX figure out if we can create a variant on
- * qio_channel_yield() that works with AIO contexts
- * and consider using that in this branch */
- qemu_coroutine_yield();
- } else if (done) {
- /* XXX this is needed by nbd_reply_ready. */
- qio_channel_wait(ioc,
- do_read ? G_IO_IN : G_IO_OUT);
+ qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT);
} else {
return -EAGAIN;
}
diff --git a/nbd/server.c b/nbd/server.c
index efe5cb8..ac92fa0 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -95,8 +95,6 @@ struct NBDClient {
CoMutex send_lock;
Coroutine *send_coroutine;
- bool can_read;
-
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
@@ -104,9 +102,7 @@ struct NBDClient {
/* That's all folks */
-static void nbd_set_handlers(NBDClient *client);
-static void nbd_unset_handlers(NBDClient *client);
-static void nbd_update_can_read(NBDClient *client);
+static void nbd_client_receive_next_request(NBDClient *client);
static gboolean nbd_negotiate_continue(QIOChannel *ioc,
GIOCondition condition,
@@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client)
*/
assert(client->closing);
- nbd_unset_handlers(client);
+ qio_channel_detach_aio_context(client->ioc);
object_unref(OBJECT(client->sioc));
object_unref(OBJECT(client->ioc));
if (client->tlscreds) {
@@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
client->nb_requests++;
- nbd_update_can_read(client);
req = g_new0(NBDRequestData, 1);
nbd_client_get(client);
@@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req)
g_free(req);
client->nb_requests--;
- nbd_update_can_read(client);
+ nbd_client_receive_next_request(client);
+
nbd_client_put(client);
}
@@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
exp->ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) {
- nbd_set_handlers(client);
+ qio_channel_attach_aio_context(client->ioc, ctx);
+ if (client->recv_coroutine) {
+ aio_co_schedule(ctx, client->recv_coroutine);
+ }
+ if (client->send_coroutine) {
+ aio_co_schedule(ctx, client->send_coroutine);
+ }
}
}
@@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque)
TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
QTAILQ_FOREACH(client, &exp->clients, next) {
- nbd_unset_handlers(client);
+ qio_channel_detach_aio_context(client->ioc);
}
exp->ctx = NULL;
@@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
g_assert(qemu_in_coroutine());
qemu_co_mutex_lock(&client->send_lock);
client->send_coroutine = qemu_coroutine_self();
- nbd_set_handlers(client);
if (!len) {
rc = nbd_send_reply(client->ioc, reply);
@@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
}
client->send_coroutine = NULL;
- nbd_set_handlers(client);
qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
@@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
ssize_t rc;
g_assert(qemu_in_coroutine());
- client->recv_coroutine = qemu_coroutine_self();
- nbd_update_can_read(client);
-
+ assert(client->recv_coroutine == qemu_coroutine_self());
rc = nbd_receive_request(client->ioc, request);
if (rc < 0) {
if (rc != -EAGAIN) {
@@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
out:
client->recv_coroutine = NULL;
- nbd_update_can_read(client);
+ nbd_client_receive_next_request(client);
return rc;
}
-static void nbd_trip(void *opaque)
+/* Owns a reference to the NBDClient passed as opaque. */
+static coroutine_fn void nbd_trip(void *opaque)
{
NBDClient *client = opaque;
NBDExport *exp = client->exp;
NBDRequestData *req;
- NBDRequest request;
+ NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */
NBDReply reply;
ssize_t ret;
int flags;
TRACE("Reading request.");
if (client->closing) {
+ nbd_client_put(client);
return;
}
@@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque)
done:
nbd_request_put(req);
+ nbd_client_put(client);
return;
out:
nbd_request_put(req);
client_close(client);
+ nbd_client_put(client);
}
-static void nbd_read(void *opaque)
-{
- NBDClient *client = opaque;
-
- if (client->recv_coroutine) {
- qemu_coroutine_enter(client->recv_coroutine);
- } else {
- qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
- }
-}
-
-static void nbd_restart_write(void *opaque)
-{
- NBDClient *client = opaque;
-
- qemu_coroutine_enter(client->send_coroutine);
-}
-
-static void nbd_set_handlers(NBDClient *client)
-{
- if (client->exp && client->exp->ctx) {
- aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true,
- client->can_read ? nbd_read : NULL,
- client->send_coroutine ? nbd_restart_write : NULL,
- NULL, client);
- }
-}
-
-static void nbd_unset_handlers(NBDClient *client)
-{
- if (client->exp && client->exp->ctx) {
- aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL,
- NULL, NULL, NULL);
- }
-}
-
-static void nbd_update_can_read(NBDClient *client)
+static void nbd_client_receive_next_request(NBDClient *client)
{
- bool can_read = client->recv_coroutine ||
- client->nb_requests < MAX_NBD_REQUESTS;
-
- if (can_read != client->can_read) {
- client->can_read = can_read;
- nbd_set_handlers(client);
-
- /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
- * in nbd_set_handlers() will have taken care of that */
+ if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+ nbd_client_get(client);
+ client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
+ aio_co_schedule(client->exp->ctx, client->recv_coroutine);
}
}
@@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
goto out;
}
qemu_co_mutex_init(&client->send_lock);
- nbd_set_handlers(client);
if (exp) {
QTAILQ_INSERT_TAIL(&exp->clients, client, next);
}
+
+ nbd_client_receive_next_request(client);
+
out:
g_free(data);
}
@@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp,
object_ref(OBJECT(client->sioc));
client->ioc = QIO_CHANNEL(sioc);
object_ref(OBJECT(client->ioc));
- client->can_read = true;
client->close = close_fn;
data->client = client;
diff --git a/stubs/Makefile.objs b/stubs/Makefile.objs
index a187295..aa6050f 100644
--- a/stubs/Makefile.objs
+++ b/stubs/Makefile.objs
@@ -16,6 +16,7 @@ stub-obj-y += get-vm-name.o
stub-obj-y += iothread.o
stub-obj-y += iothread-lock.o
stub-obj-y += is-daemonized.o
+stub-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
stub-obj-y += machine-init-done.o
stub-obj-y += migr-blocker.o
stub-obj-y += monitor.o
diff --git a/stubs/linux-aio.c b/stubs/linux-aio.c
new file mode 100644
index 0000000..ed47bd4
--- /dev/null
+++ b/stubs/linux-aio.c
@@ -0,0 +1,32 @@
+/*
+ * Linux native AIO support.
+ *
+ * Copyright (C) 2009 IBM, Corp.
+ * Copyright (C) 2009 Red Hat, Inc.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include "block/aio.h"
+#include "block/raw-aio.h"
+
+void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
+{
+ abort();
+}
+
+void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
+{
+ abort();
+}
+
+LinuxAioState *laio_init(void)
+{
+ abort();
+}
+
+void laio_cleanup(LinuxAioState *s)
+{
+ abort();
+}
diff --git a/stubs/set-fd-handler.c b/stubs/set-fd-handler.c
index acbe65c..26965de 100644
--- a/stubs/set-fd-handler.c
+++ b/stubs/set-fd-handler.c
@@ -9,14 +9,3 @@ void qemu_set_fd_handler(int fd,
{
abort();
}
-
-void aio_set_fd_handler(AioContext *ctx,
- int fd,
- bool is_external,
- IOHandler *io_read,
- IOHandler *io_write,
- AioPollFn *io_poll,
- void *opaque)
-{
- abort();
-}
diff --git a/tests/Makefile.include b/tests/Makefile.include
index 634394a..e60bb6c 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -45,9 +45,13 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
check-unit-y += tests/test-iov$(EXESUF)
gcov-files-test-iov-y = util/iov.c
check-unit-y += tests/test-aio$(EXESUF)
gcov-files-test-aio-y = util/async.c util/qemu-timer.c
+gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c
+gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c
+check-unit-y += tests/test-aio-multithread$(EXESUF)
+gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
+gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
check-unit-y += tests/test-throttle$(EXESUF)
-gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
-gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
check-unit-y += tests/test-thread-pool$(EXESUF)
gcov-files-test-thread-pool-y = thread-pool.c
gcov-files-test-hbitmap-y = util/hbitmap.c
@@ -505,7 +509,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
$(test-qom-obj-y)
test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
-test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
+test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
@@ -517,10 +521,10 @@ tests/check-qjson$(EXESUF): tests/check-qjson.o $(test-util-obj-y)
tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y)
tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
-tests/test-char$(EXESUF): tests/test-char.o qemu-timer.o \
- $(test-util-obj-y) $(qtest-obj-y) $(test-block-obj-y) $(chardev-obj-y)
+tests/test-char$(EXESUF): tests/test-char.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y) $(chardev-obj-y)
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
+tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
@@ -551,8 +555,7 @@ tests/test-vmstate$(EXESUF): tests/test-vmstate.o \
migration/vmstate.o migration/qemu-file.o \
migration/qemu-file-channel.o migration/qjson.o \
$(test-io-obj-y)
-tests/test-timed-average$(EXESUF): tests/test-timed-average.o qemu-timer.o \
- $(test-util-obj-y)
+tests/test-timed-average$(EXESUF): tests/test-timed-average.o $(test-util-obj-y)
tests/test-base64$(EXESUF): tests/test-base64.o \
libqemuutil.a libqemustub.a
tests/ptimer-test$(EXESUF): tests/ptimer-test.o tests/ptimer-test-stubs.o hw/core/ptimer.o libqemustub.a
@@ -712,7 +715,7 @@ tests/usb-hcd-ehci-test$(EXESUF): tests/usb-hcd-ehci-test.o $(libqos-usb-obj-y)
tests/usb-hcd-xhci-test$(EXESUF): tests/usb-hcd-xhci-test.o $(libqos-usb-obj-y)
tests/pc-cpu-test$(EXESUF): tests/pc-cpu-test.o
tests/postcopy-test$(EXESUF): tests/postcopy-test.o
-tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o qemu-timer.o \
+tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o $(test-util-obj-y) \
$(qtest-obj-y) $(test-io-obj-y) $(libqos-virtio-obj-y) $(libqos-pc-obj-y) \
$(chardev-obj-y)
tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_helper.o
diff --git a/tests/iothread.c b/tests/iothread.c
new file mode 100644
index 0000000..777d9ee
--- /dev/null
+++ b/tests/iothread.c
@@ -0,0 +1,91 @@
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ * Stefan Hajnoczi <stefanha@redhat.com>
+ * Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "block/aio.h"
+#include "qemu/main-loop.h"
+#include "qemu/rcu.h"
+#include "iothread.h"
+
+struct IOThread {
+ AioContext *ctx;
+
+ QemuThread thread;
+ QemuMutex init_done_lock;
+ QemuCond init_done_cond; /* is thread initialization done? */
+ bool stopping;
+};
+
+static __thread IOThread *my_iothread;
+
+AioContext *qemu_get_current_aio_context(void)
+{
+ return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+}
+
+static void *iothread_run(void *opaque)
+{
+ IOThread *iothread = opaque;
+
+ rcu_register_thread();
+
+ my_iothread = iothread;
+ qemu_mutex_lock(&iothread->init_done_lock);
+ iothread->ctx = aio_context_new(&error_abort);
+ qemu_cond_signal(&iothread->init_done_cond);
+ qemu_mutex_unlock(&iothread->init_done_lock);
+
+ while (!atomic_read(&iothread->stopping)) {
+ aio_poll(iothread->ctx, true);
+ }
+
+ rcu_unregister_thread();
+ return NULL;
+}
+
+void iothread_join(IOThread *iothread)
+{
+ iothread->stopping = true;
+ aio_notify(iothread->ctx);
+ qemu_thread_join(&iothread->thread);
+ qemu_cond_destroy(&iothread->init_done_cond);
+ qemu_mutex_destroy(&iothread->init_done_lock);
+ aio_context_unref(iothread->ctx);
+ g_free(iothread);
+}
+
+IOThread *iothread_new(void)
+{
+ IOThread *iothread = g_new0(IOThread, 1);
+
+ qemu_mutex_init(&iothread->init_done_lock);
+ qemu_cond_init(&iothread->init_done_cond);
+ qemu_thread_create(&iothread->thread, NULL, iothread_run,
+ iothread, QEMU_THREAD_JOINABLE);
+
+ /* Wait for initialization to complete */
+ qemu_mutex_lock(&iothread->init_done_lock);
+ while (iothread->ctx == NULL) {
+ qemu_cond_wait(&iothread->init_done_cond,
+ &iothread->init_done_lock);
+ }
+ qemu_mutex_unlock(&iothread->init_done_lock);
+ return iothread;
+}
+
+AioContext *iothread_get_aio_context(IOThread *iothread)
+{
+ return iothread->ctx;
+}
diff --git a/tests/iothread.h b/tests/iothread.h
new file mode 100644
index 0000000..4877cea
--- /dev/null
+++ b/tests/iothread.h
@@ -0,0 +1,25 @@
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ * Stefan Hajnoczi <stefanha@redhat.com>
+ * Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef TEST_IOTHREAD_H
+#define TEST_IOTHREAD_H
+
+#include "block/aio.h"
+#include "qemu/thread.h"
+
+typedef struct IOThread IOThread;
+
+IOThread *iothread_new(void);
+void iothread_join(IOThread *iothread);
+AioContext *iothread_get_aio_context(IOThread *iothread);
+
+#endif
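
A possible use of this helper from a unit test: spin up an event-loop thread, run a bottom half on its AioContext, and tear it down again (the callback and event names are made up; test-aio-multithread.c below does the same thing at larger scale):

static QemuEvent example_done;

static void example_bh(void *opaque)
{
    qemu_event_set(&example_done);
}

static void example_iothread_test(void)
{
    IOThread *t = iothread_new();
    AioContext *ctx = iothread_get_aio_context(t);

    qemu_event_init(&example_done, false);
    aio_bh_schedule_oneshot(ctx, example_bh, NULL);
    qemu_event_wait(&example_done);
    qemu_event_destroy(&example_done);
    iothread_join(t);
}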
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
new file mode 100644
index 0000000..f11e990
--- /dev/null
+++ b/tests/test-aio-multithread.c
@@ -0,0 +1,463 @@
+/*
+ * AioContext multithreading tests
+ *
+ * Copyright Red Hat, Inc. 2016
+ *
+ * Authors:
+ * Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include <glib.h>
+#include "block/aio.h"
+#include "qapi/error.h"
+#include "qemu/coroutine.h"
+#include "qemu/thread.h"
+#include "qemu/error-report.h"
+#include "iothread.h"
+
+/* AioContext management */
+
+#define NUM_CONTEXTS 5
+
+static IOThread *threads[NUM_CONTEXTS];
+static AioContext *ctx[NUM_CONTEXTS];
+static __thread int id = -1;
+
+static QemuEvent done_event;
+
+/* Run a function synchronously on a remote iothread. */
+
+typedef struct CtxRunData {
+ QEMUBHFunc *cb;
+ void *arg;
+} CtxRunData;
+
+static void ctx_run_bh_cb(void *opaque)
+{
+ CtxRunData *data = opaque;
+
+ data->cb(data->arg);
+ qemu_event_set(&done_event);
+}
+
+static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
+{
+ CtxRunData data = {
+ .cb = cb,
+ .arg = opaque
+ };
+
+ qemu_event_reset(&done_event);
+ aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
+ qemu_event_wait(&done_event);
+}
+
+/* Starting the iothreads. */
+
+static void set_id_cb(void *opaque)
+{
+ int *i = opaque;
+
+ id = *i;
+}
+
+static void create_aio_contexts(void)
+{
+ int i;
+
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ threads[i] = iothread_new();
+ ctx[i] = iothread_get_aio_context(threads[i]);
+ }
+
+ qemu_event_init(&done_event, false);
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ ctx_run(i, set_id_cb, &i);
+ }
+}
+
+/* Stopping the iothreads. */
+
+static void join_aio_contexts(void)
+{
+ int i;
+
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ aio_context_ref(ctx[i]);
+ }
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ iothread_join(threads[i]);
+ }
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ aio_context_unref(ctx[i]);
+ }
+ qemu_event_destroy(&done_event);
+}
+
+/* Basic test for the stuff above. */
+
+static void test_lifecycle(void)
+{
+ create_aio_contexts();
+ join_aio_contexts();
+}
+
+/* aio_co_schedule test. */
+
+static Coroutine *to_schedule[NUM_CONTEXTS];
+
+static bool now_stopping;
+
+static int count_retry;
+static int count_here;
+static int count_other;
+
+static bool schedule_next(int n)
+{
+ Coroutine *co;
+
+ co = atomic_xchg(&to_schedule[n], NULL);
+ if (!co) {
+ atomic_inc(&count_retry);
+ return false;
+ }
+
+ if (n == id) {
+ atomic_inc(&count_here);
+ } else {
+ atomic_inc(&count_other);
+ }
+
+ aio_co_schedule(ctx[n], co);
+ return true;
+}
+
+static void finish_cb(void *opaque)
+{
+ schedule_next(id);
+}
+
+static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
+{
+ g_assert(to_schedule[id] == NULL);
+ atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+
+ while (!atomic_mb_read(&now_stopping)) {
+ int n;
+
+ n = g_test_rand_int_range(0, NUM_CONTEXTS);
+ schedule_next(n);
+ qemu_coroutine_yield();
+
+ g_assert(to_schedule[id] == NULL);
+ atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+ }
+}
+
+
+static void test_multi_co_schedule(int seconds)
+{
+ int i;
+
+ count_here = count_other = count_retry = 0;
+ now_stopping = false;
+
+ create_aio_contexts();
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
+ aio_co_schedule(ctx[i], co1);
+ }
+
+ g_usleep(seconds * 1000000);
+
+ atomic_mb_set(&now_stopping, true);
+ for (i = 0; i < NUM_CONTEXTS; i++) {
+ ctx_run(i, finish_cb, NULL);
+ to_schedule[i] = NULL;
+ }
+
+ join_aio_contexts();
+ g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
+ count_other, count_here, count_retry,
+ count_here + count_other + count_retry);
+}
+
+static void test_multi_co_schedule_1(void)
+{
+ test_multi_co_schedule(1);
+}
+
+static void test_multi_co_schedule_10(void)
+{
+ test_multi_co_schedule(10);
+}
+
+/* CoMutex thread-safety. */
+
+static uint32_t atomic_counter;
+static uint32_t running;
+static uint32_t counter;
+static CoMutex comutex;
+
+static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
+{
+ while (!atomic_mb_read(&now_stopping)) {
+ qemu_co_mutex_lock(&comutex);
+ counter++;
+ qemu_co_mutex_unlock(&comutex);
+
+ /* Increase atomic_counter *after* releasing the mutex. Otherwise
+ * there is a chance (it happens about 1 in 3 runs) that the iothread
+ * exits before the coroutine is woken up, causing a spurious
+ * assertion failure.
+ */
+ atomic_inc(&atomic_counter);
+ }
+ atomic_dec(&running);
+}
+
+static void test_multi_co_mutex(int threads, int seconds)
+{
+ int i;
+
+ qemu_co_mutex_init(&comutex);
+ counter = 0;
+ atomic_counter = 0;
+ now_stopping = false;
+
+ create_aio_contexts();
+ assert(threads <= NUM_CONTEXTS);
+ running = threads;
+ for (i = 0; i < threads; i++) {
+ Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL);
+ aio_co_schedule(ctx[i], co1);
+ }
+
+ g_usleep(seconds * 1000000);
+
+ atomic_mb_set(&now_stopping, true);
+ while (running > 0) {
+ g_usleep(100000);
+ }
+
+ join_aio_contexts();
+ g_test_message("%d iterations/second\n", counter / seconds);
+ g_assert_cmpint(counter, ==, atomic_counter);
+}
+
+/* Testing with NUM_CONTEXTS threads focuses on the queue. The mutex however
+ * is too contended (and the threads spend too much time in aio_poll)
+ * to actually stress the handoff protocol.
+ */
+static void test_multi_co_mutex_1(void)
+{
+ test_multi_co_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_co_mutex_10(void)
+{
+ test_multi_co_mutex(NUM_CONTEXTS, 10);
+}
+
+/* Testing with fewer threads stresses the handoff protocol too. Still, the
+ * case where the locker _can_ pick up a handoff is very rare, happening
+ * about 10 times in 1 million, so increase the runtime a bit compared to
+ * other "quick" testcases that only run for 1 second.
+ */
+static void test_multi_co_mutex_2_3(void)
+{
+ test_multi_co_mutex(2, 3);
+}
+
+static void test_multi_co_mutex_2_30(void)
+{
+ test_multi_co_mutex(2, 30);
+}
+
+/* Same test with fair mutexes, for performance comparison. */
+
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* The nodes for the mutex reside in this structure (on which we try to avoid
+ * false sharing). The head of the mutex is in the "mutex_head" variable.
+ */
+static struct {
+ int next, locked;
+ int padding[14];
+} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
+
+static int mutex_head = -1;
+
+static void mcs_mutex_lock(void)
+{
+ int prev;
+
+ nodes[id].next = -1;
+ nodes[id].locked = 1;
+ prev = atomic_xchg(&mutex_head, id);
+ if (prev != -1) {
+ atomic_set(&nodes[prev].next, id);
+ qemu_futex_wait(&nodes[id].locked, 1);
+ }
+}
+
+static void mcs_mutex_unlock(void)
+{
+ int next;
+ if (nodes[id].next == -1) {
+ if (atomic_read(&mutex_head) == id &&
+ atomic_cmpxchg(&mutex_head, id, -1) == id) {
+ /* Last item in the list, exit. */
+ return;
+ }
+ while (atomic_read(&nodes[id].next) == -1) {
+ /* mcs_mutex_lock did the xchg, but has not updated
+ * nodes[prev].next yet.
+ */
+ }
+ }
+
+ /* Wake up the next in line. */
+ next = nodes[id].next;
+ nodes[next].locked = 0;
+ qemu_futex_wake(&nodes[next].locked, 1);
+}
+
+static void test_multi_fair_mutex_entry(void *opaque)
+{
+ while (!atomic_mb_read(&now_stopping)) {
+ mcs_mutex_lock();
+ counter++;
+ mcs_mutex_unlock();
+ atomic_inc(&atomic_counter);
+ }
+ atomic_dec(&running);
+}
+
+static void test_multi_fair_mutex(int threads, int seconds)
+{
+ int i;
+
+ assert(mutex_head == -1);
+ counter = 0;
+ atomic_counter = 0;
+ now_stopping = false;
+
+ create_aio_contexts();
+ assert(threads <= NUM_CONTEXTS);
+ running = threads;
+ for (i = 0; i < threads; i++) {
+ Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL);
+ aio_co_schedule(ctx[i], co1);
+ }
+
+ g_usleep(seconds * 1000000);
+
+ atomic_mb_set(&now_stopping, true);
+ while (running > 0) {
+ g_usleep(100000);
+ }
+
+ join_aio_contexts();
+ g_test_message("%d iterations/second\n", counter / seconds);
+ g_assert_cmpint(counter, ==, atomic_counter);
+}
+
+static void test_multi_fair_mutex_1(void)
+{
+ test_multi_fair_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_fair_mutex_10(void)
+{
+ test_multi_fair_mutex(NUM_CONTEXTS, 10);
+}
+#endif
+
+/* Same test with pthread mutexes, for performance comparison and
+ * portability. */
+
+static QemuMutex mutex;
+
+static void test_multi_mutex_entry(void *opaque)
+{
+ while (!atomic_mb_read(&now_stopping)) {
+ qemu_mutex_lock(&mutex);
+ counter++;
+ qemu_mutex_unlock(&mutex);
+ atomic_inc(&atomic_counter);
+ }
+ atomic_dec(&running);
+}
+
+static void test_multi_mutex(int threads, int seconds)
+{
+ int i;
+
+ qemu_mutex_init(&mutex);
+ counter = 0;
+ atomic_counter = 0;
+ now_stopping = false;
+
+ create_aio_contexts();
+ assert(threads <= NUM_CONTEXTS);
+ running = threads;
+ for (i = 0; i < threads; i++) {
+ Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL);
+ aio_co_schedule(ctx[i], co1);
+ }
+
+ g_usleep(seconds * 1000000);
+
+ atomic_mb_set(&now_stopping, true);
+ while (running > 0) {
+ g_usleep(100000);
+ }
+
+ join_aio_contexts();
+ g_test_message("%d iterations/second\n", counter / seconds);
+ g_assert_cmpint(counter, ==, atomic_counter);
+}
+
+static void test_multi_mutex_1(void)
+{
+ test_multi_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_mutex_10(void)
+{
+ test_multi_mutex(NUM_CONTEXTS, 10);
+}
+
+/* End of tests. */
+
+int main(int argc, char **argv)
+{
+ init_clocks();
+
+ g_test_init(&argc, &argv, NULL);
+ g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
+ if (g_test_quick()) {
+ g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
+ g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1);
+ g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
+#ifdef CONFIG_LINUX
+ g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
+#endif
+ g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
+ } else {
+ g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
+ g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10);
+ g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
+#ifdef CONFIG_LINUX
+ g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
+#endif
+ g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
+ }
+ return g_test_run();
+}
diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index 8dbf66a..91b4ec5 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -6,6 +6,7 @@
#include "qapi/error.h"
#include "qemu/timer.h"
#include "qemu/error-report.h"
+#include "qemu/main-loop.h"
static AioContext *ctx;
static ThreadPool *pool;
@@ -224,15 +225,9 @@ static void test_cancel_async(void)
int main(int argc, char **argv)
{
int ret;
- Error *local_error = NULL;
- init_clocks();
-
- ctx = aio_context_new(&local_error);
- if (!ctx) {
- error_reportf_err(local_error, "Failed to create AIO Context: ");
- exit(1);
- }
+ qemu_init_main_loop(&error_abort);
+ ctx = qemu_get_current_aio_context();
pool = aio_get_thread_pool(ctx);
g_test_init(&argc, &argv, NULL);
@@ -245,6 +240,5 @@ int main(int argc, char **argv)
ret = g_test_run();
- aio_context_unref(ctx);
return ret;
}
diff --git a/trace-events b/trace-events
index 756a947..7288557 100644
--- a/trace-events
+++ b/trace-events
@@ -25,17 +25,6 @@
#
# The <format-string> should be a sprintf()-compatible format string.
-# aio-posix.c
-run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64
-run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
-poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
-poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
-
-# thread-pool.c
-thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
-thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
-thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
-
# ioport.c
cpu_in(unsigned int addr, char size, unsigned int val) "addr %#x(%c) value %u"
cpu_out(unsigned int addr, char size, unsigned int val) "addr %#x(%c) value %u"
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 56c8c23..bc629e2 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -1,14 +1,18 @@
util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
util-obj-y += bufferiszero.o
util-obj-y += lockcnt.o
+util-obj-y += aiocb.o async.o thread-pool.o qemu-timer.o
+util-obj-y += main-loop.o iohandler.o
+util-obj-$(CONFIG_POSIX) += aio-posix.o
util-obj-$(CONFIG_POSIX) += compatfd.o
util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
util-obj-$(CONFIG_POSIX) += mmap-alloc.o
util-obj-$(CONFIG_POSIX) += oslib-posix.o
util-obj-$(CONFIG_POSIX) += qemu-openpty.o
util-obj-$(CONFIG_POSIX) += qemu-thread-posix.o
-util-obj-$(CONFIG_WIN32) += event_notifier-win32.o
util-obj-$(CONFIG_POSIX) += memfd.o
+util-obj-$(CONFIG_WIN32) += aio-win32.o
+util-obj-$(CONFIG_WIN32) += event_notifier-win32.o
util-obj-$(CONFIG_WIN32) += oslib-win32.o
util-obj-$(CONFIG_WIN32) += qemu-thread-win32.o
util-obj-y += envlist.o path.o module.o
diff --git a/aio-posix.c b/util/aio-posix.c
index 577527f..2d51239 100644
--- a/aio-posix.c
+++ b/util/aio-posix.c
@@ -19,7 +19,7 @@
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
-#include "trace-root.h"
+#include "trace.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif
@@ -386,12 +386,6 @@ static bool aio_dispatch_handlers(AioContext *ctx)
AioHandler *node, *tmp;
bool progress = false;
- /*
- * We have to walk very carefully in case aio_set_fd_handler is
- * called while we're walking.
- */
- qemu_lockcnt_inc(&ctx->list_lock);
-
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents;
@@ -426,33 +420,17 @@ static bool aio_dispatch_handlers(AioContext *ctx)
}
}
- qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
-/*
- * Note that dispatch_fds == false has the side-effect of post-poning the
- * freeing of deleted handlers.
- */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+void aio_dispatch(AioContext *ctx)
{
- bool progress;
-
- /*
- * If there are callbacks left that have been queued, we need to call them.
- * Do not call select in this case, because it is possible that the caller
- * does not need a complete flush (as is the case for aio_poll loops).
- */
- progress = aio_bh_poll(ctx);
-
- if (dispatch_fds) {
- progress |= aio_dispatch_handlers(ctx);
- }
-
- /* Run our timers */
- progress |= timerlistgroup_run_timers(&ctx->tlg);
+ qemu_lockcnt_inc(&ctx->list_lock);
+ aio_bh_poll(ctx);
+ aio_dispatch_handlers(ctx);
+ qemu_lockcnt_dec(&ctx->list_lock);
- return progress;
+ timerlistgroup_run_timers(&ctx->tlg);
}
/* These thread-local variables are used only in a small part of aio_poll
@@ -597,9 +575,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
int64_t timeout;
int64_t start = 0;
- aio_context_acquire(ctx);
- progress = false;
-
/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
@@ -617,9 +592,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}
- if (try_poll_mode(ctx, blocking)) {
- progress = true;
- } else {
+ progress = try_poll_mode(ctx, blocking);
+ if (!progress) {
assert(npfd == 0);
/* fill pollfds */
@@ -636,9 +610,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
timeout = blocking ? aio_compute_timeout(ctx) : 0;
/* wait until next event */
- if (timeout) {
- aio_context_release(ctx);
- }
if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
AioHandler epoll_handler;
@@ -650,9 +621,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
} else {
ret = qemu_poll_ns(pollfds, npfd, timeout);
}
- if (timeout) {
- aio_context_acquire(ctx);
- }
}
if (blocking) {
@@ -710,14 +678,16 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
npfd = 0;
- qemu_lockcnt_dec(&ctx->list_lock);
- /* Run dispatch even if there were no readable fds to run timers */
- if (aio_dispatch(ctx, ret > 0)) {
- progress = true;
+ progress |= aio_bh_poll(ctx);
+
+ if (ret > 0) {
+ progress |= aio_dispatch_handlers(ctx);
}
- aio_context_release(ctx);
+ qemu_lockcnt_dec(&ctx->list_lock);
+
+ progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress;
}
diff --git a/aio-win32.c b/util/aio-win32.c
index 900524c..bca496a 100644
--- a/aio-win32.c
+++ b/util/aio-win32.c
@@ -253,8 +253,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
bool progress = false;
AioHandler *tmp;
- qemu_lockcnt_inc(&ctx->list_lock);
-
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
@@ -305,20 +303,16 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
}
}
- qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+void aio_dispatch(AioContext *ctx)
{
- bool progress;
-
- progress = aio_bh_poll(ctx);
- if (dispatch_fds) {
- progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
- }
- progress |= timerlistgroup_run_timers(&ctx->tlg);
- return progress;
+ qemu_lockcnt_inc(&ctx->list_lock);
+ aio_bh_poll(ctx);
+ aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
+ qemu_lockcnt_dec(&ctx->list_lock);
+ timerlistgroup_run_timers(&ctx->tlg);
}
bool aio_poll(AioContext *ctx, bool blocking)
@@ -329,7 +323,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
int count;
int timeout;
- aio_context_acquire(ctx);
progress = false;
/* aio_notify can avoid the expensive event_notifier_set if
@@ -355,7 +348,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
}
- qemu_lockcnt_dec(&ctx->list_lock);
first = true;
/* ctx->notifier is always registered. */
@@ -371,17 +363,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
timeout = blocking && !have_select_revents
? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
- if (timeout) {
- aio_context_release(ctx);
- }
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
if (blocking) {
assert(first);
atomic_sub(&ctx->notify_me, 2);
}
- if (timeout) {
- aio_context_acquire(ctx);
- }
if (first) {
aio_notify_accept(ctx);
@@ -404,9 +390,9 @@ bool aio_poll(AioContext *ctx, bool blocking)
progress |= aio_dispatch_handlers(ctx, event);
} while (count > 0);
- progress |= timerlistgroup_run_timers(&ctx->tlg);
+ qemu_lockcnt_dec(&ctx->list_lock);
- aio_context_release(ctx);
+ progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress;
}
diff --git a/util/aiocb.c b/util/aiocb.c
new file mode 100644
index 0000000..5aef3a0
--- /dev/null
+++ b/util/aiocb.c
@@ -0,0 +1,55 @@
+/*
+ * BlockAIOCB allocation
+ *
+ * Copyright (c) 2003-2017 Fabrice Bellard and other QEMU contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+#include "block/aio.h"
+
+void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
+ BlockCompletionFunc *cb, void *opaque)
+{
+ BlockAIOCB *acb;
+
+ acb = g_malloc(aiocb_info->aiocb_size);
+ acb->aiocb_info = aiocb_info;
+ acb->bs = bs;
+ acb->cb = cb;
+ acb->opaque = opaque;
+ acb->refcnt = 1;
+ return acb;
+}
+
+void qemu_aio_ref(void *p)
+{
+ BlockAIOCB *acb = p;
+ acb->refcnt++;
+}
+
+void qemu_aio_unref(void *p)
+{
+ BlockAIOCB *acb = p;
+ assert(acb->refcnt > 0);
+ if (--acb->refcnt == 0) {
+ g_free(acb);
+ }
+}
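
util/aiocb.c carries the BlockAIOCB allocation helpers; the driver-side pattern they serve looks roughly like this (MyAIOCB and my_aiocb_info are hypothetical driver-side names, not part of the patch):

typedef struct MyAIOCB {
    BlockAIOCB common;
    int ret;
} MyAIOCB;

static const AIOCBInfo my_aiocb_info = {
    .aiocb_size = sizeof(MyAIOCB),
};

static void example_complete(MyAIOCB *acb)
{
    acb->common.cb(acb->common.opaque, acb->ret);
    qemu_aio_unref(acb);   /* drop the reference taken by qemu_aio_get() */
}

static BlockAIOCB *example_submit(BlockDriverState *bs,
                                  BlockCompletionFunc *cb, void *opaque)
{
    MyAIOCB *acb = qemu_aio_get(&my_aiocb_info, bs, cb, opaque);

    acb->ret = 0;
    /* ... issue the actual request, then call example_complete(acb) ... */
    return &acb->common;
}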
diff --git a/async.c b/util/async.c
index 0d218ab..7d469eb 100644
--- a/async.c
+++ b/util/async.c
@@ -1,7 +1,8 @@
/*
- * QEMU System Emulator
+ * Data plane event loop
*
* Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2009-2017 QEMU contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -30,6 +31,8 @@
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
+#include "qemu/coroutine_int.h"
+#include "trace.h"
/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
@@ -87,15 +90,16 @@ void aio_bh_call(QEMUBH *bh)
bh->cb(bh->opaque);
}
-/* Multiple occurrences of aio_bh_poll cannot be called concurrently */
+/* Multiple occurrences of aio_bh_poll cannot be called concurrently.
+ * The count in ctx->list_lock is incremented before the call, and is
+ * not affected by the call.
+ */
int aio_bh_poll(AioContext *ctx)
{
QEMUBH *bh, **bhp, *next;
int ret;
bool deleted = false;
- qemu_lockcnt_inc(&ctx->list_lock);
-
ret = 0;
for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
next = atomic_rcu_read(&bh->next);
@@ -120,11 +124,10 @@ int aio_bh_poll(AioContext *ctx)
/* remove deleted bhs */
if (!deleted) {
- qemu_lockcnt_dec(&ctx->list_lock);
return ret;
}
- if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
+ if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
bhp = &ctx->first_bh;
while (*bhp) {
bh = *bhp;
@@ -135,7 +138,7 @@ int aio_bh_poll(AioContext *ctx)
bhp = &bh->next;
}
}
- qemu_lockcnt_unlock(&ctx->list_lock);
+ qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
return ret;
}
@@ -255,7 +258,7 @@ aio_ctx_dispatch(GSource *source,
AioContext *ctx = (AioContext *) source;
assert(callback == NULL);
- aio_dispatch(ctx, true);
+ aio_dispatch(ctx);
return true;
}
@@ -274,6 +277,9 @@ aio_ctx_finalize(GSource *source)
}
#endif
+ assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
+ qemu_bh_delete(ctx->co_schedule_bh);
+
qemu_lockcnt_lock(&ctx->list_lock);
assert(!qemu_lockcnt_count(&ctx->list_lock));
while (ctx->first_bh) {
@@ -363,6 +369,30 @@ static bool event_notifier_poll(void *opaque)
return atomic_read(&ctx->notified);
}
+static void co_schedule_bh_cb(void *opaque)
+{
+ AioContext *ctx = opaque;
+ QSLIST_HEAD(, Coroutine) straight, reversed;
+
+ QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
+ QSLIST_INIT(&straight);
+
+ while (!QSLIST_EMPTY(&reversed)) {
+ Coroutine *co = QSLIST_FIRST(&reversed);
+ QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
+ QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
+ }
+
+ while (!QSLIST_EMPTY(&straight)) {
+ Coroutine *co = QSLIST_FIRST(&straight);
+ QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
+ trace_aio_co_schedule_bh_cb(ctx, co);
+ aio_context_acquire(ctx);
+ qemu_coroutine_enter(co);
+ aio_context_release(ctx);
+ }
+}
+
AioContext *aio_context_new(Error **errp)
{
int ret;
@@ -378,6 +408,10 @@ AioContext *aio_context_new(Error **errp)
}
g_source_set_can_recurse(&ctx->source, true);
qemu_lockcnt_init(&ctx->list_lock);
+
+ ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
+ QSLIST_INIT(&ctx->scheduled_coroutines);
+
aio_set_event_notifier(ctx, &ctx->notifier,
false,
(EventNotifierHandler *)
@@ -401,6 +435,40 @@ fail:
return NULL;
}
+void aio_co_schedule(AioContext *ctx, Coroutine *co)
+{
+ trace_aio_co_schedule(ctx, co);
+ QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
+ co, co_scheduled_next);
+ qemu_bh_schedule(ctx->co_schedule_bh);
+}
+
+void aio_co_wake(struct Coroutine *co)
+{
+ AioContext *ctx;
+
+ /* Read coroutine before co->ctx. Matches smp_wmb in
+ * qemu_coroutine_enter.
+ */
+ smp_read_barrier_depends();
+ ctx = atomic_read(&co->ctx);
+
+ if (ctx != qemu_get_current_aio_context()) {
+ aio_co_schedule(ctx, co);
+ return;
+ }
+
+ if (qemu_in_coroutine()) {
+ Coroutine *self = qemu_coroutine_self();
+ assert(self != co);
+ QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
+ } else {
+ aio_context_acquire(ctx);
+ qemu_coroutine_enter(co);
+ aio_context_release(ctx);
+ }
+}
+
void aio_context_ref(AioContext *ctx)
{
g_source_ref(&ctx->source);
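
aio_co_schedule() and aio_co_wake() are the new cross-thread entry points into a coroutine. A hedged sketch of the intended use from a completion callback that may run in any thread; the request structure is made up for illustration:

typedef struct ExampleRequest {
    Coroutine *co;   /* coroutine waiting for the request to finish */
    int ret;
} ExampleRequest;

static void example_completion_cb(void *opaque, int ret)
{
    ExampleRequest *req = opaque;

    req->ret = ret;
    /* Either enters the coroutine directly (same AioContext) or bounces
     * it through the target context's co_schedule_bh. */
    aio_co_wake(req->co);
}

static int coroutine_fn example_co_submit_and_wait(ExampleRequest *req)
{
    req->co = qemu_coroutine_self();
    /* ... issue the request that will eventually run the callback ... */
    qemu_coroutine_yield();
    return req->ret;
}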
diff --git a/iohandler.c b/util/iohandler.c
index 623b55b..623b55b 100644
--- a/iohandler.c
+++ b/util/iohandler.c
diff --git a/main-loop.c b/util/main-loop.c
index ad10bca..ad10bca 100644
--- a/main-loop.c
+++ b/util/main-loop.c
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 14cf9ce..6328eed 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -20,13 +20,19 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
+ *
+ * The lock-free mutex implementation is based on OSv
+ * (core/lfmutex.cc, include/lockfree/mutex.hh).
+ * Copyright (C) 2013 Cloudius Systems, Ltd.
*/
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
+#include "qemu/processor.h"
#include "qemu/queue.h"
+#include "block/aio.h"
#include "trace.h"
void qemu_co_queue_init(CoQueue *queue)
@@ -34,12 +40,30 @@ void qemu_co_queue_init(CoQueue *queue)
QSIMPLEQ_INIT(&queue->entries);
}
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
+void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex)
{
Coroutine *self = qemu_coroutine_self();
QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
+
+ if (mutex) {
+ qemu_co_mutex_unlock(mutex);
+ }
+
+ /* There is no race condition here. Other threads will call
+ * aio_co_schedule on our AioContext, which can reenter this
+ * coroutine but only after this yield and after the main loop
+ * has gone through the next iteration.
+ */
qemu_coroutine_yield();
assert(qemu_in_coroutine());
+
+ /* TODO: OSv implements wait morphing here, where the wakeup
+ * primitive automatically places the woken coroutine on the
+ * mutex's queue. This avoids the thundering herd effect.
+ */
+ if (mutex) {
+ qemu_co_mutex_lock(mutex);
+ }
}
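With the new mutex argument, qemu_co_queue_wait behaves like a condition-variable wait: the CoMutex is released before the yield and re-acquired after wakeup, so the usual "recheck the condition in a loop" pattern carries over. A minimal sketch assuming a hypothetical Fifo type that is not part of this patch:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    typedef struct Fifo {
        CoMutex lock;
        CoQueue wait;           /* waited on while the fifo is empty */
        int items;
    } Fifo;

    static void fifo_init(Fifo *f)
    {
        qemu_co_mutex_init(&f->lock);
        qemu_co_queue_init(&f->wait);
        f->items = 0;
    }

    static void coroutine_fn fifo_get(Fifo *f)
    {
        qemu_co_mutex_lock(&f->lock);
        while (f->items == 0) {
            /* drops f->lock across the yield, takes it again on wakeup */
            qemu_co_queue_wait(&f->wait, &f->lock);
        }
        f->items--;
        qemu_co_mutex_unlock(&f->lock);
    }

    static void coroutine_fn fifo_put(Fifo *f)
    {
        qemu_co_mutex_lock(&f->lock);
        f->items++;
        qemu_co_queue_next(&f->wait);       /* wake one waiter */
        qemu_co_mutex_unlock(&f->lock);
    }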
/**
@@ -63,7 +87,6 @@ void qemu_co_queue_run_restart(Coroutine *co)
static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
- Coroutine *self = qemu_coroutine_self();
Coroutine *next;
if (QSIMPLEQ_EMPTY(&queue->entries)) {
@@ -72,8 +95,7 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
- QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
- trace_qemu_co_queue_next(next);
+ aio_co_wake(next);
if (single) {
break;
}
@@ -112,27 +134,157 @@ bool qemu_co_queue_empty(CoQueue *queue)
return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}
+/* The wait records are handled with a multiple-producer, single-consumer
+ * lock-free queue. There cannot be two concurrent pop_waiter() calls
+ * because pop_waiter() can only be called while mutex->handoff is zero.
+ * This can happen in three cases:
+ * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
+ * In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
+ * not take part in the handoff.
+ * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
+ * qemu_co_mutex_unlock. In this case, qemu_co_mutex_unlock will fail
+ * the cmpxchg (it will see either 0 or the next sequence value) and
+ * exit. The next hand-off cannot begin until qemu_co_mutex_lock has
+ * woken up someone.
+ * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
+ * In this case another iteration starts with mutex->handoff == 0;
+ * a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
+ * qemu_co_mutex_unlock will go back to case (1).
+ *
+ * The following functions manage this queue.
+ */
+typedef struct CoWaitRecord {
+ Coroutine *co;
+ QSLIST_ENTRY(CoWaitRecord) next;
+} CoWaitRecord;
+
+static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
+{
+ w->co = qemu_coroutine_self();
+ QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
+}
+
+static void move_waiters(CoMutex *mutex)
+{
+ QSLIST_HEAD(, CoWaitRecord) reversed;
+ QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
+ while (!QSLIST_EMPTY(&reversed)) {
+ CoWaitRecord *w = QSLIST_FIRST(&reversed);
+ QSLIST_REMOVE_HEAD(&reversed, next);
+ QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
+ }
+}
+
+static CoWaitRecord *pop_waiter(CoMutex *mutex)
+{
+ CoWaitRecord *w;
+
+ if (QSLIST_EMPTY(&mutex->to_pop)) {
+ move_waiters(mutex);
+ if (QSLIST_EMPTY(&mutex->to_pop)) {
+ return NULL;
+ }
+ }
+ w = QSLIST_FIRST(&mutex->to_pop);
+ QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
+ return w;
+}
+
+static bool has_waiters(CoMutex *mutex)
+{
+ return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
+}
+
void qemu_co_mutex_init(CoMutex *mutex)
{
memset(mutex, 0, sizeof(*mutex));
- qemu_co_queue_init(&mutex->queue);
}
-void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
+static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
+{
+ /* Read co before co->ctx; pairs with smp_wmb() in
+ * qemu_coroutine_enter().
+ */
+ smp_read_barrier_depends();
+ mutex->ctx = co->ctx;
+ aio_co_wake(co);
+}
+
+static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
+ CoMutex *mutex)
{
Coroutine *self = qemu_coroutine_self();
+ CoWaitRecord w;
+ unsigned old_handoff;
trace_qemu_co_mutex_lock_entry(mutex, self);
+ w.co = self;
+ push_waiter(mutex, &w);
+
+ /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
+ * a concurrent unlock() the responsibility of waking somebody up.
+ */
+ old_handoff = atomic_mb_read(&mutex->handoff);
+ if (old_handoff &&
+ has_waiters(mutex) &&
+ atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
+ /* There can be no concurrent pops, because there can be only
+ * one active handoff at a time.
+ */
+ CoWaitRecord *to_wake = pop_waiter(mutex);
+ Coroutine *co = to_wake->co;
+ if (co == self) {
+ /* We got the lock ourselves! */
+ assert(to_wake == &w);
+ mutex->ctx = ctx;
+ return;
+ }
+
+ qemu_co_mutex_wake(mutex, co);
+ }
- while (mutex->locked) {
- qemu_co_queue_wait(&mutex->queue);
+ qemu_coroutine_yield();
+ trace_qemu_co_mutex_lock_return(mutex, self);
+}
+
+void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
+{
+ AioContext *ctx = qemu_get_current_aio_context();
+ Coroutine *self = qemu_coroutine_self();
+ int waiters, i;
+
+ /* Running a very small critical section on pthread_mutex_t and CoMutex
+ * shows that pthread_mutex_t is much faster because it doesn't actually
+ * go to sleep. What happens is that the critical section is shorter
+ * than the latency of entering the kernel and thus FUTEX_WAIT always
+ * fails. With CoMutex there is no such latency but you still want to
+ * avoid wait and wakeup. So introduce it artificially.
+ */
+ i = 0;
+retry_fast_path:
+ waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
+ if (waiters != 0) {
+ while (waiters == 1 && ++i < 1000) {
+ if (atomic_read(&mutex->ctx) == ctx) {
+ break;
+ }
+ if (atomic_read(&mutex->locked) == 0) {
+ goto retry_fast_path;
+ }
+ cpu_relax();
+ }
+ waiters = atomic_fetch_inc(&mutex->locked);
}
- mutex->locked = true;
+ if (waiters == 0) {
+ /* Uncontended. */
+ trace_qemu_co_mutex_lock_uncontended(mutex, self);
+ mutex->ctx = ctx;
+ } else {
+ qemu_co_mutex_lock_slowpath(ctx, mutex);
+ }
mutex->holder = self;
self->locks_held++;
-
- trace_qemu_co_mutex_lock_return(mutex, self);
}
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
@@ -141,14 +293,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
trace_qemu_co_mutex_unlock_entry(mutex, self);
- assert(mutex->locked == true);
+ assert(mutex->locked);
assert(mutex->holder == self);
assert(qemu_in_coroutine());
- mutex->locked = false;
+ mutex->ctx = NULL;
mutex->holder = NULL;
self->locks_held--;
- qemu_co_queue_next(&mutex->queue);
+ if (atomic_fetch_dec(&mutex->locked) == 1) {
+ /* No waiting qemu_co_mutex_lock(). Phew, that was easy! */
+ return;
+ }
+
+ for (;;) {
+ CoWaitRecord *to_wake = pop_waiter(mutex);
+ unsigned our_handoff;
+
+ if (to_wake) {
+ qemu_co_mutex_wake(mutex, to_wake->co);
+ break;
+ }
+
+ /* Some concurrent lock() is in progress (we know this because
+ * mutex->locked was >1) but it hasn't yet put itself on the wait
+ * queue. Pick a sequence number for the handoff protocol (not 0).
+ */
+ if (++mutex->sequence == 0) {
+ mutex->sequence = 1;
+ }
+
+ our_handoff = mutex->sequence;
+ atomic_mb_set(&mutex->handoff, our_handoff);
+ if (!has_waiters(mutex)) {
+ /* The concurrent lock has not added itself yet, so it
+ * will be able to pick our handoff.
+ */
+ break;
+ }
+
+ /* Try to do the handoff protocol ourselves; if somebody else has
+ * already taken it, however, we're done and they're responsible.
+ */
+ if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
+ break;
+ }
+ }
trace_qemu_co_mutex_unlock_return(mutex, self);
}
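For callers nothing changes: qemu_co_mutex_lock/unlock keep their coroutine-only API, but the mutex can now be shared by coroutines running in different AioContexts (for example the main loop and an iothread). A minimal sketch with a hypothetical shared counter:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    typedef struct Counter {
        CoMutex lock;               /* qemu_co_mutex_init() at setup */
        uint64_t value;
    } Counter;

    /* Must run in coroutine context, but may run in any AioContext. */
    static void coroutine_fn counter_add(Counter *c, uint64_t n)
    {
        qemu_co_mutex_lock(&c->lock);   /* fast path: a single cmpxchg when free */
        c->value += n;
        qemu_co_mutex_unlock(&c->lock); /* wakes one waiter in its own context */
    }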
@@ -157,16 +346,22 @@ void qemu_co_rwlock_init(CoRwlock *lock)
{
memset(lock, 0, sizeof(*lock));
qemu_co_queue_init(&lock->queue);
+ qemu_co_mutex_init(&lock->mutex);
}
void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
- while (lock->writer) {
- qemu_co_queue_wait(&lock->queue);
+ qemu_co_mutex_lock(&lock->mutex);
+ /* For fairness, wait if a writer is in line. */
+ while (lock->pending_writer) {
+ qemu_co_queue_wait(&lock->queue, &lock->mutex);
}
lock->reader++;
+ qemu_co_mutex_unlock(&lock->mutex);
+
+ /* The rest of the read-side critical section is run without the mutex. */
self->locks_held++;
}
@@ -175,10 +370,13 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
Coroutine *self = qemu_coroutine_self();
assert(qemu_in_coroutine());
- if (lock->writer) {
- lock->writer = false;
+ if (!lock->reader) {
+ /* The critical section started in qemu_co_rwlock_wrlock. */
qemu_co_queue_restart_all(&lock->queue);
} else {
+ self->locks_held--;
+
+ qemu_co_mutex_lock(&lock->mutex);
lock->reader--;
assert(lock->reader >= 0);
/* Wakeup only one waiting writer */
@@ -186,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
qemu_co_queue_next(&lock->queue);
}
}
- self->locks_held--;
+ qemu_co_mutex_unlock(&lock->mutex);
}
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
- Coroutine *self = qemu_coroutine_self();
-
- while (lock->writer || lock->reader) {
- qemu_co_queue_wait(&lock->queue);
+ qemu_co_mutex_lock(&lock->mutex);
+ lock->pending_writer++;
+ while (lock->reader) {
+ qemu_co_queue_wait(&lock->queue, &lock->mutex);
}
- lock->writer = true;
- self->locks_held++;
+ lock->pending_writer--;
+
+ /* The rest of the write-side critical section is run with
+ * the mutex taken, so that lock->reader remains zero.
+ * There is no need to update self->locks_held.
+ */
}
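CoRwlock keeps its interface as well; the implementation now rests on the thread-safe CoMutex, with pending_writer giving writers priority over new readers. A usage sketch with a hypothetical hash table guarded by the lock (the Table type is not part of this patch):

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    typedef struct Table {
        CoRwlock lock;              /* qemu_co_rwlock_init() at setup */
        GHashTable *entries;
    } Table;

    static gpointer coroutine_fn table_lookup(Table *t, const char *key)
    {
        gpointer val;

        qemu_co_rwlock_rdlock(&t->lock);    /* many readers may hold this */
        val = g_hash_table_lookup(t->entries, key);
        qemu_co_rwlock_unlock(&t->lock);
        return val;
    }

    static void coroutine_fn table_insert(Table *t, char *key, gpointer val)
    {
        qemu_co_rwlock_wrlock(&t->lock);    /* waits for readers to drain */
        g_hash_table_insert(t->entries, key, val);
        qemu_co_rwlock_unlock(&t->lock);
    }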
diff --git a/util/qemu-coroutine-sleep.c b/util/qemu-coroutine-sleep.c
index 25de3ed..9c56550 100644
--- a/util/qemu-coroutine-sleep.c
+++ b/util/qemu-coroutine-sleep.c
@@ -25,7 +25,7 @@ static void co_sleep_cb(void *opaque)
{
CoSleepCB *sleep_cb = opaque;
- qemu_coroutine_enter(sleep_cb->co);
+ aio_co_wake(sleep_cb->co);
}
void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type,
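Since co_sleep_cb now goes through aio_co_wake, the sleeping coroutine is resumed in its own AioContext even when the timer fires in a different one. The caller side is unchanged; a hedged sketch of a coroutine backing off before a retry (the backoff helper is hypothetical):

    #include "qemu/osdep.h"
    #include "block/block.h"
    #include "qemu/coroutine.h"
    #include "qemu/timer.h"

    /* Sleep for 100 ms in coroutine context before retrying. */
    static void coroutine_fn backoff(BlockDriverState *bs)
    {
        co_aio_sleep_ns(bdrv_get_aio_context(bs), QEMU_CLOCK_REALTIME,
                        100 * SCALE_MS);
    }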
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index a5d2f6c..72412e5 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -19,6 +19,7 @@
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
+#include "block/aio.h"
enum {
POOL_BATCH_SIZE = 64,
@@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co)
}
co->caller = self;
+ co->ctx = qemu_get_current_aio_context();
+
+ /* Store co->ctx before anything that stores co. Matches
+ * barrier in aio_co_wake and qemu_co_mutex_wake.
+ */
+ smp_wmb();
+
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
qemu_co_queue_run_restart(co);
diff --git a/qemu-timer.c b/util/qemu-timer.c
index ff620ec..ff620ec 100644
--- a/qemu-timer.c
+++ b/util/qemu-timer.c
diff --git a/thread-pool.c b/util/thread-pool.c
index 3847969..ce6cd30 100644
--- a/thread-pool.c
+++ b/util/thread-pool.c
@@ -19,7 +19,7 @@
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
-#include "trace-root.h"
+#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
@@ -165,6 +165,7 @@ static void thread_pool_completion_bh(void *opaque)
ThreadPool *pool = opaque;
ThreadPoolElement *elem, *next;
+ aio_context_acquire(pool->ctx);
restart:
QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
if (elem->state != THREAD_DONE) {
@@ -184,13 +185,16 @@ restart:
*/
qemu_bh_schedule(pool->completion_bh);
+ aio_context_release(pool->ctx);
elem->common.cb(elem->common.opaque, elem->ret);
+ aio_context_acquire(pool->ctx);
qemu_aio_unref(elem);
goto restart;
} else {
qemu_aio_unref(elem);
}
}
+ aio_context_release(pool->ctx);
}
static void thread_pool_cancel(BlockAIOCB *acb)
@@ -267,7 +271,7 @@ static void thread_pool_co_cb(void *opaque, int ret)
ThreadPoolCo *co = opaque;
co->ret = ret;
- qemu_coroutine_enter(co->co);
+ aio_co_wake(co->co);
}
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
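thread_pool_co_cb is the same pattern once more: the worker thread finishes, the completion bottom half invokes the callback, and aio_co_wake resumes the submitting coroutine in its own AioContext. The caller side looks roughly like this (a sketch; my_blocking_work and my_co_flush are hypothetical):

    #include "qemu/osdep.h"
    #include "block/aio.h"
    #include "block/block.h"
    #include "block/thread-pool.h"

    /* Runs in a pool worker thread; may block, must not touch AioContext state. */
    static int my_blocking_work(void *opaque)
    {
        int *fd = opaque;
        return fsync(*fd) < 0 ? -errno : 0;
    }

    static int coroutine_fn my_co_flush(BlockDriverState *bs, int fd)
    {
        ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));

        /* Yields here; thread_pool_co_cb -> aio_co_wake resumes us in
         * bs's AioContext when the worker has finished. */
        return thread_pool_submit_co(pool, my_blocking_work, &fd);
    }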
diff --git a/util/trace-events b/util/trace-events
index 2b8aa30..ac27d94 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -1,5 +1,20 @@
# See docs/tracing.txt for syntax documentation.
+# util/aio-posix.c
+run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64
+run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
+poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
+poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
+
+# util/async.c
+aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
+aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
+
+# util/thread-pool.c
+thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
+thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
+thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
+
# util/buffer.c
buffer_resize(const char *buf, size_t olen, size_t len) "%s: old %zd, new %zd"
buffer_move_empty(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s"
@@ -13,7 +28,7 @@ qemu_coroutine_terminate(void *co) "self %p"
# util/qemu-coroutine-lock.c
qemu_co_queue_run_restart(void *co) "co %p"
-qemu_co_queue_next(void *nxt) "next %p"
+qemu_co_mutex_lock_uncontended(void *mutex, void *self) "mutex %p self %p"
qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p"
qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p"
qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p"