diff options
author | Peter Maydell <peter.maydell@linaro.org> | 2017-02-21 11:58:03 +0000 |
---|---|---|
committer | Peter Maydell <peter.maydell@linaro.org> | 2017-02-21 11:58:03 +0000 |
commit | a0775e28cd6cae7eae248f74db7bc4a03da20c6b (patch) | |
tree | 6d141e7710855c40fdaf81a4c2731995782e443c | |
parent | b856256179f14c33a513d0b9cc3e4be355b95f43 (diff) | |
parent | a7b91d35bab97a2d3e779d0c64c9b837b52a6cf7 (diff) | |
download | qemu-a0775e28cd6cae7eae248f74db7bc4a03da20c6b.zip qemu-a0775e28cd6cae7eae248f74db7bc4a03da20c6b.tar.gz qemu-a0775e28cd6cae7eae248f74db7bc4a03da20c6b.tar.bz2 |
Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging
Pull request
v2:
* Rebased to resolve scsi conflicts
# gpg: Signature made Tue 21 Feb 2017 11:56:24 GMT
# gpg: using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg: aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35 775A 9CA4 ABB3 81AB 73C8
* remotes/stefanha/tags/block-pull-request: (24 commits)
coroutine-lock: make CoRwlock thread-safe and fair
coroutine-lock: add mutex argument to CoQueue APIs
coroutine-lock: place CoMutex before CoQueue in header
test-aio-multithread: add performance comparison with thread-based mutexes
coroutine-lock: add limited spinning to CoMutex
coroutine-lock: make CoMutex thread-safe
block: document fields protected by AioContext lock
async: remove unnecessary inc/dec pairs
aio-posix: partially inline aio_dispatch into aio_poll
block: explicitly acquire aiocontext in aio callbacks that need it
block: explicitly acquire aiocontext in bottom halves that need it
block: explicitly acquire aiocontext in callbacks that need it
block: explicitly acquire aiocontext in timers that need it
aio: push aio_context_acquire/release down to dispatching
qed: introduce qed_aio_start_io and qed_aio_next_io_cb
blkdebug: reschedule coroutine on the AioContext it is running on
coroutine-lock: reschedule coroutine on the AioContext it was running on
nbd: convert to use qio_channel_yield
io: make qio_channel_yield aware of AioContexts
io: add methods to set I/O handlers on AioContext
...
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
67 files changed, 1711 insertions, 532 deletions
diff --git a/Makefile.objs b/Makefile.objs index e4bfc16..e740500 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -9,12 +9,8 @@ chardev-obj-y = chardev/ ####################################################################### # block-obj-y is code used by both qemu system emulation and qemu-img -block-obj-y = async.o thread-pool.o block-obj-y += nbd/ block-obj-y += block.o blockjob.o -block-obj-y += main-loop.o iohandler.o qemu-timer.o -block-obj-$(CONFIG_POSIX) += aio-posix.o -block-obj-$(CONFIG_WIN32) += aio-win32.o block-obj-y += block/ block-obj-y += qemu-io-cmds.o block-obj-$(CONFIG_REPLICATION) += replication.o diff --git a/block/backup.c b/block/backup.c index ea38733..fe010e7 100644 --- a/block/backup.c +++ b/block/backup.c @@ -64,7 +64,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job, retry = false; QLIST_FOREACH(req, &job->inflight_reqs, list) { if (end > req->start && start < req->end) { - qemu_co_queue_wait(&req->wait_queue); + qemu_co_queue_wait(&req->wait_queue, NULL); retry = true; break; } diff --git a/block/blkdebug.c b/block/blkdebug.c index acccf85..d8eee1b 100644 --- a/block/blkdebug.c +++ b/block/blkdebug.c @@ -405,12 +405,6 @@ out: return ret; } -static void error_callback_bh(void *opaque) -{ - Coroutine *co = opaque; - qemu_coroutine_enter(co); -} - static int inject_error(BlockDriverState *bs, BlkdebugRule *rule) { BDRVBlkdebugState *s = bs->opaque; @@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule) } if (!immediately) { - aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh, - qemu_coroutine_self()); + aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self()); qemu_coroutine_yield(); } diff --git a/block/blkreplay.c b/block/blkreplay.c index a741654..cfc8c5b 100755 --- a/block/blkreplay.c +++ b/block/blkreplay.c @@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs) static void blkreplay_bh_cb(void *opaque) { Request *req = opaque; - qemu_coroutine_enter(req->co); + aio_co_wake(req->co); qemu_bh_delete(req->bh); g_free(req); } diff --git a/block/block-backend.c b/block/block-backend.c index efbf398..819f272 100644 --- a/block/block-backend.c +++ b/block/block-backend.c @@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf, { QEMUIOVector qiov; struct iovec iov; - Coroutine *co; BlkRwCo rwco; iov = (struct iovec) { @@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf, .ret = NOT_DONE, }; - co = qemu_coroutine_create(co_entry, &rwco); - qemu_coroutine_enter(co); - BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE); + if (qemu_in_coroutine()) { + /* Fast-path if already in coroutine context */ + co_entry(&rwco); + } else { + Coroutine *co = qemu_coroutine_create(co_entry, &rwco); + qemu_coroutine_enter(co); + BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE); + } return rwco.ret; } @@ -979,7 +983,6 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb) static void blk_aio_complete_bh(void *opaque) { BlkAioEmAIOCB *acb = opaque; - assert(acb->has_returned); blk_aio_complete(acb); } diff --git a/block/curl.c b/block/curl.c index 792fef8..2939cc7 100644 --- a/block/curl.c +++ b/block/curl.c @@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s) } } -static void curl_multi_do(void *arg) +static void curl_multi_do_locked(CURLState *s) { - CURLState *s = (CURLState *)arg; CURLSocket *socket, *next_socket; int running; int r; @@ -406,12 +405,23 @@ static void curl_multi_do(void *arg) } } +static void curl_multi_do(void *arg) +{ + CURLState *s = (CURLState *)arg; + + aio_context_acquire(s->s->aio_context); + curl_multi_do_locked(s); + aio_context_release(s->s->aio_context); +} + static void curl_multi_read(void *arg) { CURLState *s = (CURLState *)arg; - curl_multi_do(arg); + aio_context_acquire(s->s->aio_context); + curl_multi_do_locked(s); curl_multi_check_completion(s->s); + aio_context_release(s->s->aio_context); } static void curl_multi_timeout_do(void *arg) @@ -424,9 +434,11 @@ static void curl_multi_timeout_do(void *arg) return; } + aio_context_acquire(s->aio_context); curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running); curl_multi_check_completion(s); + aio_context_release(s->aio_context); #else abort(); #endif @@ -784,13 +796,18 @@ static void curl_readv_bh_cb(void *p) { CURLState *state; int running; + int ret = -EINPROGRESS; CURLAIOCB *acb = p; - BDRVCURLState *s = acb->common.bs->opaque; + BlockDriverState *bs = acb->common.bs; + BDRVCURLState *s = bs->opaque; + AioContext *ctx = bdrv_get_aio_context(bs); size_t start = acb->sector_num * BDRV_SECTOR_SIZE; size_t end; + aio_context_acquire(ctx); + // In case we have the requested data already (e.g. read-ahead), // we can just call the callback and be done. switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) { @@ -798,7 +815,7 @@ static void curl_readv_bh_cb(void *p) qemu_aio_unref(acb); // fall through case FIND_RET_WAIT: - return; + goto out; default: break; } @@ -806,9 +823,8 @@ static void curl_readv_bh_cb(void *p) // No cache found, so let's start a new request state = curl_init_state(acb->common.bs, s); if (!state) { - acb->common.cb(acb->common.opaque, -EIO); - qemu_aio_unref(acb); - return; + ret = -EIO; + goto out; } acb->start = 0; @@ -822,9 +838,8 @@ static void curl_readv_bh_cb(void *p) state->orig_buf = g_try_malloc(state->buf_len); if (state->buf_len && state->orig_buf == NULL) { curl_clean_state(state); - acb->common.cb(acb->common.opaque, -ENOMEM); - qemu_aio_unref(acb); - return; + ret = -ENOMEM; + goto out; } state->acb[0] = acb; @@ -837,6 +852,13 @@ static void curl_readv_bh_cb(void *p) /* Tell curl it needs to kick things off */ curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running); + +out: + aio_context_release(ctx); + if (ret != -EINPROGRESS) { + acb->common.cb(acb->common.opaque, ret); + qemu_aio_unref(acb); + } } static BlockAIOCB *curl_aio_readv(BlockDriverState *bs, diff --git a/block/gluster.c b/block/gluster.c index 1a22f29..56b4abe 100644 --- a/block/gluster.c +++ b/block/gluster.c @@ -698,13 +698,6 @@ static struct glfs *qemu_gluster_init(BlockdevOptionsGluster *gconf, return qemu_gluster_glfs_init(gconf, errp); } -static void qemu_gluster_complete_aio(void *opaque) -{ - GlusterAIOCB *acb = (GlusterAIOCB *)opaque; - - qemu_coroutine_enter(acb->coroutine); -} - /* * AIO callback routine called from GlusterFS thread. */ @@ -720,7 +713,7 @@ static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg) acb->ret = -EIO; /* Partial read/write - fail it */ } - aio_bh_schedule_oneshot(acb->aio_context, qemu_gluster_complete_aio, acb); + aio_co_schedule(acb->aio_context, acb->coroutine); } static void qemu_gluster_parse_flags(int bdrv_flags, int *open_flags) @@ -189,7 +189,7 @@ static void bdrv_co_drain_bh_cb(void *opaque) bdrv_dec_in_flight(bs); bdrv_drained_begin(bs); data->done = true; - qemu_coroutine_enter(co); + aio_co_wake(co); } static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs) @@ -539,7 +539,7 @@ static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self) * (instead of producing a deadlock in the former case). */ if (!req->waiting_for) { self->waiting_for = req; - qemu_co_queue_wait(&req->wait_queue); + qemu_co_queue_wait(&req->wait_queue, NULL); self->waiting_for = NULL; retry = true; waited = true; @@ -813,7 +813,7 @@ static void bdrv_co_io_em_complete(void *opaque, int ret) CoroutineIOCompletion *co = opaque; co->ret = ret; - qemu_coroutine_enter(co->coroutine); + aio_co_wake(co->coroutine); } static int coroutine_fn bdrv_driver_preadv(BlockDriverState *bs, @@ -2080,6 +2080,11 @@ void bdrv_aio_cancel(BlockAIOCB *acb) if (acb->aiocb_info->get_aio_context) { aio_poll(acb->aiocb_info->get_aio_context(acb), true); } else if (acb->bs) { + /* qemu_aio_ref and qemu_aio_unref are not thread-safe, so + * assert that we're not using an I/O thread. Thread-safe + * code should use bdrv_aio_cancel_async exclusively. + */ + assert(bdrv_get_aio_context(acb->bs) == qemu_get_aio_context()); aio_poll(bdrv_get_aio_context(acb->bs), true); } else { abort(); @@ -2239,35 +2244,6 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs, return &acb->common; } -void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs, - BlockCompletionFunc *cb, void *opaque) -{ - BlockAIOCB *acb; - - acb = g_malloc(aiocb_info->aiocb_size); - acb->aiocb_info = aiocb_info; - acb->bs = bs; - acb->cb = cb; - acb->opaque = opaque; - acb->refcnt = 1; - return acb; -} - -void qemu_aio_ref(void *p) -{ - BlockAIOCB *acb = p; - acb->refcnt++; -} - -void qemu_aio_unref(void *p) -{ - BlockAIOCB *acb = p; - assert(acb->refcnt > 0); - if (--acb->refcnt == 0) { - g_free(acb); - } -} - /**************************************************************/ /* Coroutine block device emulation */ @@ -2299,7 +2275,7 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs) /* Wait until any previous flushes are completed */ while (bs->active_flush_req) { - qemu_co_queue_wait(&bs->flush_queue); + qemu_co_queue_wait(&bs->flush_queue, NULL); } bs->active_flush_req = true; diff --git a/block/iscsi.c b/block/iscsi.c index 1860f1b..2561be9 100644 --- a/block/iscsi.c +++ b/block/iscsi.c @@ -165,8 +165,9 @@ iscsi_schedule_bh(IscsiAIOCB *acb) static void iscsi_co_generic_bh_cb(void *opaque) { struct IscsiTask *iTask = opaque; + iTask->complete = 1; - qemu_coroutine_enter(iTask->co); + aio_co_wake(iTask->co); } static void iscsi_retry_timer_expired(void *opaque) @@ -174,7 +175,7 @@ static void iscsi_retry_timer_expired(void *opaque) struct IscsiTask *iTask = opaque; iTask->complete = 1; if (iTask->co) { - qemu_coroutine_enter(iTask->co); + aio_co_wake(iTask->co); } } @@ -394,8 +395,10 @@ iscsi_process_read(void *arg) IscsiLun *iscsilun = arg; struct iscsi_context *iscsi = iscsilun->iscsi; + aio_context_acquire(iscsilun->aio_context); iscsi_service(iscsi, POLLIN); iscsi_set_events(iscsilun); + aio_context_release(iscsilun->aio_context); } static void @@ -404,8 +407,10 @@ iscsi_process_write(void *arg) IscsiLun *iscsilun = arg; struct iscsi_context *iscsi = iscsilun->iscsi; + aio_context_acquire(iscsilun->aio_context); iscsi_service(iscsi, POLLOUT); iscsi_set_events(iscsilun); + aio_context_release(iscsilun->aio_context); } static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun) @@ -1392,16 +1397,20 @@ static void iscsi_nop_timed_event(void *opaque) { IscsiLun *iscsilun = opaque; + aio_context_acquire(iscsilun->aio_context); if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) { error_report("iSCSI: NOP timeout. Reconnecting..."); iscsilun->request_timed_out = true; } else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) { error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages."); - return; + goto out; } timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL); iscsi_set_events(iscsilun); + +out: + aio_context_release(iscsilun->aio_context); } static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp) diff --git a/block/linux-aio.c b/block/linux-aio.c index 03ab741..88b8d55 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -54,10 +54,10 @@ struct LinuxAioState { io_context_t ctx; EventNotifier e; - /* io queue for submit at batch */ + /* io queue for submit at batch. Protected by AioContext lock. */ LaioQueue io_q; - /* I/O completion processing */ + /* I/O completion processing. Only runs in I/O thread. */ QEMUBH *completion_bh; int event_idx; int event_max; @@ -100,7 +100,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) * that! */ if (!qemu_coroutine_entered(laiocb->co)) { - qemu_coroutine_enter(laiocb->co); + aio_co_wake(laiocb->co); } } else { laiocb->common.cb(laiocb->common.opaque, ret); @@ -234,9 +234,12 @@ static void qemu_laio_process_completions(LinuxAioState *s) static void qemu_laio_process_completions_and_submit(LinuxAioState *s) { qemu_laio_process_completions(s); + + aio_context_acquire(s->aio_context); if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } + aio_context_release(s->aio_context); } static void qemu_laio_completion_bh(void *opaque) @@ -455,6 +458,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) { aio_set_event_notifier(old_context, &s->e, false, NULL, NULL); qemu_bh_delete(s->completion_bh); + s->aio_context = NULL; } void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context) diff --git a/block/mirror.c b/block/mirror.c index 301ba92..698a54e 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret) { MirrorOp *op = opaque; MirrorBlockJob *s = op->s; + + aio_context_acquire(blk_get_aio_context(s->common.blk)); if (ret < 0) { BlockErrorAction action; @@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret) } } mirror_iteration_done(op, ret); + aio_context_release(blk_get_aio_context(s->common.blk)); } static void mirror_read_complete(void *opaque, int ret) { MirrorOp *op = opaque; MirrorBlockJob *s = op->s; + + aio_context_acquire(blk_get_aio_context(s->common.blk)); if (ret < 0) { BlockErrorAction action; @@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret) } mirror_iteration_done(op, ret); - return; + } else { + blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov, + 0, mirror_write_complete, op); } - blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov, - 0, mirror_write_complete, op); + aio_context_release(blk_get_aio_context(s->common.blk)); } static inline void mirror_clip_sectors(MirrorBlockJob *s, diff --git a/block/nbd-client.c b/block/nbd-client.c index 06f1532..0dc12c2 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -33,8 +33,9 @@ #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) #define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) -static void nbd_recv_coroutines_enter_all(NBDClientSession *s) +static void nbd_recv_coroutines_enter_all(BlockDriverState *bs) { + NBDClientSession *s = nbd_get_client_session(bs); int i; for (i = 0; i < MAX_NBD_REQUESTS; i++) { @@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s) qemu_coroutine_enter(s->recv_coroutine[i]); } } + BDRV_POLL_WHILE(bs, s->read_reply_co); } static void nbd_teardown_connection(BlockDriverState *bs) @@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs) qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - nbd_recv_coroutines_enter_all(client); + nbd_recv_coroutines_enter_all(bs); nbd_client_detach_aio_context(bs); object_unref(OBJECT(client->sioc)); @@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs) client->ioc = NULL; } -static void nbd_reply_ready(void *opaque) +static coroutine_fn void nbd_read_reply_entry(void *opaque) { - BlockDriverState *bs = opaque; - NBDClientSession *s = nbd_get_client_session(bs); + NBDClientSession *s = opaque; uint64_t i; int ret; - if (!s->ioc) { /* Already closed */ - return; - } - - if (s->reply.handle == 0) { - /* No reply already in flight. Fetch a header. It is possible - * that another thread has done the same thing in parallel, so - * the socket is not readable anymore. - */ + for (;;) { + assert(s->reply.handle == 0); ret = nbd_receive_reply(s->ioc, &s->reply); - if (ret == -EAGAIN) { - return; - } if (ret < 0) { - s->reply.handle = 0; - goto fail; + break; } - } - /* There's no need for a mutex on the receive side, because the - * handler acts as a synchronization point and ensures that only - * one coroutine is called until the reply finishes. */ - i = HANDLE_TO_INDEX(s, s->reply.handle); - if (i >= MAX_NBD_REQUESTS) { - goto fail; - } + /* There's no need for a mutex on the receive side, because the + * handler acts as a synchronization point and ensures that only + * one coroutine is called until the reply finishes. + */ + i = HANDLE_TO_INDEX(s, s->reply.handle); + if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) { + break; + } - if (s->recv_coroutine[i]) { - qemu_coroutine_enter(s->recv_coroutine[i]); - return; + /* We're woken up by the recv_coroutine itself. Note that there + * is no race between yielding and reentering read_reply_co. This + * is because: + * + * - if recv_coroutine[i] runs on the same AioContext, it is only + * entered after we yield + * + * - if recv_coroutine[i] runs on a different AioContext, reentering + * read_reply_co happens through a bottom half, which can only + * run after we yield. + */ + aio_co_wake(s->recv_coroutine[i]); + qemu_coroutine_yield(); } - -fail: - nbd_teardown_connection(bs); -} - -static void nbd_restart_write(void *opaque) -{ - BlockDriverState *bs = opaque; - - qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine); + s->read_reply_co = NULL; } static int nbd_co_send_request(BlockDriverState *bs, @@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs, QEMUIOVector *qiov) { NBDClientSession *s = nbd_get_client_session(bs); - AioContext *aio_context; int rc, ret, i; qemu_co_mutex_lock(&s->send_mutex); @@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs, return -EPIPE; } - s->send_coroutine = qemu_coroutine_self(); - aio_context = bdrv_get_aio_context(bs); - - aio_set_fd_handler(aio_context, s->sioc->fd, false, - nbd_reply_ready, nbd_restart_write, NULL, bs); if (qiov) { qio_channel_set_cork(s->ioc, true); rc = nbd_send_request(s->ioc, request); @@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs, } else { rc = nbd_send_request(s->ioc, request); } - aio_set_fd_handler(aio_context, s->sioc->fd, false, - nbd_reply_ready, NULL, NULL, bs); - s->send_coroutine = NULL; qemu_co_mutex_unlock(&s->send_mutex); return rc; } @@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s, { int ret; - /* Wait until we're woken up by the read handler. TODO: perhaps - * peek at the next reply and avoid yielding if it's ours? */ + /* Wait until we're woken up by nbd_read_reply_entry. */ qemu_coroutine_yield(); *reply = s->reply; if (reply->handle != request->handle || @@ -201,7 +182,7 @@ static void nbd_coroutine_start(NBDClientSession *s, /* Poor man semaphore. The free_sema is locked when no other request * can be accepted, and unlocked after receiving one reply. */ if (s->in_flight == MAX_NBD_REQUESTS) { - qemu_co_queue_wait(&s->free_sema); + qemu_co_queue_wait(&s->free_sema, NULL); assert(s->in_flight < MAX_NBD_REQUESTS); } s->in_flight++; @@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s, /* s->recv_coroutine[i] is set as soon as we get the send_lock. */ } -static void nbd_coroutine_end(NBDClientSession *s, +static void nbd_coroutine_end(BlockDriverState *bs, NBDRequest *request) { + NBDClientSession *s = nbd_get_client_session(bs); int i = HANDLE_TO_INDEX(s, request->handle); + s->recv_coroutine[i] = NULL; - if (s->in_flight-- == MAX_NBD_REQUESTS) { - qemu_co_queue_next(&s->free_sema); + s->in_flight--; + qemu_co_queue_next(&s->free_sema); + + /* Kick the read_reply_co to get the next reply. */ + if (s->read_reply_co) { + aio_co_wake(s->read_reply_co); } } @@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset, } else { nbd_co_receive_reply(client, &request, &reply, qiov); } - nbd_coroutine_end(client, &request); + nbd_coroutine_end(bs, &request); return -reply.error; } @@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset, } else { nbd_co_receive_reply(client, &request, &reply, NULL); } - nbd_coroutine_end(client, &request); + nbd_coroutine_end(bs, &request); return -reply.error; } @@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, } else { nbd_co_receive_reply(client, &request, &reply, NULL); } - nbd_coroutine_end(client, &request); + nbd_coroutine_end(bs, &request); return -reply.error; } @@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs) } else { nbd_co_receive_reply(client, &request, &reply, NULL); } - nbd_coroutine_end(client, &request); + nbd_coroutine_end(bs, &request); return -reply.error; } @@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count) } else { nbd_co_receive_reply(client, &request, &reply, NULL); } - nbd_coroutine_end(client, &request); + nbd_coroutine_end(bs, &request); return -reply.error; } void nbd_client_detach_aio_context(BlockDriverState *bs) { - aio_set_fd_handler(bdrv_get_aio_context(bs), - nbd_get_client_session(bs)->sioc->fd, - false, NULL, NULL, NULL, NULL); + NBDClientSession *client = nbd_get_client_session(bs); + qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc)); } void nbd_client_attach_aio_context(BlockDriverState *bs, AioContext *new_context) { - aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd, - false, nbd_reply_ready, NULL, NULL, bs); + NBDClientSession *client = nbd_get_client_session(bs); + qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context); + aio_co_schedule(new_context, client->read_reply_co); } void nbd_client_close(BlockDriverState *bs) @@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs, /* Now that we're connected, set the socket to be non-blocking and * kick the reply mechanism. */ qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL); - + client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client); nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs)); logout("Established connection with NBD server\n"); diff --git a/block/nbd-client.h b/block/nbd-client.h index f8d6006..8cdfc92 100644 --- a/block/nbd-client.h +++ b/block/nbd-client.h @@ -25,7 +25,7 @@ typedef struct NBDClientSession { CoMutex send_mutex; CoQueue free_sema; - Coroutine *send_coroutine; + Coroutine *read_reply_co; int in_flight; Coroutine *recv_coroutine[MAX_NBD_REQUESTS]; diff --git a/block/nfs.c b/block/nfs.c index 689eaa7..08b43dd 100644 --- a/block/nfs.c +++ b/block/nfs.c @@ -208,15 +208,21 @@ static void nfs_set_events(NFSClient *client) static void nfs_process_read(void *arg) { NFSClient *client = arg; + + aio_context_acquire(client->aio_context); nfs_service(client->context, POLLIN); nfs_set_events(client); + aio_context_release(client->aio_context); } static void nfs_process_write(void *arg) { NFSClient *client = arg; + + aio_context_acquire(client->aio_context); nfs_service(client->context, POLLOUT); nfs_set_events(client); + aio_context_release(client->aio_context); } static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task) @@ -231,8 +237,9 @@ static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task) static void nfs_co_generic_bh_cb(void *opaque) { NFSRPC *task = opaque; + task->complete = 1; - qemu_coroutine_enter(task->co); + aio_co_wake(task->co); } static void diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c index 928c1e2..78c11d4 100644 --- a/block/qcow2-cluster.c +++ b/block/qcow2-cluster.c @@ -932,9 +932,7 @@ static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset, if (bytes == 0) { /* Wait for the dependency to complete. We need to recheck * the free/allocated clusters when we continue. */ - qemu_co_mutex_unlock(&s->lock); - qemu_co_queue_wait(&old_alloc->dependent_requests); - qemu_co_mutex_lock(&s->lock); + qemu_co_queue_wait(&old_alloc->dependent_requests, &s->lock); return -EAGAIN; } } diff --git a/block/qed-cluster.c b/block/qed-cluster.c index c24e756..8f5da74 100644 --- a/block/qed-cluster.c +++ b/block/qed-cluster.c @@ -83,6 +83,7 @@ static void qed_find_cluster_cb(void *opaque, int ret) unsigned int index; unsigned int n; + qed_acquire(s); if (ret) { goto out; } @@ -109,6 +110,7 @@ static void qed_find_cluster_cb(void *opaque, int ret) out: find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len); + qed_release(s); g_free(find_cluster_cb); } diff --git a/block/qed-table.c b/block/qed-table.c index ed443e2..b12c298 100644 --- a/block/qed-table.c +++ b/block/qed-table.c @@ -31,6 +31,7 @@ static void qed_read_table_cb(void *opaque, int ret) { QEDReadTableCB *read_table_cb = opaque; QEDTable *table = read_table_cb->table; + BDRVQEDState *s = read_table_cb->s; int noffsets = read_table_cb->qiov.size / sizeof(uint64_t); int i; @@ -40,13 +41,15 @@ static void qed_read_table_cb(void *opaque, int ret) } /* Byteswap offsets */ + qed_acquire(s); for (i = 0; i < noffsets; i++) { table->offsets[i] = le64_to_cpu(table->offsets[i]); } + qed_release(s); out: /* Completion */ - trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret); + trace_qed_read_table_cb(s, read_table_cb->table, ret); gencb_complete(&read_table_cb->gencb, ret); } @@ -84,8 +87,9 @@ typedef struct { static void qed_write_table_cb(void *opaque, int ret) { QEDWriteTableCB *write_table_cb = opaque; + BDRVQEDState *s = write_table_cb->s; - trace_qed_write_table_cb(write_table_cb->s, + trace_qed_write_table_cb(s, write_table_cb->orig_table, write_table_cb->flush, ret); @@ -97,8 +101,10 @@ static void qed_write_table_cb(void *opaque, int ret) if (write_table_cb->flush) { /* We still need to flush first */ write_table_cb->flush = false; + qed_acquire(s); bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb, write_table_cb); + qed_release(s); return; } @@ -213,6 +219,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret) CachedL2Table *l2_table = request->l2_table; uint64_t l2_offset = read_l2_table_cb->l2_offset; + qed_acquire(s); if (ret) { /* can't trust loaded L2 table anymore */ qed_unref_l2_cache_entry(l2_table); @@ -228,6 +235,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret) request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset); assert(request->l2_table != NULL); } + qed_release(s); gencb_complete(&read_l2_table_cb->gencb, ret); } diff --git a/block/qed.c b/block/qed.c index 1a7ef0a..0b62c77 100644 --- a/block/qed.c +++ b/block/qed.c @@ -273,7 +273,19 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s) return l2_table; } -static void qed_aio_next_io(void *opaque, int ret); +static void qed_aio_next_io(QEDAIOCB *acb, int ret); + +static void qed_aio_start_io(QEDAIOCB *acb) +{ + qed_aio_next_io(acb, 0); +} + +static void qed_aio_next_io_cb(void *opaque, int ret) +{ + QEDAIOCB *acb = opaque; + + qed_aio_next_io(acb, ret); +} static void qed_plug_allocating_write_reqs(BDRVQEDState *s) { @@ -292,7 +304,7 @@ static void qed_unplug_allocating_write_reqs(BDRVQEDState *s) acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs); if (acb) { - qed_aio_next_io(acb, 0); + qed_aio_start_io(acb); } } @@ -333,10 +345,22 @@ static void qed_need_check_timer_cb(void *opaque) trace_qed_need_check_timer_cb(s); + qed_acquire(s); qed_plug_allocating_write_reqs(s); /* Ensure writes are on disk before clearing flag */ bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s); + qed_release(s); +} + +void qed_acquire(BDRVQEDState *s) +{ + aio_context_acquire(bdrv_get_aio_context(s->bs)); +} + +void qed_release(BDRVQEDState *s) +{ + aio_context_release(bdrv_get_aio_context(s->bs)); } static void qed_start_need_check_timer(BDRVQEDState *s) @@ -721,7 +745,7 @@ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t l } if (cb->co) { - qemu_coroutine_enter(cb->co); + aio_co_wake(cb->co); } } @@ -918,6 +942,7 @@ static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index, static void qed_aio_complete_bh(void *opaque) { QEDAIOCB *acb = opaque; + BDRVQEDState *s = acb_to_s(acb); BlockCompletionFunc *cb = acb->common.cb; void *user_opaque = acb->common.opaque; int ret = acb->bh_ret; @@ -925,7 +950,9 @@ static void qed_aio_complete_bh(void *opaque) qemu_aio_unref(acb); /* Invoke callback */ + qed_acquire(s); cb(user_opaque, ret); + qed_release(s); } static void qed_aio_complete(QEDAIOCB *acb, int ret) @@ -959,7 +986,7 @@ static void qed_aio_complete(QEDAIOCB *acb, int ret) QSIMPLEQ_REMOVE_HEAD(&s->allocating_write_reqs, next); acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs); if (acb) { - qed_aio_next_io(acb, 0); + qed_aio_start_io(acb); } else if (s->header.features & QED_F_NEED_CHECK) { qed_start_need_check_timer(s); } @@ -984,7 +1011,7 @@ static void qed_commit_l2_update(void *opaque, int ret) acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset); assert(acb->request.l2_table != NULL); - qed_aio_next_io(opaque, ret); + qed_aio_next_io(acb, ret); } /** @@ -1032,11 +1059,11 @@ static void qed_aio_write_l2_update(QEDAIOCB *acb, int ret, uint64_t offset) if (need_alloc) { /* Write out the whole new L2 table */ qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true, - qed_aio_write_l1_update, acb); + qed_aio_write_l1_update, acb); } else { /* Write out only the updated part of the L2 table */ qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false, - qed_aio_next_io, acb); + qed_aio_next_io_cb, acb); } return; @@ -1088,7 +1115,7 @@ static void qed_aio_write_main(void *opaque, int ret) } if (acb->find_cluster_ret == QED_CLUSTER_FOUND) { - next_fn = qed_aio_next_io; + next_fn = qed_aio_next_io_cb; } else { if (s->bs->backing) { next_fn = qed_aio_write_flush_before_l2_update; @@ -1201,7 +1228,7 @@ static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len) if (acb->flags & QED_AIOCB_ZERO) { /* Skip ahead if the clusters are already zero */ if (acb->find_cluster_ret == QED_CLUSTER_ZERO) { - qed_aio_next_io(acb, 0); + qed_aio_start_io(acb); return; } @@ -1321,18 +1348,18 @@ static void qed_aio_read_data(void *opaque, int ret, /* Handle zero cluster and backing file reads */ if (ret == QED_CLUSTER_ZERO) { qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size); - qed_aio_next_io(acb, 0); + qed_aio_start_io(acb); return; } else if (ret != QED_CLUSTER_FOUND) { qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov, - &acb->backing_qiov, qed_aio_next_io, acb); + &acb->backing_qiov, qed_aio_next_io_cb, acb); return; } BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO); bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE, &acb->cur_qiov, acb->cur_qiov.size / BDRV_SECTOR_SIZE, - qed_aio_next_io, acb); + qed_aio_next_io_cb, acb); return; err: @@ -1342,9 +1369,8 @@ err: /** * Begin next I/O or complete the request */ -static void qed_aio_next_io(void *opaque, int ret) +static void qed_aio_next_io(QEDAIOCB *acb, int ret) { - QEDAIOCB *acb = opaque; BDRVQEDState *s = acb_to_s(acb); QEDFindClusterFunc *io_fn = (acb->flags & QED_AIOCB_WRITE) ? qed_aio_write_data : qed_aio_read_data; @@ -1400,7 +1426,7 @@ static BlockAIOCB *qed_aio_setup(BlockDriverState *bs, qemu_iovec_init(&acb->cur_qiov, qiov->niov); /* Start request */ - qed_aio_next_io(acb, 0); + qed_aio_start_io(acb); return &acb->common; } @@ -1436,7 +1462,7 @@ static void coroutine_fn qed_co_pwrite_zeroes_cb(void *opaque, int ret) cb->done = true; cb->ret = ret; if (cb->co) { - qemu_coroutine_enter(cb->co); + aio_co_wake(cb->co); } } diff --git a/block/qed.h b/block/qed.h index 9676ab9..ce8c314 100644 --- a/block/qed.h +++ b/block/qed.h @@ -198,6 +198,9 @@ enum { */ typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len); +void qed_acquire(BDRVQEDState *s); +void qed_release(BDRVQEDState *s); + /** * Generic callback for chaining async callbacks */ diff --git a/block/sheepdog.c b/block/sheepdog.c index f757157..860ba61 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -486,7 +486,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb) retry: QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) { if (AIOCBOverlapping(acb, cb)) { - qemu_co_queue_wait(&s->overlapping_queue); + qemu_co_queue_wait(&s->overlapping_queue, NULL); goto retry; } } @@ -575,13 +575,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data, return ret; } -static void restart_co_req(void *opaque) -{ - Coroutine *co = opaque; - - qemu_coroutine_enter(co); -} - typedef struct SheepdogReqCo { int sockfd; BlockDriverState *bs; @@ -592,12 +585,19 @@ typedef struct SheepdogReqCo { unsigned int *rlen; int ret; bool finished; + Coroutine *co; } SheepdogReqCo; +static void restart_co_req(void *opaque) +{ + SheepdogReqCo *srco = opaque; + + aio_co_wake(srco->co); +} + static coroutine_fn void do_co_req(void *opaque) { int ret; - Coroutine *co; SheepdogReqCo *srco = opaque; int sockfd = srco->sockfd; SheepdogReq *hdr = srco->hdr; @@ -605,9 +605,9 @@ static coroutine_fn void do_co_req(void *opaque) unsigned int *wlen = srco->wlen; unsigned int *rlen = srco->rlen; - co = qemu_coroutine_self(); + srco->co = qemu_coroutine_self(); aio_set_fd_handler(srco->aio_context, sockfd, false, - NULL, restart_co_req, NULL, co); + NULL, restart_co_req, NULL, srco); ret = send_co_req(sockfd, hdr, data, wlen); if (ret < 0) { @@ -615,7 +615,7 @@ static coroutine_fn void do_co_req(void *opaque) } aio_set_fd_handler(srco->aio_context, sockfd, false, - restart_co_req, NULL, NULL, co); + restart_co_req, NULL, NULL, srco); ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); if (ret != sizeof(*hdr)) { @@ -643,6 +643,7 @@ out: aio_set_fd_handler(srco->aio_context, sockfd, false, NULL, NULL, NULL, NULL); + srco->co = NULL; srco->ret = ret; srco->finished = true; if (srco->bs) { @@ -866,7 +867,7 @@ static void coroutine_fn aio_read_response(void *opaque) * We've finished all requests which belong to the AIOCB, so * we can switch back to sd_co_readv/writev now. */ - qemu_coroutine_enter(acb->coroutine); + aio_co_wake(acb->coroutine); } return; @@ -883,14 +884,14 @@ static void co_read_response(void *opaque) s->co_recv = qemu_coroutine_create(aio_read_response, opaque); } - qemu_coroutine_enter(s->co_recv); + aio_co_wake(s->co_recv); } static void co_write_request(void *opaque) { BDRVSheepdogState *s = opaque; - qemu_coroutine_enter(s->co_send); + aio_co_wake(s->co_send); } /* diff --git a/block/ssh.c b/block/ssh.c index e0edf20..835932e 100644 --- a/block/ssh.c +++ b/block/ssh.c @@ -889,10 +889,14 @@ static void restart_coroutine(void *opaque) DPRINTF("co=%p", co); - qemu_coroutine_enter(co); + aio_co_wake(co); } -static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs) +/* A non-blocking call returned EAGAIN, so yield, ensuring the + * handlers are set up so that we'll be rescheduled when there is an + * interesting event on the socket. + */ +static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs) { int r; IOHandler *rd_handler = NULL, *wr_handler = NULL; @@ -912,25 +916,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs) aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false, rd_handler, wr_handler, NULL, co); -} - -static coroutine_fn void clear_fd_handler(BDRVSSHState *s, - BlockDriverState *bs) -{ - DPRINTF("s->sock=%d", s->sock); - aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, - false, NULL, NULL, NULL, NULL); -} - -/* A non-blocking call returned EAGAIN, so yield, ensuring the - * handlers are set up so that we'll be rescheduled when there is an - * interesting event on the socket. - */ -static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs) -{ - set_fd_handler(s, bs); qemu_coroutine_yield(); - clear_fd_handler(s, bs); + DPRINTF("s->sock=%d - back", s->sock); + aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false, + NULL, NULL, NULL, NULL); } /* SFTP has a function `libssh2_sftp_seek64' which seeks to a position diff --git a/block/throttle-groups.c b/block/throttle-groups.c index 17b2efb..b73e7a8 100644 --- a/block/throttle-groups.c +++ b/block/throttle-groups.c @@ -326,7 +326,7 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk, if (must_wait || blkp->pending_reqs[is_write]) { blkp->pending_reqs[is_write]++; qemu_mutex_unlock(&tg->lock); - qemu_co_queue_wait(&blkp->throttled_reqs[is_write]); + qemu_co_queue_wait(&blkp->throttled_reqs[is_write], NULL); qemu_mutex_lock(&tg->lock); blkp->pending_reqs[is_write]--; } @@ -416,7 +416,9 @@ static void timer_cb(BlockBackend *blk, bool is_write) qemu_mutex_unlock(&tg->lock); /* Run the request that was waiting for this timer */ + aio_context_acquire(blk_get_aio_context(blk)); empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]); + aio_context_release(blk_get_aio_context(blk)); /* If the request queue was empty then we have to take care of * scheduling the next one */ diff --git a/block/win32-aio.c b/block/win32-aio.c index 8cdf73b..3be8f45 100644 --- a/block/win32-aio.c +++ b/block/win32-aio.c @@ -41,7 +41,7 @@ struct QEMUWin32AIOState { HANDLE hIOCP; EventNotifier e; int count; - bool is_aio_context_attached; + AioContext *aio_ctx; }; typedef struct QEMUWin32AIOCB { @@ -87,7 +87,6 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s, qemu_vfree(waiocb->buf); } - waiocb->common.cb(waiocb->common.opaque, ret); qemu_aio_unref(waiocb); } @@ -176,13 +175,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio, AioContext *old_context) { aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL); - aio->is_aio_context_attached = false; + aio->aio_ctx = NULL; } void win32_aio_attach_aio_context(QEMUWin32AIOState *aio, AioContext *new_context) { - aio->is_aio_context_attached = true; + aio->aio_ctx = new_context; aio_set_event_notifier(new_context, &aio->e, false, win32_aio_completion_cb, NULL); } @@ -212,7 +211,7 @@ out_free_state: void win32_aio_cleanup(QEMUWin32AIOState *aio) { - assert(!aio->is_aio_context_attached); + assert(!aio->aio_ctx); CloseHandle(aio->hIOCP); event_notifier_cleanup(&aio->e); g_free(aio); diff --git a/dma-helpers.c b/dma-helpers.c index 97157cc..2d7e02d 100644 --- a/dma-helpers.c +++ b/dma-helpers.c @@ -166,8 +166,10 @@ static void dma_blk_cb(void *opaque, int ret) QEMU_ALIGN_DOWN(dbs->iov.size, dbs->align)); } + aio_context_acquire(dbs->ctx); dbs->acb = dbs->io_func(dbs->offset, &dbs->iov, dma_blk_cb, dbs, dbs->io_func_opaque); + aio_context_release(dbs->ctx); assert(dbs->acb); } diff --git a/hw/9pfs/9p.c b/hw/9pfs/9p.c index 99e9472..3af1c93 100644 --- a/hw/9pfs/9p.c +++ b/hw/9pfs/9p.c @@ -2374,7 +2374,7 @@ static void coroutine_fn v9fs_flush(void *opaque) /* * Wait for pdu to complete. */ - qemu_co_queue_wait(&cancel_pdu->complete); + qemu_co_queue_wait(&cancel_pdu->complete, NULL); cancel_pdu->cancelled = 0; pdu_free(cancel_pdu); } diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c index baaa195..843bd2f 100644 --- a/hw/block/virtio-blk.c +++ b/hw/block/virtio-blk.c @@ -89,7 +89,9 @@ static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error, static void virtio_blk_rw_complete(void *opaque, int ret) { VirtIOBlockReq *next = opaque; + VirtIOBlock *s = next->dev; + aio_context_acquire(blk_get_aio_context(s->conf.conf.blk)); while (next) { VirtIOBlockReq *req = next; next = req->mr_next; @@ -122,21 +124,27 @@ static void virtio_blk_rw_complete(void *opaque, int ret) block_acct_done(blk_get_stats(req->dev->blk), &req->acct); virtio_blk_free_request(req); } + aio_context_release(blk_get_aio_context(s->conf.conf.blk)); } static void virtio_blk_flush_complete(void *opaque, int ret) { VirtIOBlockReq *req = opaque; + VirtIOBlock *s = req->dev; + aio_context_acquire(blk_get_aio_context(s->conf.conf.blk)); if (ret) { if (virtio_blk_handle_rw_error(req, -ret, 0)) { - return; + goto out; } } virtio_blk_req_complete(req, VIRTIO_BLK_S_OK); block_acct_done(blk_get_stats(req->dev->blk), &req->acct); virtio_blk_free_request(req); + +out: + aio_context_release(blk_get_aio_context(s->conf.conf.blk)); } #ifdef __linux__ @@ -150,7 +158,8 @@ static void virtio_blk_ioctl_complete(void *opaque, int status) { VirtIOBlockIoctlReq *ioctl_req = opaque; VirtIOBlockReq *req = ioctl_req->req; - VirtIODevice *vdev = VIRTIO_DEVICE(req->dev); + VirtIOBlock *s = req->dev; + VirtIODevice *vdev = VIRTIO_DEVICE(s); struct virtio_scsi_inhdr *scsi; struct sg_io_hdr *hdr; @@ -182,8 +191,10 @@ static void virtio_blk_ioctl_complete(void *opaque, int status) virtio_stl_p(vdev, &scsi->data_len, hdr->dxfer_len); out: + aio_context_acquire(blk_get_aio_context(s->conf.conf.blk)); virtio_blk_req_complete(req, status); virtio_blk_free_request(req); + aio_context_release(blk_get_aio_context(s->conf.conf.blk)); g_free(ioctl_req); } @@ -587,6 +598,7 @@ bool virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) MultiReqBuffer mrb = {}; bool progress = false; + aio_context_acquire(blk_get_aio_context(s->blk)); blk_io_plug(s->blk); do { @@ -609,6 +621,7 @@ bool virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) } blk_io_unplug(s->blk); + aio_context_release(blk_get_aio_context(s->blk)); return progress; } @@ -644,6 +657,7 @@ static void virtio_blk_dma_restart_bh(void *opaque) s->rq = NULL; + aio_context_acquire(blk_get_aio_context(s->conf.conf.blk)); while (req) { VirtIOBlockReq *next = req->next; if (virtio_blk_handle_request(req, &mrb)) { @@ -664,6 +678,7 @@ static void virtio_blk_dma_restart_bh(void *opaque) if (mrb.num_reqs) { virtio_blk_submit_multireq(s->blk, &mrb); } + aio_context_release(blk_get_aio_context(s->conf.conf.blk)); } static void virtio_blk_dma_restart_cb(void *opaque, int running, diff --git a/hw/scsi/scsi-bus.c b/hw/scsi/scsi-bus.c index 5940cb1..c9f0ac0 100644 --- a/hw/scsi/scsi-bus.c +++ b/hw/scsi/scsi-bus.c @@ -105,6 +105,7 @@ static void scsi_dma_restart_bh(void *opaque) qemu_bh_delete(s->bh); s->bh = NULL; + aio_context_acquire(blk_get_aio_context(s->conf.blk)); QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) { scsi_req_ref(req); if (req->retry) { @@ -122,6 +123,7 @@ static void scsi_dma_restart_bh(void *opaque) } scsi_req_unref(req); } + aio_context_release(blk_get_aio_context(s->conf.blk)); } void scsi_req_retry(SCSIRequest *req) diff --git a/hw/scsi/scsi-disk.c b/hw/scsi/scsi-disk.c index cc06fe5..bbfb5dc 100644 --- a/hw/scsi/scsi-disk.c +++ b/hw/scsi/scsi-disk.c @@ -207,6 +207,7 @@ static void scsi_aio_complete(void *opaque, int ret) assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); if (scsi_disk_req_check_error(r, ret, true)) { goto done; } @@ -215,6 +216,7 @@ static void scsi_aio_complete(void *opaque, int ret) scsi_req_complete(&r->req, GOOD); done: + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); scsi_req_unref(&r->req); } @@ -290,12 +292,14 @@ static void scsi_dma_complete(void *opaque, int ret) assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); if (ret < 0) { block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct); } else { block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct); } scsi_dma_complete_noio(r, ret); + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); } static void scsi_read_complete(void * opaque, int ret) @@ -306,6 +310,7 @@ static void scsi_read_complete(void * opaque, int ret) assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); if (scsi_disk_req_check_error(r, ret, true)) { goto done; } @@ -320,6 +325,7 @@ static void scsi_read_complete(void * opaque, int ret) done: scsi_req_unref(&r->req); + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); } /* Actually issue a read to the block device. */ @@ -364,12 +370,14 @@ static void scsi_do_read_cb(void *opaque, int ret) assert (r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); if (ret < 0) { block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct); } else { block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct); } scsi_do_read(opaque, ret); + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); } /* Read more data from scsi device into buffer. */ @@ -489,12 +497,14 @@ static void scsi_write_complete(void * opaque, int ret) assert (r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); if (ret < 0) { block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct); } else { block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct); } scsi_write_complete_noio(r, ret); + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); } static void scsi_write_data(SCSIRequest *req) @@ -1625,11 +1635,14 @@ static void scsi_unmap_complete(void *opaque, int ret) { UnmapCBData *data = opaque; SCSIDiskReq *r = data->r; + SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev); assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); scsi_unmap_complete_noio(data, ret); + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); } static void scsi_disk_emulate_unmap(SCSIDiskReq *r, uint8_t *inbuf) @@ -1696,6 +1709,7 @@ static void scsi_write_same_complete(void *opaque, int ret) assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk)); if (scsi_disk_req_check_error(r, ret, true)) { goto done; } @@ -1724,6 +1738,7 @@ done: scsi_req_unref(&r->req); qemu_vfree(data->iov.iov_base); g_free(data); + aio_context_release(blk_get_aio_context(s->qdev.conf.blk)); } static void scsi_disk_emulate_write_same(SCSIDiskReq *r, uint8_t *inbuf) diff --git a/hw/scsi/scsi-generic.c b/hw/scsi/scsi-generic.c index 92f091a..2933119 100644 --- a/hw/scsi/scsi-generic.c +++ b/hw/scsi/scsi-generic.c @@ -143,10 +143,14 @@ done: static void scsi_command_complete(void *opaque, int ret) { SCSIGenericReq *r = (SCSIGenericReq *)opaque; + SCSIDevice *s = r->req.dev; assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + + aio_context_acquire(blk_get_aio_context(s->conf.blk)); scsi_command_complete_noio(r, ret); + aio_context_release(blk_get_aio_context(s->conf.blk)); } static int execute_command(BlockBackend *blk, @@ -182,9 +186,11 @@ static void scsi_read_complete(void * opaque, int ret) assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->conf.blk)); + if (ret || r->req.io_canceled) { scsi_command_complete_noio(r, ret); - return; + goto done; } len = r->io_header.dxfer_len - r->io_header.resid; @@ -193,7 +199,7 @@ static void scsi_read_complete(void * opaque, int ret) r->len = -1; if (len == 0) { scsi_command_complete_noio(r, 0); - return; + goto done; } /* Snoop READ CAPACITY output to set the blocksize. */ @@ -237,6 +243,9 @@ static void scsi_read_complete(void * opaque, int ret) } scsi_req_data(&r->req, len); scsi_req_unref(&r->req); + +done: + aio_context_release(blk_get_aio_context(s->conf.blk)); } /* Read more data from scsi device into buffer. */ @@ -272,9 +281,11 @@ static void scsi_write_complete(void * opaque, int ret) assert(r->req.aiocb != NULL); r->req.aiocb = NULL; + aio_context_acquire(blk_get_aio_context(s->conf.blk)); + if (ret || r->req.io_canceled) { scsi_command_complete_noio(r, ret); - return; + goto done; } if (r->req.cmd.buf[0] == MODE_SELECT && r->req.cmd.buf[4] == 12 && @@ -284,6 +295,9 @@ static void scsi_write_complete(void * opaque, int ret) } scsi_command_complete_noio(r, ret); + +done: + aio_context_release(blk_get_aio_context(s->conf.blk)); } /* Write data to a scsi device. Returns nonzero on failure. diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c index b01030b..9e6f0e8 100644 --- a/hw/scsi/virtio-scsi.c +++ b/hw/scsi/virtio-scsi.c @@ -441,10 +441,12 @@ bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq) VirtIOSCSIReq *req; bool progress = false; + virtio_scsi_acquire(s); while ((req = virtio_scsi_pop_req(s, vq))) { progress = true; virtio_scsi_handle_ctrl_req(s, req); } + virtio_scsi_release(s); return progress; } @@ -602,6 +604,7 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs); + virtio_scsi_acquire(s); do { virtio_queue_set_notification(vq, 0); @@ -629,6 +632,7 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) QTAILQ_FOREACH_SAFE(req, &reqs, next, next) { virtio_scsi_handle_cmd_req_submit(s, req); } + virtio_scsi_release(s); return progress; } @@ -760,10 +764,13 @@ out: bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq) { + virtio_scsi_acquire(s); if (s->events_dropped) { virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0); + virtio_scsi_release(s); return true; } + virtio_scsi_release(s); return false; } diff --git a/include/block/aio.h b/include/block/aio.h index 7df271d..677b6ff 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque); typedef bool AioPollFn(void *opaque); typedef void IOHandler(void *opaque); +struct Coroutine; struct ThreadPool; struct LinuxAioState; @@ -108,6 +109,9 @@ struct AioContext { bool notified; EventNotifier notifier; + QSLIST_HEAD(, Coroutine) scheduled_coroutines; + QEMUBH *co_schedule_bh; + /* Thread pool for performing work and receiving completion callbacks. * Has its own locking. */ @@ -306,12 +310,8 @@ bool aio_pending(AioContext *ctx); /* Dispatch any pending callbacks from the GSource attached to the AioContext. * * This is used internally in the implementation of the GSource. - * - * @dispatch_fds: true to process fds, false to skip them - * (can be used as an optimization by callers that know there - * are no fds ready) */ -bool aio_dispatch(AioContext *ctx, bool dispatch_fds); +void aio_dispatch(AioContext *ctx); /* Progress in completing AIO work to occur. This can issue new pending * aio as a result of executing I/O completion or bh callbacks. @@ -483,6 +483,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external) } /** + * aio_co_schedule: + * @ctx: the aio context + * @co: the coroutine + * + * Start a coroutine on a remote AioContext. + * + * The coroutine must not be entered by anyone else while aio_co_schedule() + * is active. In addition the coroutine must have yielded unless ctx + * is the context in which the coroutine is running (i.e. the value of + * qemu_get_current_aio_context() from the coroutine itself). + */ +void aio_co_schedule(AioContext *ctx, struct Coroutine *co); + +/** + * aio_co_wake: + * @co: the coroutine + * + * Restart a coroutine on the AioContext where it was running last, thus + * preventing coroutines from jumping from one context to another when they + * go to sleep. + * + * aio_co_wake may be executed either in coroutine or non-coroutine + * context. The coroutine must not be entered by anyone else while + * aio_co_wake() is active. + */ +void aio_co_wake(struct Coroutine *co); + +/** * Return the AioContext whose event loop runs in the current thread. * * If called from an IOThread this will be the IOThread's AioContext. If diff --git a/include/block/block_int.h b/include/block/block_int.h index 2d92d7e..1670941 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -430,8 +430,9 @@ struct BdrvChild { * copied as well. */ struct BlockDriverState { - int64_t total_sectors; /* if we are reading a disk image, give its - size in sectors */ + /* Protected by big QEMU lock or read-only after opening. No special + * locking needed during I/O... + */ int open_flags; /* flags used to open the file, re-used for re-open */ bool read_only; /* if true, the media is read only */ bool encrypted; /* if true, the media is encrypted */ @@ -439,14 +440,6 @@ struct BlockDriverState { bool sg; /* if true, the device is a /dev/sg* */ bool probed; /* if true, format was probed rather than specified */ - int copy_on_read; /* if nonzero, copy read backing sectors into image. - note this is a reference count */ - - CoQueue flush_queue; /* Serializing flush queue */ - bool active_flush_req; /* Flush request in flight? */ - unsigned int write_gen; /* Current data generation */ - unsigned int flushed_gen; /* Flushed write generation */ - BlockDriver *drv; /* NULL means no media */ void *opaque; @@ -468,18 +461,6 @@ struct BlockDriverState { BdrvChild *backing; BdrvChild *file; - /* Callback before write request is processed */ - NotifierWithReturnList before_write_notifiers; - - /* number of in-flight requests; overall and serialising */ - unsigned int in_flight; - unsigned int serialising_in_flight; - - bool wakeup; - - /* Offset after the highest byte written to */ - uint64_t wr_highest_offset; - /* I/O Limits */ BlockLimits bl; @@ -497,11 +478,8 @@ struct BlockDriverState { QTAILQ_ENTRY(BlockDriverState) bs_list; /* element of the list of monitor-owned BDS */ QTAILQ_ENTRY(BlockDriverState) monitor_list; - QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps; int refcnt; - QLIST_HEAD(, BdrvTrackedRequest) tracked_requests; - /* operation blockers */ QLIST_HEAD(, BdrvOpBlocker) op_blockers[BLOCK_OP_TYPE_MAX]; @@ -522,6 +500,31 @@ struct BlockDriverState { /* The error object in use for blocking operations on backing_hd */ Error *backing_blocker; + /* Protected by AioContext lock */ + + /* If true, copy read backing sectors into image. Can be >1 if more + * than one client has requested copy-on-read. + */ + int copy_on_read; + + /* If we are reading a disk image, give its size in sectors. + * Generally read-only; it is written to by load_vmstate and save_vmstate, + * but the block layer is quiescent during those. + */ + int64_t total_sectors; + + /* Callback before write request is processed */ + NotifierWithReturnList before_write_notifiers; + + /* number of in-flight requests; overall and serialising */ + unsigned int in_flight; + unsigned int serialising_in_flight; + + bool wakeup; + + /* Offset after the highest byte written to */ + uint64_t wr_highest_offset; + /* threshold limit for writes, in bytes. "High water mark". */ uint64_t write_threshold_offset; NotifierWithReturn write_threshold_notifier; @@ -529,6 +532,17 @@ struct BlockDriverState { /* counter for nested bdrv_io_plug */ unsigned io_plugged; + QLIST_HEAD(, BdrvTrackedRequest) tracked_requests; + CoQueue flush_queue; /* Serializing flush queue */ + bool active_flush_req; /* Flush request in flight? */ + unsigned int write_gen; /* Current data generation */ + unsigned int flushed_gen; /* Flushed write generation */ + + QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps; + + /* do we need to tell the quest if we have a volatile write cache? */ + int enable_write_cache; + int quiesce_counter; }; diff --git a/include/io/channel.h b/include/io/channel.h index 32a9470..5d48906 100644 --- a/include/io/channel.h +++ b/include/io/channel.h @@ -23,6 +23,8 @@ #include "qemu-common.h" #include "qom/object.h" +#include "qemu/coroutine.h" +#include "block/aio.h" #define TYPE_QIO_CHANNEL "qio-channel" #define QIO_CHANNEL(obj) \ @@ -80,6 +82,9 @@ struct QIOChannel { Object parent; unsigned int features; /* bitmask of QIOChannelFeatures */ char *name; + AioContext *ctx; + Coroutine *read_coroutine; + Coroutine *write_coroutine; #ifdef _WIN32 HANDLE event; /* For use with GSource on Win32 */ #endif @@ -132,6 +137,11 @@ struct QIOChannelClass { off_t offset, int whence, Error **errp); + void (*io_set_aio_fd_handler)(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque); }; /* General I/O handling functions */ @@ -497,13 +507,50 @@ guint qio_channel_add_watch(QIOChannel *ioc, /** + * qio_channel_attach_aio_context: + * @ioc: the channel object + * @ctx: the #AioContext to set the handlers on + * + * Request that qio_channel_yield() sets I/O handlers on + * the given #AioContext. If @ctx is %NULL, qio_channel_yield() + * uses QEMU's main thread event loop. + * + * You can move a #QIOChannel from one #AioContext to another even if + * I/O handlers are set for a coroutine. However, #QIOChannel provides + * no synchronization between the calls to qio_channel_yield() and + * qio_channel_attach_aio_context(). + * + * Therefore you should first call qio_channel_detach_aio_context() + * to ensure that the coroutine is not entered concurrently. Then, + * while the coroutine has yielded, call qio_channel_attach_aio_context(), + * and then aio_co_schedule() to place the coroutine on the new + * #AioContext. The calls to qio_channel_detach_aio_context() + * and qio_channel_attach_aio_context() should be protected with + * aio_context_acquire() and aio_context_release(). + */ +void qio_channel_attach_aio_context(QIOChannel *ioc, + AioContext *ctx); + +/** + * qio_channel_detach_aio_context: + * @ioc: the channel object + * + * Disable any I/O handlers set by qio_channel_yield(). With the + * help of aio_co_schedule(), this allows moving a coroutine that was + * paused by qio_channel_yield() to another context. + */ +void qio_channel_detach_aio_context(QIOChannel *ioc); + +/** * qio_channel_yield: * @ioc: the channel object * @condition: the I/O condition to wait for * - * Yields execution from the current coroutine until - * the condition indicated by @condition becomes - * available. + * Yields execution from the current coroutine until the condition + * indicated by @condition becomes available. @condition must + * be either %G_IO_IN or %G_IO_OUT; it cannot contain both. In + * addition, no two coroutine can be waiting on the same condition + * and channel at the same time. * * This must only be called from coroutine context */ @@ -525,4 +572,23 @@ void qio_channel_yield(QIOChannel *ioc, void qio_channel_wait(QIOChannel *ioc, GIOCondition condition); +/** + * qio_channel_set_aio_fd_handler: + * @ioc: the channel object + * @ctx: the AioContext to set the handlers on + * @io_read: the read handler + * @io_write: the write handler + * @opaque: the opaque value passed to the handler + * + * This is used internally by qio_channel_yield(). It can + * be used by channel implementations to forward the handlers + * to another channel (e.g. from #QIOChannelTLS to the + * underlying socket). + */ +void qio_channel_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque); + #endif /* QIO_CHANNEL_H */ diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h index 12584ed..e60beaf 100644 --- a/include/qemu/coroutine.h +++ b/include/qemu/coroutine.h @@ -112,11 +112,56 @@ bool qemu_in_coroutine(void); */ bool qemu_coroutine_entered(Coroutine *co); +/** + * Provides a mutex that can be used to synchronise coroutines + */ +struct CoWaitRecord; +typedef struct CoMutex { + /* Count of pending lockers; 0 for a free mutex, 1 for an + * uncontended mutex. + */ + unsigned locked; + + /* Context that is holding the lock. Useful to avoid spinning + * when two coroutines on the same AioContext try to get the lock. :) + */ + AioContext *ctx; + + /* A queue of waiters. Elements are added atomically in front of + * from_push. to_pop is only populated, and popped from, by whoever + * is in charge of the next wakeup. This can be an unlocker or, + * through the handoff protocol, a locker that is about to go to sleep. + */ + QSLIST_HEAD(, CoWaitRecord) from_push, to_pop; + + unsigned handoff, sequence; + + Coroutine *holder; +} CoMutex; + +/** + * Initialises a CoMutex. This must be called before any other operation is used + * on the CoMutex. + */ +void qemu_co_mutex_init(CoMutex *mutex); + +/** + * Locks the mutex. If the lock cannot be taken immediately, control is + * transferred to the caller of the current coroutine. + */ +void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex); + +/** + * Unlocks the mutex and schedules the next coroutine that was waiting for this + * lock to be run. + */ +void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex); + /** * CoQueues are a mechanism to queue coroutines in order to continue executing - * them later. They provide the fundamental primitives on which coroutine locks - * are built. + * them later. They are similar to condition variables, but they need help + * from an external mutex in order to maintain thread-safety. */ typedef struct CoQueue { QSIMPLEQ_HEAD(, Coroutine) entries; @@ -130,9 +175,10 @@ void qemu_co_queue_init(CoQueue *queue); /** * Adds the current coroutine to the CoQueue and transfers control to the - * caller of the coroutine. + * caller of the coroutine. The mutex is unlocked during the wait and + * locked again afterwards. */ -void coroutine_fn qemu_co_queue_wait(CoQueue *queue); +void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex); /** * Restarts the next coroutine in the CoQueue and removes it from the queue. @@ -157,36 +203,10 @@ bool qemu_co_enter_next(CoQueue *queue); bool qemu_co_queue_empty(CoQueue *queue); -/** - * Provides a mutex that can be used to synchronise coroutines - */ -typedef struct CoMutex { - bool locked; - Coroutine *holder; - CoQueue queue; -} CoMutex; - -/** - * Initialises a CoMutex. This must be called before any other operation is used - * on the CoMutex. - */ -void qemu_co_mutex_init(CoMutex *mutex); - -/** - * Locks the mutex. If the lock cannot be taken immediately, control is - * transferred to the caller of the current coroutine. - */ -void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex); - -/** - * Unlocks the mutex and schedules the next coroutine that was waiting for this - * lock to be run. - */ -void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex); - typedef struct CoRwlock { - bool writer; + int pending_writer; int reader; + CoMutex mutex; CoQueue queue; } CoRwlock; diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h index 14d4f1d..cb98892 100644 --- a/include/qemu/coroutine_int.h +++ b/include/qemu/coroutine_int.h @@ -40,12 +40,21 @@ struct Coroutine { CoroutineEntry *entry; void *entry_arg; Coroutine *caller; + + /* Only used when the coroutine has terminated. */ QSLIST_ENTRY(Coroutine) pool_next; + size_t locks_held; - /* Coroutines that should be woken up when we yield or terminate */ + /* Coroutines that should be woken up when we yield or terminate. + * Only used when the coroutine is running. + */ QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup; + + /* Only used when the coroutine has yielded. */ + AioContext *ctx; QSIMPLEQ_ENTRY(Coroutine) co_queue_next; + QSLIST_ENTRY(Coroutine) co_scheduled_next; }; Coroutine *qemu_coroutine_new(void); diff --git a/include/sysemu/block-backend.h b/include/sysemu/block-backend.h index 6444e41..f365a51 100644 --- a/include/sysemu/block-backend.h +++ b/include/sysemu/block-backend.h @@ -64,14 +64,20 @@ typedef struct BlockDevOps { * fields that must be public. This is in particular for QLIST_ENTRY() and * friends so that BlockBackends can be kept in lists outside block-backend.c */ typedef struct BlockBackendPublic { - /* I/O throttling. - * throttle_state tells us if this BlockBackend has I/O limits configured. - * io_limits_disabled tells us if they are currently being enforced */ + /* I/O throttling has its own locking, but also some fields are + * protected by the AioContext lock. + */ + + /* Protected by AioContext lock. */ CoQueue throttled_reqs[2]; + + /* Nonzero if the I/O limits are currently being ignored; generally + * it is zero. */ unsigned int io_limits_disabled; /* The following fields are protected by the ThrottleGroup lock. - * See the ThrottleGroup documentation for details. */ + * See the ThrottleGroup documentation for details. + * throttle_state tells us if I/O limits are configured. */ ThrottleState *throttle_state; ThrottleTimers throttle_timers; unsigned pending_reqs[2]; diff --git a/io/channel-command.c b/io/channel-command.c index ad25313..319c5ed 100644 --- a/io/channel-command.c +++ b/io/channel-command.c @@ -328,6 +328,18 @@ static int qio_channel_command_close(QIOChannel *ioc, } +static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc); + aio_set_fd_handler(ctx, cioc->readfd, false, io_read, NULL, NULL, opaque); + aio_set_fd_handler(ctx, cioc->writefd, false, NULL, io_write, NULL, opaque); +} + + static GSource *qio_channel_command_create_watch(QIOChannel *ioc, GIOCondition condition) { @@ -349,6 +361,7 @@ static void qio_channel_command_class_init(ObjectClass *klass, ioc_klass->io_set_blocking = qio_channel_command_set_blocking; ioc_klass->io_close = qio_channel_command_close; ioc_klass->io_create_watch = qio_channel_command_create_watch; + ioc_klass->io_set_aio_fd_handler = qio_channel_command_set_aio_fd_handler; } static const TypeInfo qio_channel_command_info = { diff --git a/io/channel-file.c b/io/channel-file.c index e1da243..b383273 100644 --- a/io/channel-file.c +++ b/io/channel-file.c @@ -186,6 +186,16 @@ static int qio_channel_file_close(QIOChannel *ioc, } +static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc); + aio_set_fd_handler(ctx, fioc->fd, false, io_read, io_write, NULL, opaque); +} + static GSource *qio_channel_file_create_watch(QIOChannel *ioc, GIOCondition condition) { @@ -206,6 +216,7 @@ static void qio_channel_file_class_init(ObjectClass *klass, ioc_klass->io_seek = qio_channel_file_seek; ioc_klass->io_close = qio_channel_file_close; ioc_klass->io_create_watch = qio_channel_file_create_watch; + ioc_klass->io_set_aio_fd_handler = qio_channel_file_set_aio_fd_handler; } static const TypeInfo qio_channel_file_info = { diff --git a/io/channel-socket.c b/io/channel-socket.c index f385233..f546c68 100644 --- a/io/channel-socket.c +++ b/io/channel-socket.c @@ -649,11 +649,6 @@ qio_channel_socket_set_blocking(QIOChannel *ioc, qemu_set_block(sioc->fd); } else { qemu_set_nonblock(sioc->fd); -#ifdef WIN32 - WSAEventSelect(sioc->fd, ioc->event, - FD_READ | FD_ACCEPT | FD_CLOSE | - FD_CONNECT | FD_WRITE | FD_OOB); -#endif } return 0; } @@ -733,6 +728,16 @@ qio_channel_socket_shutdown(QIOChannel *ioc, return 0; } +static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); + aio_set_fd_handler(ctx, sioc->fd, false, io_read, io_write, NULL, opaque); +} + static GSource *qio_channel_socket_create_watch(QIOChannel *ioc, GIOCondition condition) { @@ -755,6 +760,7 @@ static void qio_channel_socket_class_init(ObjectClass *klass, ioc_klass->io_set_cork = qio_channel_socket_set_cork; ioc_klass->io_set_delay = qio_channel_socket_set_delay; ioc_klass->io_create_watch = qio_channel_socket_create_watch; + ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler; } static const TypeInfo qio_channel_socket_info = { diff --git a/io/channel-tls.c b/io/channel-tls.c index f25ab0a..6182702 100644 --- a/io/channel-tls.c +++ b/io/channel-tls.c @@ -345,6 +345,17 @@ static int qio_channel_tls_close(QIOChannel *ioc, return qio_channel_close(tioc->master, errp); } +static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc); + + qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque); +} + static GSource *qio_channel_tls_create_watch(QIOChannel *ioc, GIOCondition condition) { @@ -372,6 +383,7 @@ static void qio_channel_tls_class_init(ObjectClass *klass, ioc_klass->io_close = qio_channel_tls_close; ioc_klass->io_shutdown = qio_channel_tls_shutdown; ioc_klass->io_create_watch = qio_channel_tls_create_watch; + ioc_klass->io_set_aio_fd_handler = qio_channel_tls_set_aio_fd_handler; } static const TypeInfo qio_channel_tls_info = { diff --git a/io/channel-watch.c b/io/channel-watch.c index cf1cdff..8640d1c 100644 --- a/io/channel-watch.c +++ b/io/channel-watch.c @@ -285,6 +285,12 @@ GSource *qio_channel_create_socket_watch(QIOChannel *ioc, GSource *source; QIOChannelSocketSource *ssource; +#ifdef WIN32 + WSAEventSelect(socket, ioc->event, + FD_READ | FD_ACCEPT | FD_CLOSE | + FD_CONNECT | FD_WRITE | FD_OOB); +#endif + source = g_source_new(&qio_channel_socket_source_funcs, sizeof(QIOChannelSocketSource)); ssource = (QIOChannelSocketSource *)source; diff --git a/io/channel.c b/io/channel.c index 80924c1..cdf7454 100644 --- a/io/channel.c +++ b/io/channel.c @@ -21,7 +21,7 @@ #include "qemu/osdep.h" #include "io/channel.h" #include "qapi/error.h" -#include "qemu/coroutine.h" +#include "qemu/main-loop.h" bool qio_channel_has_feature(QIOChannel *ioc, QIOChannelFeature feature) @@ -154,6 +154,17 @@ GSource *qio_channel_create_watch(QIOChannel *ioc, } +void qio_channel_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc); + + klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); +} + guint qio_channel_add_watch(QIOChannel *ioc, GIOCondition condition, QIOChannelFunc func, @@ -227,36 +238,80 @@ off_t qio_channel_io_seek(QIOChannel *ioc, } -typedef struct QIOChannelYieldData QIOChannelYieldData; -struct QIOChannelYieldData { - QIOChannel *ioc; - Coroutine *co; -}; +static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc); + +static void qio_channel_restart_read(void *opaque) +{ + QIOChannel *ioc = opaque; + Coroutine *co = ioc->read_coroutine; + ioc->read_coroutine = NULL; + qio_channel_set_aio_fd_handlers(ioc); + aio_co_wake(co); +} -static gboolean qio_channel_yield_enter(QIOChannel *ioc, - GIOCondition condition, - gpointer opaque) +static void qio_channel_restart_write(void *opaque) { - QIOChannelYieldData *data = opaque; - qemu_coroutine_enter(data->co); - return FALSE; + QIOChannel *ioc = opaque; + Coroutine *co = ioc->write_coroutine; + + ioc->write_coroutine = NULL; + qio_channel_set_aio_fd_handlers(ioc); + aio_co_wake(co); +} + +static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc) +{ + IOHandler *rd_handler = NULL, *wr_handler = NULL; + AioContext *ctx; + + if (ioc->read_coroutine) { + rd_handler = qio_channel_restart_read; + } + if (ioc->write_coroutine) { + wr_handler = qio_channel_restart_write; + } + + ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); + qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc); +} + +void qio_channel_attach_aio_context(QIOChannel *ioc, + AioContext *ctx) +{ + AioContext *old_ctx; + if (ioc->ctx == ctx) { + return; + } + + old_ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context(); + qio_channel_set_aio_fd_handler(ioc, old_ctx, NULL, NULL, NULL); + ioc->ctx = ctx; + qio_channel_set_aio_fd_handlers(ioc); } +void qio_channel_detach_aio_context(QIOChannel *ioc) +{ + ioc->read_coroutine = NULL; + ioc->write_coroutine = NULL; + qio_channel_set_aio_fd_handlers(ioc); + ioc->ctx = NULL; +} void coroutine_fn qio_channel_yield(QIOChannel *ioc, GIOCondition condition) { - QIOChannelYieldData data; - assert(qemu_in_coroutine()); - data.ioc = ioc; - data.co = qemu_coroutine_self(); - qio_channel_add_watch(ioc, - condition, - qio_channel_yield_enter, - &data, - NULL); + if (condition == G_IO_IN) { + assert(!ioc->read_coroutine); + ioc->read_coroutine = qemu_coroutine_self(); + } else if (condition == G_IO_OUT) { + assert(!ioc->write_coroutine); + ioc->write_coroutine = qemu_coroutine_self(); + } else { + abort(); + } + qio_channel_set_aio_fd_handlers(ioc); qemu_coroutine_yield(); } diff --git a/nbd/client.c b/nbd/client.c index ffb0743..5c9dee3 100644 --- a/nbd/client.c +++ b/nbd/client.c @@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply) ssize_t ret; ret = read_sync(ioc, buf, sizeof(buf)); - if (ret < 0) { + if (ret <= 0) { return ret; } diff --git a/nbd/common.c b/nbd/common.c index a5f39ea..dccbb8e 100644 --- a/nbd/common.c +++ b/nbd/common.c @@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc, } if (len == QIO_CHANNEL_ERR_BLOCK) { if (qemu_in_coroutine()) { - /* XXX figure out if we can create a variant on - * qio_channel_yield() that works with AIO contexts - * and consider using that in this branch */ - qemu_coroutine_yield(); - } else if (done) { - /* XXX this is needed by nbd_reply_ready. */ - qio_channel_wait(ioc, - do_read ? G_IO_IN : G_IO_OUT); + qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT); } else { return -EAGAIN; } diff --git a/nbd/server.c b/nbd/server.c index efe5cb8..ac92fa0 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -95,8 +95,6 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; - bool can_read; - QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -104,9 +102,7 @@ struct NBDClient { /* That's all folks */ -static void nbd_set_handlers(NBDClient *client); -static void nbd_unset_handlers(NBDClient *client); -static void nbd_update_can_read(NBDClient *client); +static void nbd_client_receive_next_request(NBDClient *client); static gboolean nbd_negotiate_continue(QIOChannel *ioc, GIOCondition condition, @@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client) */ assert(client->closing); - nbd_unset_handlers(client); + qio_channel_detach_aio_context(client->ioc); object_unref(OBJECT(client->sioc)); object_unref(OBJECT(client->ioc)); if (client->tlscreds) { @@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client) assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); client->nb_requests++; - nbd_update_can_read(client); req = g_new0(NBDRequestData, 1); nbd_client_get(client); @@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req) g_free(req); client->nb_requests--; - nbd_update_can_read(client); + nbd_client_receive_next_request(client); + nbd_client_put(client); } @@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) exp->ctx = ctx; QTAILQ_FOREACH(client, &exp->clients, next) { - nbd_set_handlers(client); + qio_channel_attach_aio_context(client->ioc, ctx); + if (client->recv_coroutine) { + aio_co_schedule(ctx, client->recv_coroutine); + } + if (client->send_coroutine) { + aio_co_schedule(ctx, client->send_coroutine); + } } } @@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque) TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx); QTAILQ_FOREACH(client, &exp->clients, next) { - nbd_unset_handlers(client); + qio_channel_detach_aio_context(client->ioc); } exp->ctx = NULL; @@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply, g_assert(qemu_in_coroutine()); qemu_co_mutex_lock(&client->send_lock); client->send_coroutine = qemu_coroutine_self(); - nbd_set_handlers(client); if (!len) { rc = nbd_send_reply(client->ioc, reply); @@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply, } client->send_coroutine = NULL; - nbd_set_handlers(client); qemu_co_mutex_unlock(&client->send_lock); return rc; } @@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req, ssize_t rc; g_assert(qemu_in_coroutine()); - client->recv_coroutine = qemu_coroutine_self(); - nbd_update_can_read(client); - + assert(client->recv_coroutine == qemu_coroutine_self()); rc = nbd_receive_request(client->ioc, request); if (rc < 0) { if (rc != -EAGAIN) { @@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req, out: client->recv_coroutine = NULL; - nbd_update_can_read(client); + nbd_client_receive_next_request(client); return rc; } -static void nbd_trip(void *opaque) +/* Owns a reference to the NBDClient passed as opaque. */ +static coroutine_fn void nbd_trip(void *opaque) { NBDClient *client = opaque; NBDExport *exp = client->exp; NBDRequestData *req; - NBDRequest request; + NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */ NBDReply reply; ssize_t ret; int flags; TRACE("Reading request."); if (client->closing) { + nbd_client_put(client); return; } @@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque) done: nbd_request_put(req); + nbd_client_put(client); return; out: nbd_request_put(req); client_close(client); + nbd_client_put(client); } -static void nbd_read(void *opaque) -{ - NBDClient *client = opaque; - - if (client->recv_coroutine) { - qemu_coroutine_enter(client->recv_coroutine); - } else { - qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client)); - } -} - -static void nbd_restart_write(void *opaque) -{ - NBDClient *client = opaque; - - qemu_coroutine_enter(client->send_coroutine); -} - -static void nbd_set_handlers(NBDClient *client) -{ - if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, - client->can_read ? nbd_read : NULL, - client->send_coroutine ? nbd_restart_write : NULL, - NULL, client); - } -} - -static void nbd_unset_handlers(NBDClient *client) -{ - if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL, - NULL, NULL, NULL); - } -} - -static void nbd_update_can_read(NBDClient *client) +static void nbd_client_receive_next_request(NBDClient *client) { - bool can_read = client->recv_coroutine || - client->nb_requests < MAX_NBD_REQUESTS; - - if (can_read != client->can_read) { - client->can_read = can_read; - nbd_set_handlers(client); - - /* There is no need to invoke aio_notify(), since aio_set_fd_handler() - * in nbd_set_handlers() will have taken care of that */ + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) { + nbd_client_get(client); + client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); + aio_co_schedule(client->exp->ctx, client->recv_coroutine); } } @@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque) goto out; } qemu_co_mutex_init(&client->send_lock); - nbd_set_handlers(client); if (exp) { QTAILQ_INSERT_TAIL(&exp->clients, client, next); } + + nbd_client_receive_next_request(client); + out: g_free(data); } @@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp, object_ref(OBJECT(client->sioc)); client->ioc = QIO_CHANNEL(sioc); object_ref(OBJECT(client->ioc)); - client->can_read = true; client->close = close_fn; data->client = client; diff --git a/stubs/Makefile.objs b/stubs/Makefile.objs index a187295..aa6050f 100644 --- a/stubs/Makefile.objs +++ b/stubs/Makefile.objs @@ -16,6 +16,7 @@ stub-obj-y += get-vm-name.o stub-obj-y += iothread.o stub-obj-y += iothread-lock.o stub-obj-y += is-daemonized.o +stub-obj-$(CONFIG_LINUX_AIO) += linux-aio.o stub-obj-y += machine-init-done.o stub-obj-y += migr-blocker.o stub-obj-y += monitor.o diff --git a/stubs/linux-aio.c b/stubs/linux-aio.c new file mode 100644 index 0000000..ed47bd4 --- /dev/null +++ b/stubs/linux-aio.c @@ -0,0 +1,32 @@ +/* + * Linux native AIO support. + * + * Copyright (C) 2009 IBM, Corp. + * Copyright (C) 2009 Red Hat, Inc. + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ +#include "qemu/osdep.h" +#include "block/aio.h" +#include "block/raw-aio.h" + +void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) +{ + abort(); +} + +void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context) +{ + abort(); +} + +LinuxAioState *laio_init(void) +{ + abort(); +} + +void laio_cleanup(LinuxAioState *s) +{ + abort(); +} diff --git a/stubs/set-fd-handler.c b/stubs/set-fd-handler.c index acbe65c..26965de 100644 --- a/stubs/set-fd-handler.c +++ b/stubs/set-fd-handler.c @@ -9,14 +9,3 @@ void qemu_set_fd_handler(int fd, { abort(); } - -void aio_set_fd_handler(AioContext *ctx, - int fd, - bool is_external, - IOHandler *io_read, - IOHandler *io_write, - AioPollFn *io_poll, - void *opaque) -{ - abort(); -} diff --git a/tests/Makefile.include b/tests/Makefile.include index 634394a..e60bb6c 100644 --- a/tests/Makefile.include +++ b/tests/Makefile.include @@ -45,9 +45,13 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF) check-unit-y += tests/test-iov$(EXESUF) gcov-files-test-iov-y = util/iov.c check-unit-y += tests/test-aio$(EXESUF) +gcov-files-test-aio-y = util/async.c util/qemu-timer.o +gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c +gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c +check-unit-y += tests/test-aio-multithread$(EXESUF) +gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y) +gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c check-unit-y += tests/test-throttle$(EXESUF) -gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c -gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c check-unit-y += tests/test-thread-pool$(EXESUF) gcov-files-test-thread-pool-y = thread-pool.c gcov-files-test-hbitmap-y = util/hbitmap.c @@ -505,7 +509,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \ $(test-qom-obj-y) test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y) test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y) -test-block-obj-y = $(block-obj-y) $(test-io-obj-y) +test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y) tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y) @@ -517,10 +521,10 @@ tests/check-qjson$(EXESUF): tests/check-qjson.o $(test-util-obj-y) tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y) tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y) -tests/test-char$(EXESUF): tests/test-char.o qemu-timer.o \ - $(test-util-obj-y) $(qtest-obj-y) $(test-block-obj-y) $(chardev-obj-y) +tests/test-char$(EXESUF): tests/test-char.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y) $(chardev-obj-y) tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y) tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y) +tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y) tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y) tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y) tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y) @@ -551,8 +555,7 @@ tests/test-vmstate$(EXESUF): tests/test-vmstate.o \ migration/vmstate.o migration/qemu-file.o \ migration/qemu-file-channel.o migration/qjson.o \ $(test-io-obj-y) -tests/test-timed-average$(EXESUF): tests/test-timed-average.o qemu-timer.o \ - $(test-util-obj-y) +tests/test-timed-average$(EXESUF): tests/test-timed-average.o $(test-util-obj-y) tests/test-base64$(EXESUF): tests/test-base64.o \ libqemuutil.a libqemustub.a tests/ptimer-test$(EXESUF): tests/ptimer-test.o tests/ptimer-test-stubs.o hw/core/ptimer.o libqemustub.a @@ -712,7 +715,7 @@ tests/usb-hcd-ehci-test$(EXESUF): tests/usb-hcd-ehci-test.o $(libqos-usb-obj-y) tests/usb-hcd-xhci-test$(EXESUF): tests/usb-hcd-xhci-test.o $(libqos-usb-obj-y) tests/pc-cpu-test$(EXESUF): tests/pc-cpu-test.o tests/postcopy-test$(EXESUF): tests/postcopy-test.o -tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o qemu-timer.o \ +tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o $(test-util-obj-y) \ $(qtest-obj-y) $(test-io-obj-y) $(libqos-virtio-obj-y) $(libqos-pc-obj-y) \ $(chardev-obj-y) tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_helper.o diff --git a/tests/iothread.c b/tests/iothread.c new file mode 100644 index 0000000..777d9ee --- /dev/null +++ b/tests/iothread.c @@ -0,0 +1,91 @@ +/* + * Event loop thread implementation for unit tests + * + * Copyright Red Hat Inc., 2013, 2016 + * + * Authors: + * Stefan Hajnoczi <stefanha@redhat.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include "qemu/osdep.h" +#include "qapi/error.h" +#include "block/aio.h" +#include "qemu/main-loop.h" +#include "qemu/rcu.h" +#include "iothread.h" + +struct IOThread { + AioContext *ctx; + + QemuThread thread; + QemuMutex init_done_lock; + QemuCond init_done_cond; /* is thread initialization done? */ + bool stopping; +}; + +static __thread IOThread *my_iothread; + +AioContext *qemu_get_current_aio_context(void) +{ + return my_iothread ? my_iothread->ctx : qemu_get_aio_context(); +} + +static void *iothread_run(void *opaque) +{ + IOThread *iothread = opaque; + + rcu_register_thread(); + + my_iothread = iothread; + qemu_mutex_lock(&iothread->init_done_lock); + iothread->ctx = aio_context_new(&error_abort); + qemu_cond_signal(&iothread->init_done_cond); + qemu_mutex_unlock(&iothread->init_done_lock); + + while (!atomic_read(&iothread->stopping)) { + aio_poll(iothread->ctx, true); + } + + rcu_unregister_thread(); + return NULL; +} + +void iothread_join(IOThread *iothread) +{ + iothread->stopping = true; + aio_notify(iothread->ctx); + qemu_thread_join(&iothread->thread); + qemu_cond_destroy(&iothread->init_done_cond); + qemu_mutex_destroy(&iothread->init_done_lock); + aio_context_unref(iothread->ctx); + g_free(iothread); +} + +IOThread *iothread_new(void) +{ + IOThread *iothread = g_new0(IOThread, 1); + + qemu_mutex_init(&iothread->init_done_lock); + qemu_cond_init(&iothread->init_done_cond); + qemu_thread_create(&iothread->thread, NULL, iothread_run, + iothread, QEMU_THREAD_JOINABLE); + + /* Wait for initialization to complete */ + qemu_mutex_lock(&iothread->init_done_lock); + while (iothread->ctx == NULL) { + qemu_cond_wait(&iothread->init_done_cond, + &iothread->init_done_lock); + } + qemu_mutex_unlock(&iothread->init_done_lock); + return iothread; +} + +AioContext *iothread_get_aio_context(IOThread *iothread) +{ + return iothread->ctx; +} diff --git a/tests/iothread.h b/tests/iothread.h new file mode 100644 index 0000000..4877cea --- /dev/null +++ b/tests/iothread.h @@ -0,0 +1,25 @@ +/* + * Event loop thread implementation for unit tests + * + * Copyright Red Hat Inc., 2013, 2016 + * + * Authors: + * Stefan Hajnoczi <stefanha@redhat.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ +#ifndef TEST_IOTHREAD_H +#define TEST_IOTHREAD_H + +#include "block/aio.h" +#include "qemu/thread.h" + +typedef struct IOThread IOThread; + +IOThread *iothread_new(void); +void iothread_join(IOThread *iothread); +AioContext *iothread_get_aio_context(IOThread *iothread); + +#endif diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c new file mode 100644 index 0000000..f11e990 --- /dev/null +++ b/tests/test-aio-multithread.c @@ -0,0 +1,463 @@ +/* + * AioContext multithreading tests + * + * Copyright Red Hat, Inc. 2016 + * + * Authors: + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU LGPL, version 2 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include <glib.h> +#include "block/aio.h" +#include "qapi/error.h" +#include "qemu/coroutine.h" +#include "qemu/thread.h" +#include "qemu/error-report.h" +#include "iothread.h" + +/* AioContext management */ + +#define NUM_CONTEXTS 5 + +static IOThread *threads[NUM_CONTEXTS]; +static AioContext *ctx[NUM_CONTEXTS]; +static __thread int id = -1; + +static QemuEvent done_event; + +/* Run a function synchronously on a remote iothread. */ + +typedef struct CtxRunData { + QEMUBHFunc *cb; + void *arg; +} CtxRunData; + +static void ctx_run_bh_cb(void *opaque) +{ + CtxRunData *data = opaque; + + data->cb(data->arg); + qemu_event_set(&done_event); +} + +static void ctx_run(int i, QEMUBHFunc *cb, void *opaque) +{ + CtxRunData data = { + .cb = cb, + .arg = opaque + }; + + qemu_event_reset(&done_event); + aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data); + qemu_event_wait(&done_event); +} + +/* Starting the iothreads. */ + +static void set_id_cb(void *opaque) +{ + int *i = opaque; + + id = *i; +} + +static void create_aio_contexts(void) +{ + int i; + + for (i = 0; i < NUM_CONTEXTS; i++) { + threads[i] = iothread_new(); + ctx[i] = iothread_get_aio_context(threads[i]); + } + + qemu_event_init(&done_event, false); + for (i = 0; i < NUM_CONTEXTS; i++) { + ctx_run(i, set_id_cb, &i); + } +} + +/* Stopping the iothreads. */ + +static void join_aio_contexts(void) +{ + int i; + + for (i = 0; i < NUM_CONTEXTS; i++) { + aio_context_ref(ctx[i]); + } + for (i = 0; i < NUM_CONTEXTS; i++) { + iothread_join(threads[i]); + } + for (i = 0; i < NUM_CONTEXTS; i++) { + aio_context_unref(ctx[i]); + } + qemu_event_destroy(&done_event); +} + +/* Basic test for the stuff above. */ + +static void test_lifecycle(void) +{ + create_aio_contexts(); + join_aio_contexts(); +} + +/* aio_co_schedule test. */ + +static Coroutine *to_schedule[NUM_CONTEXTS]; + +static bool now_stopping; + +static int count_retry; +static int count_here; +static int count_other; + +static bool schedule_next(int n) +{ + Coroutine *co; + + co = atomic_xchg(&to_schedule[n], NULL); + if (!co) { + atomic_inc(&count_retry); + return false; + } + + if (n == id) { + atomic_inc(&count_here); + } else { + atomic_inc(&count_other); + } + + aio_co_schedule(ctx[n], co); + return true; +} + +static void finish_cb(void *opaque) +{ + schedule_next(id); +} + +static coroutine_fn void test_multi_co_schedule_entry(void *opaque) +{ + g_assert(to_schedule[id] == NULL); + atomic_mb_set(&to_schedule[id], qemu_coroutine_self()); + + while (!atomic_mb_read(&now_stopping)) { + int n; + + n = g_test_rand_int_range(0, NUM_CONTEXTS); + schedule_next(n); + qemu_coroutine_yield(); + + g_assert(to_schedule[id] == NULL); + atomic_mb_set(&to_schedule[id], qemu_coroutine_self()); + } +} + + +static void test_multi_co_schedule(int seconds) +{ + int i; + + count_here = count_other = count_retry = 0; + now_stopping = false; + + create_aio_contexts(); + for (i = 0; i < NUM_CONTEXTS; i++) { + Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL); + aio_co_schedule(ctx[i], co1); + } + + g_usleep(seconds * 1000000); + + atomic_mb_set(&now_stopping, true); + for (i = 0; i < NUM_CONTEXTS; i++) { + ctx_run(i, finish_cb, NULL); + to_schedule[i] = NULL; + } + + join_aio_contexts(); + g_test_message("scheduled %d, queued %d, retry %d, total %d\n", + count_other, count_here, count_retry, + count_here + count_other + count_retry); +} + +static void test_multi_co_schedule_1(void) +{ + test_multi_co_schedule(1); +} + +static void test_multi_co_schedule_10(void) +{ + test_multi_co_schedule(10); +} + +/* CoMutex thread-safety. */ + +static uint32_t atomic_counter; +static uint32_t running; +static uint32_t counter; +static CoMutex comutex; + +static void coroutine_fn test_multi_co_mutex_entry(void *opaque) +{ + while (!atomic_mb_read(&now_stopping)) { + qemu_co_mutex_lock(&comutex); + counter++; + qemu_co_mutex_unlock(&comutex); + + /* Increase atomic_counter *after* releasing the mutex. Otherwise + * there is a chance (it happens about 1 in 3 runs) that the iothread + * exits before the coroutine is woken up, causing a spurious + * assertion failure. + */ + atomic_inc(&atomic_counter); + } + atomic_dec(&running); +} + +static void test_multi_co_mutex(int threads, int seconds) +{ + int i; + + qemu_co_mutex_init(&comutex); + counter = 0; + atomic_counter = 0; + now_stopping = false; + + create_aio_contexts(); + assert(threads <= NUM_CONTEXTS); + running = threads; + for (i = 0; i < threads; i++) { + Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL); + aio_co_schedule(ctx[i], co1); + } + + g_usleep(seconds * 1000000); + + atomic_mb_set(&now_stopping, true); + while (running > 0) { + g_usleep(100000); + } + + join_aio_contexts(); + g_test_message("%d iterations/second\n", counter / seconds); + g_assert_cmpint(counter, ==, atomic_counter); +} + +/* Testing with NUM_CONTEXTS threads focuses on the queue. The mutex however + * is too contended (and the threads spend too much time in aio_poll) + * to actually stress the handoff protocol. + */ +static void test_multi_co_mutex_1(void) +{ + test_multi_co_mutex(NUM_CONTEXTS, 1); +} + +static void test_multi_co_mutex_10(void) +{ + test_multi_co_mutex(NUM_CONTEXTS, 10); +} + +/* Testing with fewer threads stresses the handoff protocol too. Still, the + * case where the locker _can_ pick up a handoff is very rare, happening + * about 10 times in 1 million, so increase the runtime a bit compared to + * other "quick" testcases that only run for 1 second. + */ +static void test_multi_co_mutex_2_3(void) +{ + test_multi_co_mutex(2, 3); +} + +static void test_multi_co_mutex_2_30(void) +{ + test_multi_co_mutex(2, 30); +} + +/* Same test with fair mutexes, for performance comparison. */ + +#ifdef CONFIG_LINUX +#include "qemu/futex.h" + +/* The nodes for the mutex reside in this structure (on which we try to avoid + * false sharing). The head of the mutex is in the "mutex_head" variable. + */ +static struct { + int next, locked; + int padding[14]; +} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64))); + +static int mutex_head = -1; + +static void mcs_mutex_lock(void) +{ + int prev; + + nodes[id].next = -1; + nodes[id].locked = 1; + prev = atomic_xchg(&mutex_head, id); + if (prev != -1) { + atomic_set(&nodes[prev].next, id); + qemu_futex_wait(&nodes[id].locked, 1); + } +} + +static void mcs_mutex_unlock(void) +{ + int next; + if (nodes[id].next == -1) { + if (atomic_read(&mutex_head) == id && + atomic_cmpxchg(&mutex_head, id, -1) == id) { + /* Last item in the list, exit. */ + return; + } + while (atomic_read(&nodes[id].next) == -1) { + /* mcs_mutex_lock did the xchg, but has not updated + * nodes[prev].next yet. + */ + } + } + + /* Wake up the next in line. */ + next = nodes[id].next; + nodes[next].locked = 0; + qemu_futex_wake(&nodes[next].locked, 1); +} + +static void test_multi_fair_mutex_entry(void *opaque) +{ + while (!atomic_mb_read(&now_stopping)) { + mcs_mutex_lock(); + counter++; + mcs_mutex_unlock(); + atomic_inc(&atomic_counter); + } + atomic_dec(&running); +} + +static void test_multi_fair_mutex(int threads, int seconds) +{ + int i; + + assert(mutex_head == -1); + counter = 0; + atomic_counter = 0; + now_stopping = false; + + create_aio_contexts(); + assert(threads <= NUM_CONTEXTS); + running = threads; + for (i = 0; i < threads; i++) { + Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL); + aio_co_schedule(ctx[i], co1); + } + + g_usleep(seconds * 1000000); + + atomic_mb_set(&now_stopping, true); + while (running > 0) { + g_usleep(100000); + } + + join_aio_contexts(); + g_test_message("%d iterations/second\n", counter / seconds); + g_assert_cmpint(counter, ==, atomic_counter); +} + +static void test_multi_fair_mutex_1(void) +{ + test_multi_fair_mutex(NUM_CONTEXTS, 1); +} + +static void test_multi_fair_mutex_10(void) +{ + test_multi_fair_mutex(NUM_CONTEXTS, 10); +} +#endif + +/* Same test with pthread mutexes, for performance comparison and + * portability. */ + +static QemuMutex mutex; + +static void test_multi_mutex_entry(void *opaque) +{ + while (!atomic_mb_read(&now_stopping)) { + qemu_mutex_lock(&mutex); + counter++; + qemu_mutex_unlock(&mutex); + atomic_inc(&atomic_counter); + } + atomic_dec(&running); +} + +static void test_multi_mutex(int threads, int seconds) +{ + int i; + + qemu_mutex_init(&mutex); + counter = 0; + atomic_counter = 0; + now_stopping = false; + + create_aio_contexts(); + assert(threads <= NUM_CONTEXTS); + running = threads; + for (i = 0; i < threads; i++) { + Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL); + aio_co_schedule(ctx[i], co1); + } + + g_usleep(seconds * 1000000); + + atomic_mb_set(&now_stopping, true); + while (running > 0) { + g_usleep(100000); + } + + join_aio_contexts(); + g_test_message("%d iterations/second\n", counter / seconds); + g_assert_cmpint(counter, ==, atomic_counter); +} + +static void test_multi_mutex_1(void) +{ + test_multi_mutex(NUM_CONTEXTS, 1); +} + +static void test_multi_mutex_10(void) +{ + test_multi_mutex(NUM_CONTEXTS, 10); +} + +/* End of tests. */ + +int main(int argc, char **argv) +{ + init_clocks(); + + g_test_init(&argc, &argv, NULL); + g_test_add_func("/aio/multi/lifecycle", test_lifecycle); + if (g_test_quick()) { + g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1); + g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1); + g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3); +#ifdef CONFIG_LINUX + g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1); +#endif + g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1); + } else { + g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10); + g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10); + g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30); +#ifdef CONFIG_LINUX + g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10); +#endif + g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10); + } + return g_test_run(); +} diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index 8dbf66a..91b4ec5 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -6,6 +6,7 @@ #include "qapi/error.h" #include "qemu/timer.h" #include "qemu/error-report.h" +#include "qemu/main-loop.h" static AioContext *ctx; static ThreadPool *pool; @@ -224,15 +225,9 @@ static void test_cancel_async(void) int main(int argc, char **argv) { int ret; - Error *local_error = NULL; - init_clocks(); - - ctx = aio_context_new(&local_error); - if (!ctx) { - error_reportf_err(local_error, "Failed to create AIO Context: "); - exit(1); - } + qemu_init_main_loop(&error_abort); + ctx = qemu_get_current_aio_context(); pool = aio_get_thread_pool(ctx); g_test_init(&argc, &argv, NULL); @@ -245,6 +240,5 @@ int main(int argc, char **argv) ret = g_test_run(); - aio_context_unref(ctx); return ret; } diff --git a/trace-events b/trace-events index 756a947..7288557 100644 --- a/trace-events +++ b/trace-events @@ -25,17 +25,6 @@ # # The <format-string> should be a sprintf()-compatible format string. -# aio-posix.c -run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64 -run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d" -poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 -poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 - -# thread-pool.c -thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" -thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" -thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" - # ioport.c cpu_in(unsigned int addr, char size, unsigned int val) "addr %#x(%c) value %u" cpu_out(unsigned int addr, char size, unsigned int val) "addr %#x(%c) value %u" diff --git a/util/Makefile.objs b/util/Makefile.objs index 56c8c23..bc629e2 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -1,14 +1,18 @@ util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o util-obj-y += bufferiszero.o util-obj-y += lockcnt.o +util-obj-y += aiocb.o async.o thread-pool.o qemu-timer.o +util-obj-y += main-loop.o iohandler.o +util-obj-$(CONFIG_POSIX) += aio-posix.o util-obj-$(CONFIG_POSIX) += compatfd.o util-obj-$(CONFIG_POSIX) += event_notifier-posix.o util-obj-$(CONFIG_POSIX) += mmap-alloc.o util-obj-$(CONFIG_POSIX) += oslib-posix.o util-obj-$(CONFIG_POSIX) += qemu-openpty.o util-obj-$(CONFIG_POSIX) += qemu-thread-posix.o -util-obj-$(CONFIG_WIN32) += event_notifier-win32.o util-obj-$(CONFIG_POSIX) += memfd.o +util-obj-$(CONFIG_WIN32) += aio-win32.o +util-obj-$(CONFIG_WIN32) += event_notifier-win32.o util-obj-$(CONFIG_WIN32) += oslib-win32.o util-obj-$(CONFIG_WIN32) += qemu-thread-win32.o util-obj-y += envlist.o path.o module.o diff --git a/aio-posix.c b/util/aio-posix.c index 577527f..2d51239 100644 --- a/aio-posix.c +++ b/util/aio-posix.c @@ -19,7 +19,7 @@ #include "qemu/rcu_queue.h" #include "qemu/sockets.h" #include "qemu/cutils.h" -#include "trace-root.h" +#include "trace.h" #ifdef CONFIG_EPOLL_CREATE1 #include <sys/epoll.h> #endif @@ -386,12 +386,6 @@ static bool aio_dispatch_handlers(AioContext *ctx) AioHandler *node, *tmp; bool progress = false; - /* - * We have to walk very carefully in case aio_set_fd_handler is - * called while we're walking. - */ - qemu_lockcnt_inc(&ctx->list_lock); - QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { int revents; @@ -426,33 +420,17 @@ static bool aio_dispatch_handlers(AioContext *ctx) } } - qemu_lockcnt_dec(&ctx->list_lock); return progress; } -/* - * Note that dispatch_fds == false has the side-effect of post-poning the - * freeing of deleted handlers. - */ -bool aio_dispatch(AioContext *ctx, bool dispatch_fds) +void aio_dispatch(AioContext *ctx) { - bool progress; - - /* - * If there are callbacks left that have been queued, we need to call them. - * Do not call select in this case, because it is possible that the caller - * does not need a complete flush (as is the case for aio_poll loops). - */ - progress = aio_bh_poll(ctx); - - if (dispatch_fds) { - progress |= aio_dispatch_handlers(ctx); - } - - /* Run our timers */ - progress |= timerlistgroup_run_timers(&ctx->tlg); + qemu_lockcnt_inc(&ctx->list_lock); + aio_bh_poll(ctx); + aio_dispatch_handlers(ctx); + qemu_lockcnt_dec(&ctx->list_lock); - return progress; + timerlistgroup_run_timers(&ctx->tlg); } /* These thread-local variables are used only in a small part of aio_poll @@ -597,9 +575,6 @@ bool aio_poll(AioContext *ctx, bool blocking) int64_t timeout; int64_t start = 0; - aio_context_acquire(ctx); - progress = false; - /* aio_notify can avoid the expensive event_notifier_set if * everything (file descriptors, bottom halves, timers) will * be re-evaluated before the next blocking poll(). This is @@ -617,9 +592,8 @@ bool aio_poll(AioContext *ctx, bool blocking) start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); } - if (try_poll_mode(ctx, blocking)) { - progress = true; - } else { + progress = try_poll_mode(ctx, blocking); + if (!progress) { assert(npfd == 0); /* fill pollfds */ @@ -636,9 +610,6 @@ bool aio_poll(AioContext *ctx, bool blocking) timeout = blocking ? aio_compute_timeout(ctx) : 0; /* wait until next event */ - if (timeout) { - aio_context_release(ctx); - } if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { AioHandler epoll_handler; @@ -650,9 +621,6 @@ bool aio_poll(AioContext *ctx, bool blocking) } else { ret = qemu_poll_ns(pollfds, npfd, timeout); } - if (timeout) { - aio_context_acquire(ctx); - } } if (blocking) { @@ -710,14 +678,16 @@ bool aio_poll(AioContext *ctx, bool blocking) } npfd = 0; - qemu_lockcnt_dec(&ctx->list_lock); - /* Run dispatch even if there were no readable fds to run timers */ - if (aio_dispatch(ctx, ret > 0)) { - progress = true; + progress |= aio_bh_poll(ctx); + + if (ret > 0) { + progress |= aio_dispatch_handlers(ctx); } - aio_context_release(ctx); + qemu_lockcnt_dec(&ctx->list_lock); + + progress |= timerlistgroup_run_timers(&ctx->tlg); return progress; } diff --git a/aio-win32.c b/util/aio-win32.c index 900524c..bca496a 100644 --- a/aio-win32.c +++ b/util/aio-win32.c @@ -253,8 +253,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) bool progress = false; AioHandler *tmp; - qemu_lockcnt_inc(&ctx->list_lock); - /* * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. @@ -305,20 +303,16 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) } } - qemu_lockcnt_dec(&ctx->list_lock); return progress; } -bool aio_dispatch(AioContext *ctx, bool dispatch_fds) +void aio_dispatch(AioContext *ctx) { - bool progress; - - progress = aio_bh_poll(ctx); - if (dispatch_fds) { - progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); - } - progress |= timerlistgroup_run_timers(&ctx->tlg); - return progress; + qemu_lockcnt_inc(&ctx->list_lock); + aio_bh_poll(ctx); + aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); + qemu_lockcnt_dec(&ctx->list_lock); + timerlistgroup_run_timers(&ctx->tlg); } bool aio_poll(AioContext *ctx, bool blocking) @@ -329,7 +323,6 @@ bool aio_poll(AioContext *ctx, bool blocking) int count; int timeout; - aio_context_acquire(ctx); progress = false; /* aio_notify can avoid the expensive event_notifier_set if @@ -355,7 +348,6 @@ bool aio_poll(AioContext *ctx, bool blocking) } } - qemu_lockcnt_dec(&ctx->list_lock); first = true; /* ctx->notifier is always registered. */ @@ -371,17 +363,11 @@ bool aio_poll(AioContext *ctx, bool blocking) timeout = blocking && !have_select_revents ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; - if (timeout) { - aio_context_release(ctx); - } ret = WaitForMultipleObjects(count, events, FALSE, timeout); if (blocking) { assert(first); atomic_sub(&ctx->notify_me, 2); } - if (timeout) { - aio_context_acquire(ctx); - } if (first) { aio_notify_accept(ctx); @@ -404,9 +390,9 @@ bool aio_poll(AioContext *ctx, bool blocking) progress |= aio_dispatch_handlers(ctx, event); } while (count > 0); - progress |= timerlistgroup_run_timers(&ctx->tlg); + qemu_lockcnt_dec(&ctx->list_lock); - aio_context_release(ctx); + progress |= timerlistgroup_run_timers(&ctx->tlg); return progress; } diff --git a/util/aiocb.c b/util/aiocb.c new file mode 100644 index 0000000..5aef3a0 --- /dev/null +++ b/util/aiocb.c @@ -0,0 +1,55 @@ +/* + * BlockAIOCB allocation + * + * Copyright (c) 2003-2017 Fabrice Bellard and other QEMU contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" +#include "block/aio.h" + +void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs, + BlockCompletionFunc *cb, void *opaque) +{ + BlockAIOCB *acb; + + acb = g_malloc(aiocb_info->aiocb_size); + acb->aiocb_info = aiocb_info; + acb->bs = bs; + acb->cb = cb; + acb->opaque = opaque; + acb->refcnt = 1; + return acb; +} + +void qemu_aio_ref(void *p) +{ + BlockAIOCB *acb = p; + acb->refcnt++; +} + +void qemu_aio_unref(void *p) +{ + BlockAIOCB *acb = p; + assert(acb->refcnt > 0); + if (--acb->refcnt == 0) { + g_free(acb); + } +} @@ -1,7 +1,8 @@ /* - * QEMU System Emulator + * Data plane event loop * * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2009-2017 QEMU contributors * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -30,6 +31,8 @@ #include "qemu/main-loop.h" #include "qemu/atomic.h" #include "block/raw-aio.h" +#include "qemu/coroutine_int.h" +#include "trace.h" /***********************************************************/ /* bottom halves (can be seen as timers which expire ASAP) */ @@ -87,15 +90,16 @@ void aio_bh_call(QEMUBH *bh) bh->cb(bh->opaque); } -/* Multiple occurrences of aio_bh_poll cannot be called concurrently */ +/* Multiple occurrences of aio_bh_poll cannot be called concurrently. + * The count in ctx->list_lock is incremented before the call, and is + * not affected by the call. + */ int aio_bh_poll(AioContext *ctx) { QEMUBH *bh, **bhp, *next; int ret; bool deleted = false; - qemu_lockcnt_inc(&ctx->list_lock); - ret = 0; for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) { next = atomic_rcu_read(&bh->next); @@ -120,11 +124,10 @@ int aio_bh_poll(AioContext *ctx) /* remove deleted bhs */ if (!deleted) { - qemu_lockcnt_dec(&ctx->list_lock); return ret; } - if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { bhp = &ctx->first_bh; while (*bhp) { bh = *bhp; @@ -135,7 +138,7 @@ int aio_bh_poll(AioContext *ctx) bhp = &bh->next; } } - qemu_lockcnt_unlock(&ctx->list_lock); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); } return ret; } @@ -255,7 +258,7 @@ aio_ctx_dispatch(GSource *source, AioContext *ctx = (AioContext *) source; assert(callback == NULL); - aio_dispatch(ctx, true); + aio_dispatch(ctx); return true; } @@ -274,6 +277,9 @@ aio_ctx_finalize(GSource *source) } #endif + assert(QSLIST_EMPTY(&ctx->scheduled_coroutines)); + qemu_bh_delete(ctx->co_schedule_bh); + qemu_lockcnt_lock(&ctx->list_lock); assert(!qemu_lockcnt_count(&ctx->list_lock)); while (ctx->first_bh) { @@ -363,6 +369,30 @@ static bool event_notifier_poll(void *opaque) return atomic_read(&ctx->notified); } +static void co_schedule_bh_cb(void *opaque) +{ + AioContext *ctx = opaque; + QSLIST_HEAD(, Coroutine) straight, reversed; + + QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines); + QSLIST_INIT(&straight); + + while (!QSLIST_EMPTY(&reversed)) { + Coroutine *co = QSLIST_FIRST(&reversed); + QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next); + QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next); + } + + while (!QSLIST_EMPTY(&straight)) { + Coroutine *co = QSLIST_FIRST(&straight); + QSLIST_REMOVE_HEAD(&straight, co_scheduled_next); + trace_aio_co_schedule_bh_cb(ctx, co); + aio_context_acquire(ctx); + qemu_coroutine_enter(co); + aio_context_release(ctx); + } +} + AioContext *aio_context_new(Error **errp) { int ret; @@ -378,6 +408,10 @@ AioContext *aio_context_new(Error **errp) } g_source_set_can_recurse(&ctx->source, true); qemu_lockcnt_init(&ctx->list_lock); + + ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx); + QSLIST_INIT(&ctx->scheduled_coroutines); + aio_set_event_notifier(ctx, &ctx->notifier, false, (EventNotifierHandler *) @@ -401,6 +435,40 @@ fail: return NULL; } +void aio_co_schedule(AioContext *ctx, Coroutine *co) +{ + trace_aio_co_schedule(ctx, co); + QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines, + co, co_scheduled_next); + qemu_bh_schedule(ctx->co_schedule_bh); +} + +void aio_co_wake(struct Coroutine *co) +{ + AioContext *ctx; + + /* Read coroutine before co->ctx. Matches smp_wmb in + * qemu_coroutine_enter. + */ + smp_read_barrier_depends(); + ctx = atomic_read(&co->ctx); + + if (ctx != qemu_get_current_aio_context()) { + aio_co_schedule(ctx, co); + return; + } + + if (qemu_in_coroutine()) { + Coroutine *self = qemu_coroutine_self(); + assert(self != co); + QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next); + } else { + aio_context_acquire(ctx); + qemu_coroutine_enter(co); + aio_context_release(ctx); + } +} + void aio_context_ref(AioContext *ctx) { g_source_ref(&ctx->source); diff --git a/iohandler.c b/util/iohandler.c index 623b55b..623b55b 100644 --- a/iohandler.c +++ b/util/iohandler.c diff --git a/main-loop.c b/util/main-loop.c index ad10bca..ad10bca 100644 --- a/main-loop.c +++ b/util/main-loop.c diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c index 14cf9ce..6328eed 100644 --- a/util/qemu-coroutine-lock.c +++ b/util/qemu-coroutine-lock.c @@ -20,13 +20,19 @@ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. + * + * The lock-free mutex implementation is based on OSv + * (core/lfmutex.cc, include/lockfree/mutex.hh). + * Copyright (C) 2013 Cloudius Systems, Ltd. */ #include "qemu/osdep.h" #include "qemu-common.h" #include "qemu/coroutine.h" #include "qemu/coroutine_int.h" +#include "qemu/processor.h" #include "qemu/queue.h" +#include "block/aio.h" #include "trace.h" void qemu_co_queue_init(CoQueue *queue) @@ -34,12 +40,30 @@ void qemu_co_queue_init(CoQueue *queue) QSIMPLEQ_INIT(&queue->entries); } -void coroutine_fn qemu_co_queue_wait(CoQueue *queue) +void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex) { Coroutine *self = qemu_coroutine_self(); QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next); + + if (mutex) { + qemu_co_mutex_unlock(mutex); + } + + /* There is no race condition here. Other threads will call + * aio_co_schedule on our AioContext, which can reenter this + * coroutine but only after this yield and after the main loop + * has gone through the next iteration. + */ qemu_coroutine_yield(); assert(qemu_in_coroutine()); + + /* TODO: OSv implements wait morphing here, where the wakeup + * primitive automatically places the woken coroutine on the + * mutex's queue. This avoids the thundering herd effect. + */ + if (mutex) { + qemu_co_mutex_lock(mutex); + } } /** @@ -63,7 +87,6 @@ void qemu_co_queue_run_restart(Coroutine *co) static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) { - Coroutine *self = qemu_coroutine_self(); Coroutine *next; if (QSIMPLEQ_EMPTY(&queue->entries)) { @@ -72,8 +95,7 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) { QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next); - QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next); - trace_qemu_co_queue_next(next); + aio_co_wake(next); if (single) { break; } @@ -112,27 +134,157 @@ bool qemu_co_queue_empty(CoQueue *queue) return QSIMPLEQ_FIRST(&queue->entries) == NULL; } +/* The wait records are handled with a multiple-producer, single-consumer + * lock-free queue. There cannot be two concurrent pop_waiter() calls + * because pop_waiter() can only be called while mutex->handoff is zero. + * This can happen in three cases: + * - in qemu_co_mutex_unlock, before the hand-off protocol has started. + * In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and + * not take part in the handoff. + * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from + * qemu_co_mutex_unlock. In this case, qemu_co_mutex_unlock will fail + * the cmpxchg (it will see either 0 or the next sequence value) and + * exit. The next hand-off cannot begin until qemu_co_mutex_lock has + * woken up someone. + * - in qemu_co_mutex_unlock, if it takes the hand-off token itself. + * In this case another iteration starts with mutex->handoff == 0; + * a concurrent qemu_co_mutex_lock will fail the cmpxchg, and + * qemu_co_mutex_unlock will go back to case (1). + * + * The following functions manage this queue. + */ +typedef struct CoWaitRecord { + Coroutine *co; + QSLIST_ENTRY(CoWaitRecord) next; +} CoWaitRecord; + +static void push_waiter(CoMutex *mutex, CoWaitRecord *w) +{ + w->co = qemu_coroutine_self(); + QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next); +} + +static void move_waiters(CoMutex *mutex) +{ + QSLIST_HEAD(, CoWaitRecord) reversed; + QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push); + while (!QSLIST_EMPTY(&reversed)) { + CoWaitRecord *w = QSLIST_FIRST(&reversed); + QSLIST_REMOVE_HEAD(&reversed, next); + QSLIST_INSERT_HEAD(&mutex->to_pop, w, next); + } +} + +static CoWaitRecord *pop_waiter(CoMutex *mutex) +{ + CoWaitRecord *w; + + if (QSLIST_EMPTY(&mutex->to_pop)) { + move_waiters(mutex); + if (QSLIST_EMPTY(&mutex->to_pop)) { + return NULL; + } + } + w = QSLIST_FIRST(&mutex->to_pop); + QSLIST_REMOVE_HEAD(&mutex->to_pop, next); + return w; +} + +static bool has_waiters(CoMutex *mutex) +{ + return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push); +} + void qemu_co_mutex_init(CoMutex *mutex) { memset(mutex, 0, sizeof(*mutex)); - qemu_co_queue_init(&mutex->queue); } -void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) +static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co) +{ + /* Read co before co->ctx; pairs with smp_wmb() in + * qemu_coroutine_enter(). + */ + smp_read_barrier_depends(); + mutex->ctx = co->ctx; + aio_co_wake(co); +} + +static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx, + CoMutex *mutex) { Coroutine *self = qemu_coroutine_self(); + CoWaitRecord w; + unsigned old_handoff; trace_qemu_co_mutex_lock_entry(mutex, self); + w.co = self; + push_waiter(mutex, &w); + + /* This is the "Responsibility Hand-Off" protocol; a lock() picks from + * a concurrent unlock() the responsibility of waking somebody up. + */ + old_handoff = atomic_mb_read(&mutex->handoff); + if (old_handoff && + has_waiters(mutex) && + atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) { + /* There can be no concurrent pops, because there can be only + * one active handoff at a time. + */ + CoWaitRecord *to_wake = pop_waiter(mutex); + Coroutine *co = to_wake->co; + if (co == self) { + /* We got the lock ourselves! */ + assert(to_wake == &w); + mutex->ctx = ctx; + return; + } + + qemu_co_mutex_wake(mutex, co); + } - while (mutex->locked) { - qemu_co_queue_wait(&mutex->queue); + qemu_coroutine_yield(); + trace_qemu_co_mutex_lock_return(mutex, self); +} + +void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) +{ + AioContext *ctx = qemu_get_current_aio_context(); + Coroutine *self = qemu_coroutine_self(); + int waiters, i; + + /* Running a very small critical section on pthread_mutex_t and CoMutex + * shows that pthread_mutex_t is much faster because it doesn't actually + * go to sleep. What happens is that the critical section is shorter + * than the latency of entering the kernel and thus FUTEX_WAIT always + * fails. With CoMutex there is no such latency but you still want to + * avoid wait and wakeup. So introduce it artificially. + */ + i = 0; +retry_fast_path: + waiters = atomic_cmpxchg(&mutex->locked, 0, 1); + if (waiters != 0) { + while (waiters == 1 && ++i < 1000) { + if (atomic_read(&mutex->ctx) == ctx) { + break; + } + if (atomic_read(&mutex->locked) == 0) { + goto retry_fast_path; + } + cpu_relax(); + } + waiters = atomic_fetch_inc(&mutex->locked); } - mutex->locked = true; + if (waiters == 0) { + /* Uncontended. */ + trace_qemu_co_mutex_lock_uncontended(mutex, self); + mutex->ctx = ctx; + } else { + qemu_co_mutex_lock_slowpath(ctx, mutex); + } mutex->holder = self; self->locks_held++; - - trace_qemu_co_mutex_lock_return(mutex, self); } void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) @@ -141,14 +293,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) trace_qemu_co_mutex_unlock_entry(mutex, self); - assert(mutex->locked == true); + assert(mutex->locked); assert(mutex->holder == self); assert(qemu_in_coroutine()); - mutex->locked = false; + mutex->ctx = NULL; mutex->holder = NULL; self->locks_held--; - qemu_co_queue_next(&mutex->queue); + if (atomic_fetch_dec(&mutex->locked) == 1) { + /* No waiting qemu_co_mutex_lock(). Pfew, that was easy! */ + return; + } + + for (;;) { + CoWaitRecord *to_wake = pop_waiter(mutex); + unsigned our_handoff; + + if (to_wake) { + qemu_co_mutex_wake(mutex, to_wake->co); + break; + } + + /* Some concurrent lock() is in progress (we know this because + * mutex->locked was >1) but it hasn't yet put itself on the wait + * queue. Pick a sequence number for the handoff protocol (not 0). + */ + if (++mutex->sequence == 0) { + mutex->sequence = 1; + } + + our_handoff = mutex->sequence; + atomic_mb_set(&mutex->handoff, our_handoff); + if (!has_waiters(mutex)) { + /* The concurrent lock has not added itself yet, so it + * will be able to pick our handoff. + */ + break; + } + + /* Try to do the handoff protocol ourselves; if somebody else has + * already taken it, however, we're done and they're responsible. + */ + if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) { + break; + } + } trace_qemu_co_mutex_unlock_return(mutex, self); } @@ -157,16 +346,22 @@ void qemu_co_rwlock_init(CoRwlock *lock) { memset(lock, 0, sizeof(*lock)); qemu_co_queue_init(&lock->queue); + qemu_co_mutex_init(&lock->mutex); } void qemu_co_rwlock_rdlock(CoRwlock *lock) { Coroutine *self = qemu_coroutine_self(); - while (lock->writer) { - qemu_co_queue_wait(&lock->queue); + qemu_co_mutex_lock(&lock->mutex); + /* For fairness, wait if a writer is in line. */ + while (lock->pending_writer) { + qemu_co_queue_wait(&lock->queue, &lock->mutex); } lock->reader++; + qemu_co_mutex_unlock(&lock->mutex); + + /* The rest of the read-side critical section is run without the mutex. */ self->locks_held++; } @@ -175,10 +370,13 @@ void qemu_co_rwlock_unlock(CoRwlock *lock) Coroutine *self = qemu_coroutine_self(); assert(qemu_in_coroutine()); - if (lock->writer) { - lock->writer = false; + if (!lock->reader) { + /* The critical section started in qemu_co_rwlock_wrlock. */ qemu_co_queue_restart_all(&lock->queue); } else { + self->locks_held--; + + qemu_co_mutex_lock(&lock->mutex); lock->reader--; assert(lock->reader >= 0); /* Wakeup only one waiting writer */ @@ -186,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock) qemu_co_queue_next(&lock->queue); } } - self->locks_held--; + qemu_co_mutex_unlock(&lock->mutex); } void qemu_co_rwlock_wrlock(CoRwlock *lock) { - Coroutine *self = qemu_coroutine_self(); - - while (lock->writer || lock->reader) { - qemu_co_queue_wait(&lock->queue); + qemu_co_mutex_lock(&lock->mutex); + lock->pending_writer++; + while (lock->reader) { + qemu_co_queue_wait(&lock->queue, &lock->mutex); } - lock->writer = true; - self->locks_held++; + lock->pending_writer--; + + /* The rest of the write-side critical section is run with + * the mutex taken, so that lock->reader remains zero. + * There is no need to update self->locks_held. + */ } diff --git a/util/qemu-coroutine-sleep.c b/util/qemu-coroutine-sleep.c index 25de3ed..9c56550 100644 --- a/util/qemu-coroutine-sleep.c +++ b/util/qemu-coroutine-sleep.c @@ -25,7 +25,7 @@ static void co_sleep_cb(void *opaque) { CoSleepCB *sleep_cb = opaque; - qemu_coroutine_enter(sleep_cb->co); + aio_co_wake(sleep_cb->co); } void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c index a5d2f6c..72412e5 100644 --- a/util/qemu-coroutine.c +++ b/util/qemu-coroutine.c @@ -19,6 +19,7 @@ #include "qemu/atomic.h" #include "qemu/coroutine.h" #include "qemu/coroutine_int.h" +#include "block/aio.h" enum { POOL_BATCH_SIZE = 64, @@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co) } co->caller = self; + co->ctx = qemu_get_current_aio_context(); + + /* Store co->ctx before anything that stores co. Matches + * barrier in aio_co_wake and qemu_co_mutex_wake. + */ + smp_wmb(); + ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER); qemu_co_queue_run_restart(co); diff --git a/qemu-timer.c b/util/qemu-timer.c index ff620ec..ff620ec 100644 --- a/qemu-timer.c +++ b/util/qemu-timer.c diff --git a/thread-pool.c b/util/thread-pool.c index 3847969..ce6cd30 100644 --- a/thread-pool.c +++ b/util/thread-pool.c @@ -19,7 +19,7 @@ #include "qemu/queue.h" #include "qemu/thread.h" #include "qemu/coroutine.h" -#include "trace-root.h" +#include "trace.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" @@ -165,6 +165,7 @@ static void thread_pool_completion_bh(void *opaque) ThreadPool *pool = opaque; ThreadPoolElement *elem, *next; + aio_context_acquire(pool->ctx); restart: QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { if (elem->state != THREAD_DONE) { @@ -184,13 +185,16 @@ restart: */ qemu_bh_schedule(pool->completion_bh); + aio_context_release(pool->ctx); elem->common.cb(elem->common.opaque, elem->ret); + aio_context_acquire(pool->ctx); qemu_aio_unref(elem); goto restart; } else { qemu_aio_unref(elem); } } + aio_context_release(pool->ctx); } static void thread_pool_cancel(BlockAIOCB *acb) @@ -267,7 +271,7 @@ static void thread_pool_co_cb(void *opaque, int ret) ThreadPoolCo *co = opaque; co->ret = ret; - qemu_coroutine_enter(co->co); + aio_co_wake(co->co); } int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, diff --git a/util/trace-events b/util/trace-events index 2b8aa30..ac27d94 100644 --- a/util/trace-events +++ b/util/trace-events @@ -1,5 +1,20 @@ # See docs/tracing.txt for syntax documentation. +# util/aio-posix.c +run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64 +run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d" +poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 +poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 + +# util/async.c +aio_co_schedule(void *ctx, void *co) "ctx %p co %p" +aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p" + +# util/thread-pool.c +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" +thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" + # util/buffer.c buffer_resize(const char *buf, size_t olen, size_t len) "%s: old %zd, new %zd" buffer_move_empty(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s" @@ -13,7 +28,7 @@ qemu_coroutine_terminate(void *co) "self %p" # util/qemu-coroutine-lock.c qemu_co_queue_run_restart(void *co) "co %p" -qemu_co_queue_next(void *nxt) "next %p" +qemu_co_mutex_lock_uncontended(void *mutex, void *self) "mutex %p self %p" qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p" qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p" qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p" |