diff options
-rw-r--r-- | block.c | 6 | ||||
-rw-r--r-- | block/block-backend.c | 31 | ||||
-rw-r--r-- | block/commit.c | 97 | ||||
-rw-r--r-- | block/io.c | 30 | ||||
-rw-r--r-- | block/linux-aio.c | 2 | ||||
-rw-r--r-- | block/mirror.c | 49 | ||||
-rw-r--r-- | block/stream.c | 28 | ||||
-rw-r--r-- | blockdev.c | 84 | ||||
-rw-r--r-- | blockjob.c | 9 | ||||
-rw-r--r-- | hmp.c | 5 | ||||
-rw-r--r-- | include/block/aio-wait.h | 28 | ||||
-rw-r--r-- | include/block/block.h | 6 | ||||
-rw-r--r-- | include/block/block_int.h | 18 | ||||
-rw-r--r-- | include/block/blockjob.h | 3 | ||||
-rw-r--r-- | include/qemu/coroutine.h | 5 | ||||
-rw-r--r-- | include/qemu/job.h | 23 | ||||
-rw-r--r-- | job.c | 144 | ||||
-rw-r--r-- | qapi/block-core.json | 104 | ||||
-rwxr-xr-x | tests/qemu-iotests/040 | 52 | ||||
-rw-r--r-- | tests/qemu-iotests/040.out | 4 | ||||
-rwxr-xr-x | tests/qemu-iotests/051 | 3 | ||||
-rw-r--r-- | tests/qemu-iotests/051.out | 3 | ||||
-rw-r--r-- | tests/qemu-iotests/051.pc.out | 3 | ||||
-rw-r--r-- | tests/test-bdrv-drain.c | 294 | ||||
-rw-r--r-- | tests/test-blockjob-txn.c | 4 | ||||
-rw-r--r-- | tests/test-blockjob.c | 120 | ||||
-rw-r--r-- | util/aio-wait.c | 11 | ||||
-rw-r--r-- | util/async.c | 2 | ||||
-rw-r--r-- | util/qemu-coroutine.c | 5 |
29 files changed, 856 insertions, 317 deletions
@@ -2792,6 +2792,7 @@ static BlockDriverState *bdrv_open_inherit(const char *filename, bdrv_parent_cb_change_media(bs, true); qobject_unref(options); + options = NULL; /* For snapshot=on, create a temporary qcow2 overlay. bs points to the * temporary snapshot afterwards. */ @@ -4885,11 +4886,6 @@ AioContext *bdrv_get_aio_context(BlockDriverState *bs) return bs ? bs->aio_context : qemu_get_aio_context(); } -AioWait *bdrv_get_aio_wait(BlockDriverState *bs) -{ - return bs ? &bs->wait : NULL; -} - void bdrv_coroutine_enter(BlockDriverState *bs, Coroutine *co) { aio_co_enter(bdrv_get_aio_context(bs), co); diff --git a/block/block-backend.c b/block/block-backend.c index 14a1b7a..7b1ec50 100644 --- a/block/block-backend.c +++ b/block/block-backend.c @@ -88,7 +88,6 @@ struct BlockBackend { * Accessed with atomic ops. */ unsigned int in_flight; - AioWait wait; }; typedef struct BlockBackendAIOCB { @@ -121,6 +120,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options, abort(); } static void blk_root_drained_begin(BdrvChild *child); +static bool blk_root_drained_poll(BdrvChild *child); static void blk_root_drained_end(BdrvChild *child); static void blk_root_change_media(BdrvChild *child, bool load); @@ -294,6 +294,7 @@ static const BdrvChildRole child_root = { .get_parent_desc = blk_root_get_parent_desc, .drained_begin = blk_root_drained_begin, + .drained_poll = blk_root_drained_poll, .drained_end = blk_root_drained_end, .activate = blk_root_activate, @@ -433,6 +434,7 @@ int blk_get_refcnt(BlockBackend *blk) */ void blk_ref(BlockBackend *blk) { + assert(blk->refcnt > 0); blk->refcnt++; } @@ -445,7 +447,13 @@ void blk_unref(BlockBackend *blk) { if (blk) { assert(blk->refcnt > 0); - if (!--blk->refcnt) { + if (blk->refcnt > 1) { + blk->refcnt--; + } else { + blk_drain(blk); + /* blk_drain() cannot resurrect blk, nobody held a reference */ + assert(blk->refcnt == 1); + blk->refcnt = 0; blk_delete(blk); } } @@ -1289,7 +1297,7 @@ static void blk_inc_in_flight(BlockBackend *blk) static void blk_dec_in_flight(BlockBackend *blk) { atomic_dec(&blk->in_flight); - aio_wait_kick(&blk->wait); + aio_wait_kick(); } static void error_callback_bh(void *opaque) @@ -1330,8 +1338,8 @@ static const AIOCBInfo blk_aio_em_aiocb_info = { static void blk_aio_complete(BlkAioEmAIOCB *acb) { if (acb->has_returned) { - blk_dec_in_flight(acb->rwco.blk); acb->common.cb(acb->common.opaque, acb->rwco.ret); + blk_dec_in_flight(acb->rwco.blk); qemu_aio_unref(acb); } } @@ -1590,9 +1598,8 @@ void blk_drain(BlockBackend *blk) } /* We may have -ENOMEDIUM completions in flight */ - AIO_WAIT_WHILE(&blk->wait, - blk_get_aio_context(blk), - atomic_mb_read(&blk->in_flight) > 0); + AIO_WAIT_WHILE(blk_get_aio_context(blk), + atomic_mb_read(&blk->in_flight) > 0); if (bs) { bdrv_drained_end(bs); @@ -1611,8 +1618,7 @@ void blk_drain_all(void) aio_context_acquire(ctx); /* We may have -ENOMEDIUM completions in flight */ - AIO_WAIT_WHILE(&blk->wait, ctx, - atomic_mb_read(&blk->in_flight) > 0); + AIO_WAIT_WHILE(ctx, atomic_mb_read(&blk->in_flight) > 0); aio_context_release(ctx); } @@ -2189,6 +2195,13 @@ static void blk_root_drained_begin(BdrvChild *child) } } +static bool blk_root_drained_poll(BdrvChild *child) +{ + BlockBackend *blk = child->opaque; + assert(blk->quiesce_counter); + return !!blk->in_flight; +} + static void blk_root_drained_end(BdrvChild *child) { BlockBackend *blk = child->opaque; diff --git a/block/commit.c b/block/commit.c index da69165..a2da574 100644 --- a/block/commit.c +++ b/block/commit.c @@ -36,6 +36,7 @@ typedef struct CommitBlockJob { BlockDriverState *commit_top_bs; BlockBackend *top; BlockBackend *base; + BlockDriverState *base_bs; BlockdevOnError on_error; int base_flags; char *backing_file_str; @@ -68,61 +69,67 @@ static int coroutine_fn commit_populate(BlockBackend *bs, BlockBackend *base, return 0; } -static void commit_exit(Job *job) +static int commit_prepare(Job *job) { CommitBlockJob *s = container_of(job, CommitBlockJob, common.job); - BlockJob *bjob = &s->common; - BlockDriverState *top = blk_bs(s->top); - BlockDriverState *base = blk_bs(s->base); - BlockDriverState *commit_top_bs = s->commit_top_bs; - bool remove_commit_top_bs = false; - - /* Make sure commit_top_bs and top stay around until bdrv_replace_node() */ - bdrv_ref(top); - bdrv_ref(commit_top_bs); /* Remove base node parent that still uses BLK_PERM_WRITE/RESIZE before * the normal backing chain can be restored. */ blk_unref(s->base); + s->base = NULL; + + /* FIXME: bdrv_drop_intermediate treats total failures and partial failures + * identically. Further work is needed to disambiguate these cases. */ + return bdrv_drop_intermediate(s->commit_top_bs, s->base_bs, + s->backing_file_str); +} - if (!job_is_cancelled(job) && job->ret == 0) { - /* success */ - job->ret = bdrv_drop_intermediate(s->commit_top_bs, base, - s->backing_file_str); - } else { - /* XXX Can (or should) we somehow keep 'consistent read' blocked even - * after the failed/cancelled commit job is gone? If we already wrote - * something to base, the intermediate images aren't valid any more. */ - remove_commit_top_bs = true; +static void commit_abort(Job *job) +{ + CommitBlockJob *s = container_of(job, CommitBlockJob, common.job); + BlockDriverState *top_bs = blk_bs(s->top); + + /* Make sure commit_top_bs and top stay around until bdrv_replace_node() */ + bdrv_ref(top_bs); + bdrv_ref(s->commit_top_bs); + + if (s->base) { + blk_unref(s->base); } + /* free the blockers on the intermediate nodes so that bdrv_replace_nodes + * can succeed */ + block_job_remove_all_bdrv(&s->common); + + /* If bdrv_drop_intermediate() failed (or was not invoked), remove the + * commit filter driver from the backing chain now. Do this as the final + * step so that the 'consistent read' permission can be granted. + * + * XXX Can (or should) we somehow keep 'consistent read' blocked even + * after the failed/cancelled commit job is gone? If we already wrote + * something to base, the intermediate images aren't valid any more. */ + bdrv_child_try_set_perm(s->commit_top_bs->backing, 0, BLK_PERM_ALL, + &error_abort); + bdrv_replace_node(s->commit_top_bs, backing_bs(s->commit_top_bs), + &error_abort); + + bdrv_unref(s->commit_top_bs); + bdrv_unref(top_bs); +} + +static void commit_clean(Job *job) +{ + CommitBlockJob *s = container_of(job, CommitBlockJob, common.job); + /* restore base open flags here if appropriate (e.g., change the base back * to r/o). These reopens do not need to be atomic, since we won't abort * even on failure here */ - if (s->base_flags != bdrv_get_flags(base)) { - bdrv_reopen(base, s->base_flags, NULL); + if (s->base_flags != bdrv_get_flags(s->base_bs)) { + bdrv_reopen(s->base_bs, s->base_flags, NULL); } + g_free(s->backing_file_str); blk_unref(s->top); - - /* If there is more than one reference to the job (e.g. if called from - * job_finish_sync()), job_completed() won't free it and therefore the - * blockers on the intermediate nodes remain. This would cause - * bdrv_set_backing_hd() to fail. */ - block_job_remove_all_bdrv(bjob); - - /* If bdrv_drop_intermediate() didn't already do that, remove the commit - * filter driver from the backing chain. Do this as the final step so that - * the 'consistent read' permission can be granted. */ - if (remove_commit_top_bs) { - bdrv_child_try_set_perm(commit_top_bs->backing, 0, BLK_PERM_ALL, - &error_abort); - bdrv_replace_node(commit_top_bs, backing_bs(commit_top_bs), - &error_abort); - } - - bdrv_unref(commit_top_bs); - bdrv_unref(top); } static int coroutine_fn commit_run(Job *job, Error **errp) @@ -211,7 +218,9 @@ static const BlockJobDriver commit_job_driver = { .user_resume = block_job_user_resume, .drain = block_job_drain, .run = commit_run, - .exit = commit_exit, + .prepare = commit_prepare, + .abort = commit_abort, + .clean = commit_clean }, }; @@ -249,7 +258,8 @@ static BlockDriver bdrv_commit_top = { }; void commit_start(const char *job_id, BlockDriverState *bs, - BlockDriverState *base, BlockDriverState *top, int64_t speed, + BlockDriverState *base, BlockDriverState *top, + int creation_flags, int64_t speed, BlockdevOnError on_error, const char *backing_file_str, const char *filter_node_name, Error **errp) { @@ -267,7 +277,7 @@ void commit_start(const char *job_id, BlockDriverState *bs, } s = block_job_create(job_id, &commit_job_driver, NULL, bs, 0, BLK_PERM_ALL, - speed, JOB_DEFAULT, NULL, NULL, errp); + speed, creation_flags, NULL, NULL, errp); if (!s) { return; } @@ -344,6 +354,7 @@ void commit_start(const char *job_id, BlockDriverState *bs, if (ret < 0) { goto fail; } + s->base_bs = base; /* Required permissions are already taken with block_job_add_bdrv() */ s->top = blk_new(0, BLK_PERM_ALL); @@ -38,8 +38,6 @@ /* Maximum bounce buffer for copy-on-read and write zeroes, in bytes */ #define MAX_BOUNCE_BUFFER (32768 << BDRV_SECTOR_BITS) -static AioWait drain_all_aio_wait; - static void bdrv_parent_cb_resize(BlockDriverState *bs); static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int bytes, BdrvRequestFlags flags); @@ -268,10 +266,6 @@ bool bdrv_drain_poll(BlockDriverState *bs, bool recursive, static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive, BdrvChild *ignore_parent) { - /* Execute pending BHs first and check everything else only after the BHs - * have executed. */ - while (aio_poll(bs->aio_context, false)); - return bdrv_drain_poll(bs, recursive, ignore_parent, false); } @@ -288,6 +282,18 @@ static void bdrv_co_drain_bh_cb(void *opaque) BlockDriverState *bs = data->bs; if (bs) { + AioContext *ctx = bdrv_get_aio_context(bs); + AioContext *co_ctx = qemu_coroutine_get_aio_context(co); + + /* + * When the coroutine yielded, the lock for its home context was + * released, so we need to re-acquire it here. If it explicitly + * acquired a different context, the lock is still held and we don't + * want to lock it a second time (or AIO_WAIT_WHILE() would hang). + */ + if (ctx == co_ctx) { + aio_context_acquire(ctx); + } bdrv_dec_in_flight(bs); if (data->begin) { bdrv_do_drained_begin(bs, data->recursive, data->parent, @@ -296,6 +302,9 @@ static void bdrv_co_drain_bh_cb(void *opaque) bdrv_do_drained_end(bs, data->recursive, data->parent, data->ignore_bds_parents); } + if (ctx == co_ctx) { + aio_context_release(ctx); + } } else { assert(data->begin); bdrv_drain_all_begin(); @@ -496,10 +505,6 @@ static bool bdrv_drain_all_poll(void) BlockDriverState *bs = NULL; bool result = false; - /* Execute pending BHs first (may modify the graph) and check everything - * else only after the BHs have executed. */ - while (aio_poll(qemu_get_aio_context(), false)); - /* bdrv_drain_poll() can't make changes to the graph and we are holding the * main AioContext lock, so iterating bdrv_next_all_states() is safe. */ while ((bs = bdrv_next_all_states(bs))) { @@ -550,7 +555,7 @@ void bdrv_drain_all_begin(void) } /* Now poll the in-flight requests */ - AIO_WAIT_WHILE(&drain_all_aio_wait, NULL, bdrv_drain_all_poll()); + AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll()); while ((bs = bdrv_next_all_states(bs))) { bdrv_drain_assert_idle(bs); @@ -706,8 +711,7 @@ void bdrv_inc_in_flight(BlockDriverState *bs) void bdrv_wakeup(BlockDriverState *bs) { - aio_wait_kick(bdrv_get_aio_wait(bs)); - aio_wait_kick(&drain_all_aio_wait); + aio_wait_kick(); } void bdrv_dec_in_flight(BlockDriverState *bs) diff --git a/block/linux-aio.c b/block/linux-aio.c index 19eb922..217ce60 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -234,9 +234,9 @@ static void qemu_laio_process_completions(LinuxAioState *s) static void qemu_laio_process_completions_and_submit(LinuxAioState *s) { + aio_context_acquire(s->aio_context); qemu_laio_process_completions(s); - aio_context_acquire(s->aio_context); if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } diff --git a/block/mirror.c b/block/mirror.c index b8941db..56d9ef7 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -79,6 +79,7 @@ typedef struct MirrorBlockJob { int max_iov; bool initial_zeroing_ongoing; int in_active_write_counter; + bool prepared; } MirrorBlockJob; typedef struct MirrorBDSOpaque { @@ -607,7 +608,12 @@ static void mirror_wait_for_all_io(MirrorBlockJob *s) } } -static void mirror_exit(Job *job) +/** + * mirror_exit_common: handle both abort() and prepare() cases. + * for .prepare, returns 0 on success and -errno on failure. + * for .abort cases, denoted by abort = true, MUST return 0. + */ +static int mirror_exit_common(Job *job) { MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job); BlockJob *bjob = &s->common; @@ -617,7 +623,13 @@ static void mirror_exit(Job *job) BlockDriverState *target_bs = blk_bs(s->target); BlockDriverState *mirror_top_bs = s->mirror_top_bs; Error *local_err = NULL; - int ret = job->ret; + bool abort = job->ret < 0; + int ret = 0; + + if (s->prepared) { + return 0; + } + s->prepared = true; bdrv_release_dirty_bitmap(src, s->dirty_bitmap); @@ -642,7 +654,7 @@ static void mirror_exit(Job *job) * required before it could become a backing file of target_bs. */ bdrv_child_try_set_perm(mirror_top_bs->backing, 0, BLK_PERM_ALL, &error_abort); - if (s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) { + if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) { BlockDriverState *backing = s->is_none_mode ? src : s->base; if (backing_bs(target_bs) != backing) { bdrv_set_backing_hd(target_bs, backing, &local_err); @@ -658,11 +670,8 @@ static void mirror_exit(Job *job) aio_context_acquire(replace_aio_context); } - if (s->should_complete && ret == 0) { - BlockDriverState *to_replace = src; - if (s->to_replace) { - to_replace = s->to_replace; - } + if (s->should_complete && !abort) { + BlockDriverState *to_replace = s->to_replace ?: src; if (bdrv_get_flags(target_bs) != bdrv_get_flags(to_replace)) { bdrv_reopen(target_bs, bdrv_get_flags(to_replace), NULL); @@ -711,7 +720,18 @@ static void mirror_exit(Job *job) bdrv_unref(mirror_top_bs); bdrv_unref(src); - job->ret = ret; + return ret; +} + +static int mirror_prepare(Job *job) +{ + return mirror_exit_common(job); +} + +static void mirror_abort(Job *job) +{ + int ret = mirror_exit_common(job); + assert(ret == 0); } static void mirror_throttle(MirrorBlockJob *s) @@ -1132,7 +1152,8 @@ static const BlockJobDriver mirror_job_driver = { .user_resume = block_job_user_resume, .drain = block_job_drain, .run = mirror_run, - .exit = mirror_exit, + .prepare = mirror_prepare, + .abort = mirror_abort, .pause = mirror_pause, .complete = mirror_complete, }, @@ -1149,7 +1170,8 @@ static const BlockJobDriver commit_active_job_driver = { .user_resume = block_job_user_resume, .drain = block_job_drain, .run = mirror_run, - .exit = mirror_exit, + .prepare = mirror_prepare, + .abort = mirror_abort, .pause = mirror_pause, .complete = mirror_complete, }, @@ -1639,7 +1661,8 @@ fail: void mirror_start(const char *job_id, BlockDriverState *bs, BlockDriverState *target, const char *replaces, - int64_t speed, uint32_t granularity, int64_t buf_size, + int creation_flags, int64_t speed, + uint32_t granularity, int64_t buf_size, MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, BlockdevOnError on_source_error, BlockdevOnError on_target_error, @@ -1655,7 +1678,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs, } is_none_mode = mode == MIRROR_SYNC_MODE_NONE; base = mode == MIRROR_SYNC_MODE_TOP ? backing_bs(bs) : NULL; - mirror_start_job(job_id, bs, JOB_DEFAULT, target, replaces, + mirror_start_job(job_id, bs, creation_flags, target, replaces, speed, granularity, buf_size, backing_mode, on_source_error, on_target_error, unmap, NULL, NULL, &mirror_job_driver, is_none_mode, base, false, diff --git a/block/stream.c b/block/stream.c index 67e1e72..81a7ec8 100644 --- a/block/stream.c +++ b/block/stream.c @@ -54,16 +54,16 @@ static int coroutine_fn stream_populate(BlockBackend *blk, return blk_co_preadv(blk, offset, qiov.size, &qiov, BDRV_REQ_COPY_ON_READ); } -static void stream_exit(Job *job) +static int stream_prepare(Job *job) { StreamBlockJob *s = container_of(job, StreamBlockJob, common.job); BlockJob *bjob = &s->common; BlockDriverState *bs = blk_bs(bjob->blk); BlockDriverState *base = s->base; Error *local_err = NULL; - int ret = job->ret; + int ret = 0; - if (!job_is_cancelled(job) && bs->backing && ret == 0) { + if (bs->backing) { const char *base_id = NULL, *base_fmt = NULL; if (base) { base_id = s->backing_file_str; @@ -75,12 +75,19 @@ static void stream_exit(Job *job) bdrv_set_backing_hd(bs, base, &local_err); if (local_err) { error_report_err(local_err); - ret = -EPERM; - goto out; + return -EPERM; } } -out: + return ret; +} + +static void stream_clean(Job *job) +{ + StreamBlockJob *s = container_of(job, StreamBlockJob, common.job); + BlockJob *bjob = &s->common; + BlockDriverState *bs = blk_bs(bjob->blk); + /* Reopen the image back in read-only mode if necessary */ if (s->bs_flags != bdrv_get_flags(bs)) { /* Give up write permissions before making it read-only */ @@ -89,7 +96,6 @@ out: } g_free(s->backing_file_str); - job->ret = ret; } static int coroutine_fn stream_run(Job *job, Error **errp) @@ -206,7 +212,8 @@ static const BlockJobDriver stream_job_driver = { .job_type = JOB_TYPE_STREAM, .free = block_job_free, .run = stream_run, - .exit = stream_exit, + .prepare = stream_prepare, + .clean = stream_clean, .user_resume = block_job_user_resume, .drain = block_job_drain, }, @@ -214,7 +221,8 @@ static const BlockJobDriver stream_job_driver = { void stream_start(const char *job_id, BlockDriverState *bs, BlockDriverState *base, const char *backing_file_str, - int64_t speed, BlockdevOnError on_error, Error **errp) + int creation_flags, int64_t speed, + BlockdevOnError on_error, Error **errp) { StreamBlockJob *s; BlockDriverState *iter; @@ -236,7 +244,7 @@ void stream_start(const char *job_id, BlockDriverState *bs, BLK_PERM_GRAPH_MOD, BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE, - speed, JOB_DEFAULT, NULL, NULL, errp); + speed, creation_flags, NULL, NULL, errp); if (!s) { goto fail; } @@ -2182,7 +2182,13 @@ static const BlkActionOps actions[] = { .instance_size = sizeof(BlockDirtyBitmapState), .prepare = block_dirty_bitmap_disable_prepare, .abort = block_dirty_bitmap_disable_abort, - } + }, + /* Where are transactions for MIRROR, COMMIT and STREAM? + * Although these blockjobs use transaction callbacks like the backup job, + * these jobs do not necessarily adhere to transaction semantics. + * These jobs may not fully undo all of their actions on abort, nor do they + * necessarily work in transactions with more than one job in them. + */ }; /** @@ -3116,6 +3122,8 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device, bool has_backing_file, const char *backing_file, bool has_speed, int64_t speed, bool has_on_error, BlockdevOnError on_error, + bool has_auto_finalize, bool auto_finalize, + bool has_auto_dismiss, bool auto_dismiss, Error **errp) { BlockDriverState *bs, *iter; @@ -3123,6 +3131,7 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device, AioContext *aio_context; Error *local_err = NULL; const char *base_name = NULL; + int job_flags = JOB_DEFAULT; if (!has_on_error) { on_error = BLOCKDEV_ON_ERROR_REPORT; @@ -3184,8 +3193,15 @@ void qmp_block_stream(bool has_job_id, const char *job_id, const char *device, /* backing_file string overrides base bs filename */ base_name = has_backing_file ? backing_file : base_name; + if (has_auto_finalize && !auto_finalize) { + job_flags |= JOB_MANUAL_FINALIZE; + } + if (has_auto_dismiss && !auto_dismiss) { + job_flags |= JOB_MANUAL_DISMISS; + } + stream_start(has_job_id ? job_id : NULL, bs, base_bs, base_name, - has_speed ? speed : 0, on_error, &local_err); + job_flags, has_speed ? speed : 0, on_error, &local_err); if (local_err) { error_propagate(errp, local_err); goto out; @@ -3198,11 +3214,15 @@ out: } void qmp_block_commit(bool has_job_id, const char *job_id, const char *device, + bool has_base_node, const char *base_node, bool has_base, const char *base, + bool has_top_node, const char *top_node, bool has_top, const char *top, bool has_backing_file, const char *backing_file, bool has_speed, int64_t speed, bool has_filter_node_name, const char *filter_node_name, + bool has_auto_finalize, bool auto_finalize, + bool has_auto_dismiss, bool auto_dismiss, Error **errp) { BlockDriverState *bs; @@ -3214,6 +3234,7 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device, * BlockdevOnError change for blkmirror makes it in */ BlockdevOnError on_error = BLOCKDEV_ON_ERROR_REPORT; + int job_flags = JOB_DEFAULT; if (!has_speed) { speed = 0; @@ -3221,6 +3242,12 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device, if (!has_filter_node_name) { filter_node_name = NULL; } + if (has_auto_finalize && !auto_finalize) { + job_flags |= JOB_MANUAL_FINALIZE; + } + if (has_auto_dismiss && !auto_dismiss) { + job_flags |= JOB_MANUAL_DISMISS; + } /* Important Note: * libvirt relies on the DeviceNotFound error class in order to probe for @@ -3250,7 +3277,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device, /* default top_bs is the active layer */ top_bs = bs; - if (has_top && top) { + if (has_top_node && has_top) { + error_setg(errp, "'top-node' and 'top' are mutually exclusive"); + goto out; + } else if (has_top_node) { + top_bs = bdrv_lookup_bs(NULL, top_node, errp); + if (top_bs == NULL) { + goto out; + } + if (!bdrv_chain_contains(bs, top_bs)) { + error_setg(errp, "'%s' is not in this backing file chain", + top_node); + goto out; + } + } else if (has_top && top) { if (strcmp(bs->filename, top) != 0) { top_bs = bdrv_find_backing_image(bs, top); } @@ -3263,7 +3303,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device, assert(bdrv_get_aio_context(top_bs) == aio_context); - if (has_base && base) { + if (has_base_node && has_base) { + error_setg(errp, "'base-node' and 'base' are mutually exclusive"); + goto out; + } else if (has_base_node) { + base_bs = bdrv_lookup_bs(NULL, base_node, errp); + if (base_bs == NULL) { + goto out; + } + if (!bdrv_chain_contains(top_bs, base_bs)) { + error_setg(errp, "'%s' is not in this backing file chain", + base_node); + goto out; + } + } else if (has_base && base) { base_bs = bdrv_find_backing_image(top_bs, base); } else { base_bs = bdrv_find_base(top_bs); @@ -3295,15 +3348,15 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device, goto out; } commit_active_start(has_job_id ? job_id : NULL, bs, base_bs, - JOB_DEFAULT, speed, on_error, + job_flags, speed, on_error, filter_node_name, NULL, NULL, false, &local_err); } else { BlockDriverState *overlay_bs = bdrv_find_overlay(bs, top_bs); if (bdrv_op_is_blocked(overlay_bs, BLOCK_OP_TYPE_COMMIT_TARGET, errp)) { goto out; } - commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, speed, - on_error, has_backing_file ? backing_file : NULL, + commit_start(has_job_id ? job_id : NULL, bs, base_bs, top_bs, job_flags, + speed, on_error, has_backing_file ? backing_file : NULL, filter_node_name, &local_err); } if (local_err != NULL) { @@ -3587,8 +3640,11 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs, bool has_filter_node_name, const char *filter_node_name, bool has_copy_mode, MirrorCopyMode copy_mode, + bool has_auto_finalize, bool auto_finalize, + bool has_auto_dismiss, bool auto_dismiss, Error **errp) { + int job_flags = JOB_DEFAULT; if (!has_speed) { speed = 0; @@ -3614,6 +3670,12 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs, if (!has_copy_mode) { copy_mode = MIRROR_COPY_MODE_BACKGROUND; } + if (has_auto_finalize && !auto_finalize) { + job_flags |= JOB_MANUAL_FINALIZE; + } + if (has_auto_dismiss && !auto_dismiss) { + job_flags |= JOB_MANUAL_DISMISS; + } if (granularity != 0 && (granularity < 512 || granularity > 1048576 * 64)) { error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "granularity", @@ -3641,7 +3703,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs, * and will allow to check whether the node still exist at mirror completion */ mirror_start(job_id, bs, target, - has_replaces ? replaces : NULL, + has_replaces ? replaces : NULL, job_flags, speed, granularity, buf_size, sync, backing_mode, on_source_error, on_target_error, unmap, filter_node_name, copy_mode, errp); @@ -3791,6 +3853,8 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp) arg->has_unmap, arg->unmap, false, NULL, arg->has_copy_mode, arg->copy_mode, + arg->has_auto_finalize, arg->auto_finalize, + arg->has_auto_dismiss, arg->auto_dismiss, &local_err); bdrv_unref(target_bs); error_propagate(errp, local_err); @@ -3812,6 +3876,8 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id, bool has_filter_node_name, const char *filter_node_name, bool has_copy_mode, MirrorCopyMode copy_mode, + bool has_auto_finalize, bool auto_finalize, + bool has_auto_dismiss, bool auto_dismiss, Error **errp) { BlockDriverState *bs; @@ -3845,6 +3911,8 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id, true, true, has_filter_node_name, filter_node_name, has_copy_mode, copy_mode, + has_auto_finalize, auto_finalize, + has_auto_dismiss, auto_dismiss, &local_err); error_propagate(errp, local_err); @@ -164,7 +164,7 @@ static bool child_job_drained_poll(BdrvChild *c) /* An inactive or completed job doesn't have any pending requests. Jobs * with !job->busy are either already paused or have a pause point after * being reentered, so no job driver code will run before they pause. */ - if (!job->busy || job_is_completed(job) || job->deferred_to_main_loop) { + if (!job->busy || job_is_completed(job)) { return false; } @@ -221,6 +221,11 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs, return 0; } +static void block_job_on_idle(Notifier *n, void *opaque) +{ + aio_wait_kick(); +} + bool block_job_is_internal(BlockJob *job) { return (job->job.id == NULL); @@ -416,6 +421,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, job->finalize_completed_notifier.notify = block_job_event_completed; job->pending_notifier.notify = block_job_event_pending; job->ready_notifier.notify = block_job_event_ready; + job->idle_notifier.notify = block_job_on_idle; notifier_list_add(&job->job.on_finalize_cancelled, &job->finalize_cancelled_notifier); @@ -423,6 +429,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, &job->finalize_completed_notifier); notifier_list_add(&job->job.on_pending, &job->pending_notifier); notifier_list_add(&job->job.on_ready, &job->ready_notifier); + notifier_list_add(&job->job.on_idle, &job->idle_notifier); error_setg(&job->blocker, "block device is in use by block job: %s", job_type_str(&job->job)); @@ -1907,8 +1907,9 @@ void hmp_block_stream(Monitor *mon, const QDict *qdict) int64_t speed = qdict_get_try_int(qdict, "speed", 0); qmp_block_stream(true, device, device, base != NULL, base, false, NULL, - false, NULL, qdict_haskey(qdict, "speed"), speed, - true, BLOCKDEV_ON_ERROR_REPORT, &error); + false, NULL, qdict_haskey(qdict, "speed"), speed, true, + BLOCKDEV_ON_ERROR_REPORT, false, false, false, false, + &error); hmp_handle_error(mon, &error); } diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h index c85a62f..afd0ff7 100644 --- a/include/block/aio-wait.h +++ b/include/block/aio-wait.h @@ -30,14 +30,15 @@ /** * AioWait: * - * An object that facilitates synchronous waiting on a condition. The main - * loop can wait on an operation running in an IOThread as follows: + * An object that facilitates synchronous waiting on a condition. A single + * global AioWait object (global_aio_wait) is used internally. + * + * The main loop can wait on an operation running in an IOThread as follows: * - * AioWait *wait = ...; * AioContext *ctx = ...; * MyWork work = { .done = false }; * schedule_my_work_in_iothread(ctx, &work); - * AIO_WAIT_WHILE(wait, ctx, !work.done); + * AIO_WAIT_WHILE(ctx, !work.done); * * The IOThread must call aio_wait_kick() to notify the main loop when * work.done changes: @@ -46,7 +47,7 @@ * { * ... * work.done = true; - * aio_wait_kick(wait); + * aio_wait_kick(); * } */ typedef struct { @@ -54,9 +55,10 @@ typedef struct { unsigned num_waiters; } AioWait; +extern AioWait global_aio_wait; + /** * AIO_WAIT_WHILE: - * @wait: the aio wait object * @ctx: the aio context, or NULL if multiple aio contexts (for which the * caller does not hold a lock) are involved in the polling condition. * @cond: wait while this conditional expression is true @@ -72,10 +74,12 @@ typedef struct { * wait on conditions between two IOThreads since that could lead to deadlock, * go via the main loop instead. */ -#define AIO_WAIT_WHILE(wait, ctx, cond) ({ \ +#define AIO_WAIT_WHILE(ctx, cond) ({ \ bool waited_ = false; \ - AioWait *wait_ = (wait); \ + AioWait *wait_ = &global_aio_wait; \ AioContext *ctx_ = (ctx); \ + /* Increment wait_->num_waiters before evaluating cond. */ \ + atomic_inc(&wait_->num_waiters); \ if (ctx_ && in_aio_context_home_thread(ctx_)) { \ while ((cond)) { \ aio_poll(ctx_, true); \ @@ -84,8 +88,6 @@ typedef struct { } else { \ assert(qemu_get_current_aio_context() == \ qemu_get_aio_context()); \ - /* Increment wait_->num_waiters before evaluating cond. */ \ - atomic_inc(&wait_->num_waiters); \ while ((cond)) { \ if (ctx_) { \ aio_context_release(ctx_); \ @@ -96,20 +98,18 @@ typedef struct { } \ waited_ = true; \ } \ - atomic_dec(&wait_->num_waiters); \ } \ + atomic_dec(&wait_->num_waiters); \ waited_; }) /** * aio_wait_kick: - * @wait: the aio wait object that should re-evaluate its condition - * * Wake up the main thread if it is waiting on AIO_WAIT_WHILE(). During * synchronous operations performed in an IOThread, the main thread lets the * IOThread's event loop run, waiting for the operation to complete. A * aio_wait_kick() call will wake up the main thread. */ -void aio_wait_kick(AioWait *wait); +void aio_wait_kick(void); /** * aio_wait_bh_oneshot: diff --git a/include/block/block.h b/include/block/block.h index 4e0871a..4edc1e8 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -410,13 +410,9 @@ void bdrv_drain_all_begin(void); void bdrv_drain_all_end(void); void bdrv_drain_all(void); -/* Returns NULL when bs == NULL */ -AioWait *bdrv_get_aio_wait(BlockDriverState *bs); - #define BDRV_POLL_WHILE(bs, cond) ({ \ BlockDriverState *bs_ = (bs); \ - AIO_WAIT_WHILE(bdrv_get_aio_wait(bs_), \ - bdrv_get_aio_context(bs_), \ + AIO_WAIT_WHILE(bdrv_get_aio_context(bs_), \ cond); }) int bdrv_pdiscard(BdrvChild *child, int64_t offset, int bytes); diff --git a/include/block/block_int.h b/include/block/block_int.h index 903b9c1..92ecbd8 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -794,9 +794,6 @@ struct BlockDriverState { unsigned int in_flight; unsigned int serialising_in_flight; - /* Kicked to signal main loop when a request completes. */ - AioWait wait; - /* counter for nested bdrv_io_plug. * Accessed with atomic ops. */ @@ -958,6 +955,8 @@ int is_windows_drive(const char *filename); * flatten the whole backing file chain onto @bs. * @backing_file_str: The file name that will be written to @bs as the * the new backing file if the job completes. Ignored if @base is %NULL. + * @creation_flags: Flags that control the behavior of the Job lifetime. + * See @BlockJobCreateFlags * @speed: The maximum speed, in bytes per second, or 0 for unlimited. * @on_error: The action to take upon error. * @errp: Error object. @@ -971,7 +970,8 @@ int is_windows_drive(const char *filename); */ void stream_start(const char *job_id, BlockDriverState *bs, BlockDriverState *base, const char *backing_file_str, - int64_t speed, BlockdevOnError on_error, Error **errp); + int creation_flags, int64_t speed, + BlockdevOnError on_error, Error **errp); /** * commit_start: @@ -980,6 +980,8 @@ void stream_start(const char *job_id, BlockDriverState *bs, * @bs: Active block device. * @top: Top block device to be committed. * @base: Block device that will be written into, and become the new top. + * @creation_flags: Flags that control the behavior of the Job lifetime. + * See @BlockJobCreateFlags * @speed: The maximum speed, in bytes per second, or 0 for unlimited. * @on_error: The action to take upon error. * @backing_file_str: String to use as the backing file in @top's overlay @@ -990,7 +992,8 @@ void stream_start(const char *job_id, BlockDriverState *bs, * */ void commit_start(const char *job_id, BlockDriverState *bs, - BlockDriverState *base, BlockDriverState *top, int64_t speed, + BlockDriverState *base, BlockDriverState *top, + int creation_flags, int64_t speed, BlockdevOnError on_error, const char *backing_file_str, const char *filter_node_name, Error **errp); /** @@ -1026,6 +1029,8 @@ void commit_active_start(const char *job_id, BlockDriverState *bs, * @target: Block device to write to. * @replaces: Block graph node name to replace once the mirror is done. Can * only be used when full mirroring is selected. + * @creation_flags: Flags that control the behavior of the Job lifetime. + * See @BlockJobCreateFlags * @speed: The maximum speed, in bytes per second, or 0 for unlimited. * @granularity: The chosen granularity for the dirty bitmap. * @buf_size: The amount of data that can be in flight at one time. @@ -1047,7 +1052,8 @@ void commit_active_start(const char *job_id, BlockDriverState *bs, */ void mirror_start(const char *job_id, BlockDriverState *bs, BlockDriverState *target, const char *replaces, - int64_t speed, uint32_t granularity, int64_t buf_size, + int creation_flags, int64_t speed, + uint32_t granularity, int64_t buf_size, MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, BlockdevOnError on_source_error, BlockdevOnError on_target_error, diff --git a/include/block/blockjob.h b/include/block/blockjob.h index 32c00b7..ede0bd8 100644 --- a/include/block/blockjob.h +++ b/include/block/blockjob.h @@ -70,6 +70,9 @@ typedef struct BlockJob { /** Called when the job transitions to READY */ Notifier ready_notifier; + /** Called when the job coroutine yields or terminates */ + Notifier idle_notifier; + /** BlockDriverStates that are involved in this block job */ GSList *nodes; } BlockJob; diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h index 6f8a487..9801e7f 100644 --- a/include/qemu/coroutine.h +++ b/include/qemu/coroutine.h @@ -90,6 +90,11 @@ void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co); void coroutine_fn qemu_coroutine_yield(void); /** + * Get the AioContext of the given coroutine + */ +AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co); + +/** * Get the currently executing coroutine */ Coroutine *coroutine_fn qemu_coroutine_self(void); diff --git a/include/qemu/job.h b/include/qemu/job.h index e0cff70..9e7cd1e 100644 --- a/include/qemu/job.h +++ b/include/qemu/job.h @@ -76,6 +76,9 @@ typedef struct Job { * Set to false by the job while the coroutine has yielded and may be * re-entered by job_enter(). There may still be I/O or event loop activity * pending. Accessed under block_job_mutex (in blockjob.c). + * + * When the job is deferred to the main loop, busy is true as long as the + * bottom half is still pending. */ bool busy; @@ -156,6 +159,9 @@ typedef struct Job { /** Notifiers called when the job transitions to READY */ NotifierList on_ready; + /** Notifiers called when the job coroutine yields or terminates */ + NotifierList on_idle; + /** Element of the list of jobs */ QLIST_ENTRY(Job) job_list; @@ -222,17 +228,6 @@ struct JobDriver { void (*drain)(Job *job); /** - * If the callback is not NULL, exit will be invoked from the main thread - * when the job's coroutine has finished, but before transactional - * convergence; before @prepare or @abort. - * - * FIXME TODO: This callback is only temporary to transition remaining jobs - * to prepare/commit/abort/clean callbacks and will be removed before 3.1. - * is released. - */ - void (*exit)(Job *job); - - /** * If the callback is not NULL, prepare will be invoked when all the jobs * belonging to the same transaction complete; or upon this job's completion * if it is not in a transaction. @@ -532,6 +527,8 @@ void job_user_cancel(Job *job, bool force, Error **errp); * * Returns the return value from the job if the job actually completed * during the call, or -ECANCELED if it was canceled. + * + * Callers must hold the AioContext lock of job->aio_context. */ int job_cancel_sync(Job *job); @@ -549,6 +546,8 @@ void job_cancel_sync_all(void); * function). * * Returns the return value from the job. + * + * Callers must hold the AioContext lock of job->aio_context. */ int job_complete_sync(Job *job, Error **errp); @@ -574,6 +573,8 @@ void job_dismiss(Job **job, Error **errp); * * Returns 0 if the job is successfully completed, -ECANCELED if the job was * cancelled before completing, and -errno in other error cases. + * + * Callers must hold the AioContext lock of job->aio_context. */ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp); @@ -29,6 +29,7 @@ #include "qemu/job.h" #include "qemu/id.h" #include "qemu/main-loop.h" +#include "block/aio-wait.h" #include "trace-root.h" #include "qapi/qapi-events-job.h" @@ -136,21 +137,13 @@ static void job_txn_del_job(Job *job) } } -static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock) +static int job_txn_apply(JobTxn *txn, int fn(Job *)) { - AioContext *ctx; Job *job, *next; int rc = 0; QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) { - if (lock) { - ctx = job->aio_context; - aio_context_acquire(ctx); - } rc = fn(job); - if (lock) { - aio_context_release(ctx); - } if (rc) { break; } @@ -410,6 +403,11 @@ static void job_event_ready(Job *job) notifier_list_notify(&job->on_ready, job); } +static void job_event_idle(Job *job) +{ + notifier_list_notify(&job->on_idle, job); +} + void job_enter_cond(Job *job, bool(*fn)(Job *job)) { if (!job_started(job)) { @@ -455,6 +453,7 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns) timer_mod(&job->sleep_timer, ns); } job->busy = false; + job_event_idle(job); job_unlock(); qemu_coroutine_yield(); @@ -535,49 +534,6 @@ void job_drain(Job *job) } } -static void job_completed(Job *job); - -static void job_exit(void *opaque) -{ - Job *job = (Job *)opaque; - AioContext *aio_context = job->aio_context; - - if (job->driver->exit) { - aio_context_acquire(aio_context); - job->driver->exit(job); - aio_context_release(aio_context); - } - job_completed(job); -} - -/** - * All jobs must allow a pause point before entering their job proper. This - * ensures that jobs can be paused prior to being started, then resumed later. - */ -static void coroutine_fn job_co_entry(void *opaque) -{ - Job *job = opaque; - - assert(job && job->driver && job->driver->run); - job_pause_point(job); - job->ret = job->driver->run(job, &job->err); - job->deferred_to_main_loop = true; - aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job); -} - - -void job_start(Job *job) -{ - assert(job && !job_started(job) && job->paused && - job->driver && job->driver->run); - job->co = qemu_coroutine_create(job_co_entry, job); - job->pause_count--; - job->busy = true; - job->paused = false; - job_state_transition(job, JOB_STATUS_RUNNING); - aio_co_enter(job->aio_context, job->co); -} - /* Assumes the block_job_mutex is held */ static bool job_timer_not_pending(Job *job) { @@ -762,6 +718,7 @@ static void job_cancel_async(Job *job, bool force) static void job_completed_txn_abort(Job *job) { + AioContext *outer_ctx = job->aio_context; AioContext *ctx; JobTxn *txn = job->txn; Job *other_job; @@ -775,23 +732,26 @@ static void job_completed_txn_abort(Job *job) txn->aborting = true; job_txn_ref(txn); - /* We are the first failed job. Cancel other jobs. */ - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { - ctx = other_job->aio_context; - aio_context_acquire(ctx); - } + /* We can only hold the single job's AioContext lock while calling + * job_finalize_single() because the finalization callbacks can involve + * calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */ + aio_context_release(outer_ctx); /* Other jobs are effectively cancelled by us, set the status for * them; this job, however, may or may not be cancelled, depending * on the caller, so leave it. */ QLIST_FOREACH(other_job, &txn->jobs, txn_list) { if (other_job != job) { + ctx = other_job->aio_context; + aio_context_acquire(ctx); job_cancel_async(other_job, false); + aio_context_release(ctx); } } while (!QLIST_EMPTY(&txn->jobs)) { other_job = QLIST_FIRST(&txn->jobs); ctx = other_job->aio_context; + aio_context_acquire(ctx); if (!job_is_completed(other_job)) { assert(job_is_cancelled(other_job)); job_finish_sync(other_job, NULL, NULL); @@ -800,6 +760,8 @@ static void job_completed_txn_abort(Job *job) aio_context_release(ctx); } + aio_context_acquire(outer_ctx); + job_txn_unref(txn); } @@ -823,11 +785,11 @@ static void job_do_finalize(Job *job) assert(job && job->txn); /* prepare the transaction to complete */ - rc = job_txn_apply(job->txn, job_prepare, true); + rc = job_txn_apply(job->txn, job_prepare); if (rc) { job_completed_txn_abort(job); } else { - job_txn_apply(job->txn, job_finalize_single, true); + job_txn_apply(job->txn, job_finalize_single); } } @@ -873,10 +835,10 @@ static void job_completed_txn_success(Job *job) assert(other_job->ret == 0); } - job_txn_apply(txn, job_transition_to_pending, false); + job_txn_apply(txn, job_transition_to_pending); /* If no jobs need manual finalization, automatically do so */ - if (job_txn_apply(txn, job_needs_finalize, false) == 0) { + if (job_txn_apply(txn, job_needs_finalize) == 0) { job_do_finalize(job); } } @@ -894,6 +856,54 @@ static void job_completed(Job *job) } } +/** Useful only as a type shim for aio_bh_schedule_oneshot. */ +static void job_exit(void *opaque) +{ + Job *job = (Job *)opaque; + AioContext *ctx = job->aio_context; + + aio_context_acquire(ctx); + + /* This is a lie, we're not quiescent, but still doing the completion + * callbacks. However, completion callbacks tend to involve operations that + * drain block nodes, and if .drained_poll still returned true, we would + * deadlock. */ + job->busy = false; + job_event_idle(job); + + job_completed(job); + + aio_context_release(ctx); +} + +/** + * All jobs must allow a pause point before entering their job proper. This + * ensures that jobs can be paused prior to being started, then resumed later. + */ +static void coroutine_fn job_co_entry(void *opaque) +{ + Job *job = opaque; + + assert(job && job->driver && job->driver->run); + job_pause_point(job); + job->ret = job->driver->run(job, &job->err); + job->deferred_to_main_loop = true; + job->busy = true; + aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job); +} + +void job_start(Job *job) +{ + assert(job && !job_started(job) && job->paused && + job->driver && job->driver->run); + job->co = qemu_coroutine_create(job_co_entry, job); + job->pause_count--; + job->busy = true; + job->paused = false; + job_state_transition(job, JOB_STATUS_RUNNING); + aio_co_enter(job->aio_context, job->co); +} + void job_cancel(Job *job, bool force) { if (job->status == JOB_STATUS_CONCLUDED) { @@ -980,14 +990,10 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp) job_unref(job); return -EBUSY; } - /* job_drain calls job_enter, and it should be enough to induce progress - * until the job completes or moves to the main thread. */ - while (!job->deferred_to_main_loop && !job_is_completed(job)) { - job_drain(job); - } - while (!job_is_completed(job)) { - aio_poll(qemu_get_aio_context(), true); - } + + AIO_WAIT_WHILE(job->aio_context, + (job_drain(job), !job_is_completed(job))); + ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret; job_unref(job); return ret; diff --git a/qapi/block-core.json b/qapi/block-core.json index 4c7a37a..ac3b48e 100644 --- a/qapi/block-core.json +++ b/qapi/block-core.json @@ -1272,13 +1272,14 @@ # a different block device than @device). # # @auto-finalize: When false, this job will wait in a PENDING state after it has -# finished its work, waiting for @block-job-finalize. -# When true, this job will automatically perform its abort or -# commit actions. +# finished its work, waiting for @block-job-finalize before +# making any block graph changes. +# When true, this job will automatically +# perform its abort or commit actions. # Defaults to true. (Since 2.12) # # @auto-dismiss: When false, this job will wait in a CONCLUDED state after it -# has completed ceased all work, and wait for @block-job-dismiss. +# has completely ceased all work, and awaits @block-job-dismiss. # When true, this job will automatically disappear from the query # list without user intervention. # Defaults to true. (Since 2.12) @@ -1327,13 +1328,14 @@ # a different block device than @device). # # @auto-finalize: When false, this job will wait in a PENDING state after it has -# finished its work, waiting for @block-job-finalize. -# When true, this job will automatically perform its abort or -# commit actions. +# finished its work, waiting for @block-job-finalize before +# making any block graph changes. +# When true, this job will automatically +# perform its abort or commit actions. # Defaults to true. (Since 2.12) # # @auto-dismiss: When false, this job will wait in a CONCLUDED state after it -# has completed ceased all work, and wait for @block-job-dismiss. +# has completely ceased all work, and awaits @block-job-dismiss. # When true, this job will automatically disappear from the query # list without user intervention. # Defaults to true. (Since 2.12) @@ -1455,12 +1457,23 @@ # # @device: the device name or node-name of a root node # -# @base: The file name of the backing image to write data into. -# If not specified, this is the deepest backing image. +# @base-node: The node name of the backing image to write data into. +# If not specified, this is the deepest backing image. +# (since: 3.1) # -# @top: The file name of the backing image within the image chain, -# which contains the topmost data to be committed down. If -# not specified, this is the active layer. +# @base: Same as @base-node, except that it is a file name rather than a node +# name. This must be the exact filename string that was used to open the +# node; other strings, even if addressing the same file, are not +# accepted (deprecated, use @base-node instead) +# +# @top-node: The node name of the backing image within the image chain +# which contains the topmost data to be committed down. If +# not specified, this is the active layer. (since: 3.1) +# +# @top: Same as @top-node, except that it is a file name rather than a node +# name. This must be the exact filename string that was used to open the +# node; other strings, even if addressing the same file, are not +# accepted (deprecated, use @base-node instead) # # @backing-file: The backing file string to write into the overlay # image of 'top'. If 'top' is the active layer, @@ -1498,6 +1511,19 @@ # above @top. If this option is not given, a node name is # autogenerated. (Since: 2.9) # +# @auto-finalize: When false, this job will wait in a PENDING state after it has +# finished its work, waiting for @block-job-finalize before +# making any block graph changes. +# When true, this job will automatically +# perform its abort or commit actions. +# Defaults to true. (Since 3.1) +# +# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it +# has completely ceased all work, and awaits @block-job-dismiss. +# When true, this job will automatically disappear from the query +# list without user intervention. +# Defaults to true. (Since 3.1) +# # Returns: Nothing on success # If @device does not exist, DeviceNotFound # Any other error returns a GenericError. @@ -1513,9 +1539,11 @@ # ## { 'command': 'block-commit', - 'data': { '*job-id': 'str', 'device': 'str', '*base': 'str', '*top': 'str', + 'data': { '*job-id': 'str', 'device': 'str', '*base-node': 'str', + '*base': 'str', '*top-node': 'str', '*top': 'str', '*backing-file': 'str', '*speed': 'int', - '*filter-node-name': 'str' } } + '*filter-node-name': 'str', + '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } } ## # @drive-backup: @@ -1715,6 +1743,18 @@ # @copy-mode: when to copy data to the destination; defaults to 'background' # (Since: 3.0) # +# @auto-finalize: When false, this job will wait in a PENDING state after it has +# finished its work, waiting for @block-job-finalize before +# making any block graph changes. +# When true, this job will automatically +# perform its abort or commit actions. +# Defaults to true. (Since 3.1) +# +# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it +# has completely ceased all work, and awaits @block-job-dismiss. +# When true, this job will automatically disappear from the query +# list without user intervention. +# Defaults to true. (Since 3.1) # Since: 1.3 ## { 'struct': 'DriveMirror', @@ -1724,7 +1764,8 @@ '*speed': 'int', '*granularity': 'uint32', '*buf-size': 'int', '*on-source-error': 'BlockdevOnError', '*on-target-error': 'BlockdevOnError', - '*unmap': 'bool', '*copy-mode': 'MirrorCopyMode' } } + '*unmap': 'bool', '*copy-mode': 'MirrorCopyMode', + '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } } ## # @BlockDirtyBitmap: @@ -1990,6 +2031,18 @@ # @copy-mode: when to copy data to the destination; defaults to 'background' # (Since: 3.0) # +# @auto-finalize: When false, this job will wait in a PENDING state after it has +# finished its work, waiting for @block-job-finalize before +# making any block graph changes. +# When true, this job will automatically +# perform its abort or commit actions. +# Defaults to true. (Since 3.1) +# +# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it +# has completely ceased all work, and awaits @block-job-dismiss. +# When true, this job will automatically disappear from the query +# list without user intervention. +# Defaults to true. (Since 3.1) # Returns: nothing on success. # # Since: 2.6 @@ -2011,7 +2064,8 @@ '*buf-size': 'int', '*on-source-error': 'BlockdevOnError', '*on-target-error': 'BlockdevOnError', '*filter-node-name': 'str', - '*copy-mode': 'MirrorCopyMode' } } + '*copy-mode': 'MirrorCopyMode', + '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } } ## # @block_set_io_throttle: @@ -2277,6 +2331,19 @@ # 'stop' and 'enospc' can only be used if the block device # supports io-status (see BlockInfo). Since 1.3. # +# @auto-finalize: When false, this job will wait in a PENDING state after it has +# finished its work, waiting for @block-job-finalize before +# making any block graph changes. +# When true, this job will automatically +# perform its abort or commit actions. +# Defaults to true. (Since 3.1) +# +# @auto-dismiss: When false, this job will wait in a CONCLUDED state after it +# has completely ceased all work, and awaits @block-job-dismiss. +# When true, this job will automatically disappear from the query +# list without user intervention. +# Defaults to true. (Since 3.1) +# # Returns: Nothing on success. If @device does not exist, DeviceNotFound. # # Since: 1.1 @@ -2292,7 +2359,8 @@ { 'command': 'block-stream', 'data': { '*job-id': 'str', 'device': 'str', '*base': 'str', '*base-node': 'str', '*backing-file': 'str', '*speed': 'int', - '*on-error': 'BlockdevOnError' } } + '*on-error': 'BlockdevOnError', + '*auto-finalize': 'bool', '*auto-dismiss': 'bool' } } ## # @block-job-set-speed: diff --git a/tests/qemu-iotests/040 b/tests/qemu-iotests/040 index 1beb5e6..1cb1cee 100755 --- a/tests/qemu-iotests/040 +++ b/tests/qemu-iotests/040 @@ -57,9 +57,12 @@ class ImageCommitTestCase(iotests.QMPTestCase): self.assert_no_active_block_jobs() self.vm.shutdown() - def run_commit_test(self, top, base, need_ready=False): + def run_commit_test(self, top, base, need_ready=False, node_names=False): self.assert_no_active_block_jobs() - result = self.vm.qmp('block-commit', device='drive0', top=top, base=base) + if node_names: + result = self.vm.qmp('block-commit', device='drive0', top_node=top, base_node=base) + else: + result = self.vm.qmp('block-commit', device='drive0', top=top, base=base) self.assert_qmp(result, 'return', {}) self.wait_for_complete(need_ready) @@ -101,6 +104,11 @@ class TestSingleDrive(ImageCommitTestCase): self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed")) self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed")) + def test_commit_node(self): + self.run_commit_test("mid", "base", node_names=True) + self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed")) + self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed")) + def test_device_not_found(self): result = self.vm.qmp('block-commit', device='nonexistent', top='%s' % mid_img) self.assert_qmp(result, 'error/class', 'DeviceNotFound') @@ -123,6 +131,30 @@ class TestSingleDrive(ImageCommitTestCase): self.assert_qmp(result, 'error/class', 'GenericError') self.assert_qmp(result, 'error/desc', 'Base \'badfile\' not found') + def test_top_node_invalid(self): + self.assert_no_active_block_jobs() + result = self.vm.qmp('block-commit', device='drive0', top_node='badfile', base_node='base') + self.assert_qmp(result, 'error/class', 'GenericError') + self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile") + + def test_base_node_invalid(self): + self.assert_no_active_block_jobs() + result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='badfile') + self.assert_qmp(result, 'error/class', 'GenericError') + self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile") + + def test_top_path_and_node(self): + self.assert_no_active_block_jobs() + result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', top='%s' % mid_img) + self.assert_qmp(result, 'error/class', 'GenericError') + self.assert_qmp(result, 'error/desc', "'top-node' and 'top' are mutually exclusive") + + def test_base_path_and_node(self): + self.assert_no_active_block_jobs() + result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', base='%s' % backing_img) + self.assert_qmp(result, 'error/class', 'GenericError') + self.assert_qmp(result, 'error/desc', "'base-node' and 'base' are mutually exclusive") + def test_top_is_active(self): self.run_commit_test(test_img, backing_img, need_ready=True) self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed")) @@ -139,6 +171,22 @@ class TestSingleDrive(ImageCommitTestCase): self.assert_qmp(result, 'error/class', 'GenericError') self.assert_qmp(result, 'error/desc', 'Base \'%s\' not found' % mid_img) + def test_top_and_base_node_reversed(self): + self.assert_no_active_block_jobs() + result = self.vm.qmp('block-commit', device='drive0', top_node='base', base_node='top') + self.assert_qmp(result, 'error/class', 'GenericError') + self.assert_qmp(result, 'error/desc', "'top' is not in this backing file chain") + + def test_top_node_in_wrong_chain(self): + self.assert_no_active_block_jobs() + + result = self.vm.qmp('blockdev-add', driver='null-co', node_name='null') + self.assert_qmp(result, 'return', {}) + + result = self.vm.qmp('block-commit', device='drive0', top_node='null', base_node='base') + self.assert_qmp(result, 'error/class', 'GenericError') + self.assert_qmp(result, 'error/desc', "'null' is not in this backing file chain") + # When the job is running on a BB that is automatically deleted on hot # unplug, the job is cancelled when the device disappears def test_hot_unplug(self): diff --git a/tests/qemu-iotests/040.out b/tests/qemu-iotests/040.out index e20a75c..802ffaa 100644 --- a/tests/qemu-iotests/040.out +++ b/tests/qemu-iotests/040.out @@ -1,5 +1,5 @@ -............................. +........................................... ---------------------------------------------------------------------- -Ran 29 tests +Ran 43 tests OK diff --git a/tests/qemu-iotests/051 b/tests/qemu-iotests/051 index ee9c820..25d3b2d 100755 --- a/tests/qemu-iotests/051 +++ b/tests/qemu-iotests/051 @@ -354,6 +354,9 @@ printf %b "qemu-io $device_id \"write -P 0x33 0 4k\"\ncommit $device_id\n" | $QEMU_IO -c "read -P 0x33 0 4k" "$TEST_IMG" | _filter_qemu_io +# Using snapshot=on with a non-existent TMPDIR +TMPDIR=/nonexistent run_qemu -drive driver=null-co,snapshot=on + # success, all done echo "*** done" rm -f $seq.full diff --git a/tests/qemu-iotests/051.out b/tests/qemu-iotests/051.out index b727350..793af2a 100644 --- a/tests/qemu-iotests/051.out +++ b/tests/qemu-iotests/051.out @@ -455,4 +455,7 @@ wrote 4096/4096 bytes at offset 0 read 4096/4096 bytes at offset 0 4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) +Testing: -drive driver=null-co,snapshot=on +QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory + *** done diff --git a/tests/qemu-iotests/051.pc.out b/tests/qemu-iotests/051.pc.out index e9257fe..ca64eda 100644 --- a/tests/qemu-iotests/051.pc.out +++ b/tests/qemu-iotests/051.pc.out @@ -527,4 +527,7 @@ wrote 4096/4096 bytes at offset 0 read 4096/4096 bytes at offset 0 4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) +Testing: -drive driver=null-co,snapshot=on +QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory + *** done diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c index 89ac15e..c9f29c8 100644 --- a/tests/test-bdrv-drain.c +++ b/tests/test-bdrv-drain.c @@ -174,6 +174,28 @@ static void do_drain_end(enum drain_type drain_type, BlockDriverState *bs) } } +static void do_drain_begin_unlocked(enum drain_type drain_type, BlockDriverState *bs) +{ + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_acquire(bdrv_get_aio_context(bs)); + } + do_drain_begin(drain_type, bs); + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_release(bdrv_get_aio_context(bs)); + } +} + +static void do_drain_end_unlocked(enum drain_type drain_type, BlockDriverState *bs) +{ + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_acquire(bdrv_get_aio_context(bs)); + } + do_drain_end(drain_type, bs); + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_release(bdrv_get_aio_context(bs)); + } +} + static void test_drv_cb_common(enum drain_type drain_type, bool recursive) { BlockBackend *blk; @@ -614,6 +636,17 @@ static void test_iothread_aio_cb(void *opaque, int ret) qemu_event_set(&done_event); } +static void test_iothread_main_thread_bh(void *opaque) +{ + struct test_iothread_data *data = opaque; + + /* Test that the AioContext is not yet locked in a random BH that is + * executed during drain, otherwise this would deadlock. */ + aio_context_acquire(bdrv_get_aio_context(data->bs)); + bdrv_flush(data->bs); + aio_context_release(bdrv_get_aio_context(data->bs)); +} + /* * Starts an AIO request on a BDS that runs in the AioContext of iothread 1. * The request involves a BH on iothread 2 before it can complete. @@ -683,6 +716,8 @@ static void test_iothread_common(enum drain_type drain_type, int drain_thread) aio_context_acquire(ctx_a); } + aio_bh_schedule_oneshot(ctx_a, test_iothread_main_thread_bh, &data); + /* The request is running on the IOThread a. Draining its block device * will make sure that it has completed as far as the BDS is concerned, * but the drain in this thread can continue immediately after @@ -749,23 +784,56 @@ static void test_iothread_drain_subtree(void) typedef struct TestBlockJob { BlockJob common; + int run_ret; + int prepare_ret; + bool running; bool should_complete; } TestBlockJob; +static int test_job_prepare(Job *job) +{ + TestBlockJob *s = container_of(job, TestBlockJob, common.job); + + /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */ + blk_flush(s->common.blk); + return s->prepare_ret; +} + +static void test_job_commit(Job *job) +{ + TestBlockJob *s = container_of(job, TestBlockJob, common.job); + + /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */ + blk_flush(s->common.blk); +} + +static void test_job_abort(Job *job) +{ + TestBlockJob *s = container_of(job, TestBlockJob, common.job); + + /* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */ + blk_flush(s->common.blk); +} + static int coroutine_fn test_job_run(Job *job, Error **errp) { TestBlockJob *s = container_of(job, TestBlockJob, common.job); + /* We are running the actual job code past the pause point in + * job_co_entry(). */ + s->running = true; + job_transition_to_ready(&s->common.job); while (!s->should_complete) { - /* Avoid block_job_sleep_ns() because it marks the job as !busy. We - * want to emulate some actual activity (probably some I/O) here so - * that drain has to wait for this acitivity to stop. */ - qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000); + /* Avoid job_sleep_ns() because it marks the job as !busy. We want to + * emulate some actual activity (probably some I/O) here so that drain + * has to wait for this activity to stop. */ + qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000); + job_pause_point(&s->common.job); } - return 0; + return s->run_ret; } static void test_job_complete(Job *job, Error **errp) @@ -782,36 +850,115 @@ BlockJobDriver test_job_driver = { .drain = block_job_drain, .run = test_job_run, .complete = test_job_complete, + .prepare = test_job_prepare, + .commit = test_job_commit, + .abort = test_job_abort, }, }; -static void test_blockjob_common(enum drain_type drain_type) +enum test_job_result { + TEST_JOB_SUCCESS, + TEST_JOB_FAIL_RUN, + TEST_JOB_FAIL_PREPARE, +}; + +enum test_job_drain_node { + TEST_JOB_DRAIN_SRC, + TEST_JOB_DRAIN_SRC_CHILD, + TEST_JOB_DRAIN_SRC_PARENT, +}; + +static void test_blockjob_common_drain_node(enum drain_type drain_type, + bool use_iothread, + enum test_job_result result, + enum test_job_drain_node drain_node) { BlockBackend *blk_src, *blk_target; - BlockDriverState *src, *target; + BlockDriverState *src, *src_backing, *src_overlay, *target, *drain_bs; BlockJob *job; + TestBlockJob *tjob; + IOThread *iothread = NULL; + AioContext *ctx; int ret; src = bdrv_new_open_driver(&bdrv_test, "source", BDRV_O_RDWR, &error_abort); + src_backing = bdrv_new_open_driver(&bdrv_test, "source-backing", + BDRV_O_RDWR, &error_abort); + src_overlay = bdrv_new_open_driver(&bdrv_test, "source-overlay", + BDRV_O_RDWR, &error_abort); + + bdrv_set_backing_hd(src_overlay, src, &error_abort); + bdrv_unref(src); + bdrv_set_backing_hd(src, src_backing, &error_abort); + bdrv_unref(src_backing); + blk_src = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); - blk_insert_bs(blk_src, src, &error_abort); + blk_insert_bs(blk_src, src_overlay, &error_abort); + + switch (drain_node) { + case TEST_JOB_DRAIN_SRC: + drain_bs = src; + break; + case TEST_JOB_DRAIN_SRC_CHILD: + drain_bs = src_backing; + break; + case TEST_JOB_DRAIN_SRC_PARENT: + drain_bs = src_overlay; + break; + default: + g_assert_not_reached(); + } + + if (use_iothread) { + iothread = iothread_new(); + ctx = iothread_get_aio_context(iothread); + blk_set_aio_context(blk_src, ctx); + } else { + ctx = qemu_get_aio_context(); + } target = bdrv_new_open_driver(&bdrv_test, "target", BDRV_O_RDWR, &error_abort); blk_target = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); blk_insert_bs(blk_target, target, &error_abort); - job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL, - 0, 0, NULL, NULL, &error_abort); + aio_context_acquire(ctx); + tjob = block_job_create("job0", &test_job_driver, NULL, src, + 0, BLK_PERM_ALL, + 0, 0, NULL, NULL, &error_abort); + job = &tjob->common; block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort); + + switch (result) { + case TEST_JOB_SUCCESS: + break; + case TEST_JOB_FAIL_RUN: + tjob->run_ret = -EIO; + break; + case TEST_JOB_FAIL_PREPARE: + tjob->prepare_ret = -EIO; + break; + } + job_start(&job->job); + aio_context_release(ctx); + + if (use_iothread) { + /* job_co_entry() is run in the I/O thread, wait for the actual job + * code to start (we don't want to catch the job in the pause point in + * job_co_entry(). */ + while (!tjob->running) { + aio_poll(qemu_get_aio_context(), false); + } + } g_assert_cmpint(job->job.pause_count, ==, 0); g_assert_false(job->job.paused); - g_assert_true(job->job.busy); /* We're in job_sleep_ns() */ + g_assert_true(tjob->running); + g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */ - do_drain_begin(drain_type, src); + do_drain_begin_unlocked(drain_type, drain_bs); if (drain_type == BDRV_DRAIN_ALL) { /* bdrv_drain_all() drains both src and target */ @@ -822,7 +969,14 @@ static void test_blockjob_common(enum drain_type drain_type) g_assert_true(job->job.paused); g_assert_false(job->job.busy); /* The job is paused */ - do_drain_end(drain_type, src); + do_drain_end_unlocked(drain_type, drain_bs); + + if (use_iothread) { + /* paused is reset in the I/O thread, wait for it */ + while (job->job.paused) { + aio_poll(qemu_get_aio_context(), false); + } + } g_assert_cmpint(job->job.pause_count, ==, 0); g_assert_false(job->job.paused); @@ -841,32 +995,113 @@ static void test_blockjob_common(enum drain_type drain_type) do_drain_end(drain_type, target); + if (use_iothread) { + /* paused is reset in the I/O thread, wait for it */ + while (job->job.paused) { + aio_poll(qemu_get_aio_context(), false); + } + } + g_assert_cmpint(job->job.pause_count, ==, 0); g_assert_false(job->job.paused); - g_assert_true(job->job.busy); /* We're in job_sleep_ns() */ + g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */ + aio_context_acquire(ctx); ret = job_complete_sync(&job->job, &error_abort); - g_assert_cmpint(ret, ==, 0); + g_assert_cmpint(ret, ==, (result == TEST_JOB_SUCCESS ? 0 : -EIO)); + + if (use_iothread) { + blk_set_aio_context(blk_src, qemu_get_aio_context()); + } + aio_context_release(ctx); blk_unref(blk_src); blk_unref(blk_target); - bdrv_unref(src); + bdrv_unref(src_overlay); bdrv_unref(target); + + if (iothread) { + iothread_join(iothread); + } +} + +static void test_blockjob_common(enum drain_type drain_type, bool use_iothread, + enum test_job_result result) +{ + test_blockjob_common_drain_node(drain_type, use_iothread, result, + TEST_JOB_DRAIN_SRC); + test_blockjob_common_drain_node(drain_type, use_iothread, result, + TEST_JOB_DRAIN_SRC_CHILD); + if (drain_type == BDRV_SUBTREE_DRAIN) { + test_blockjob_common_drain_node(drain_type, use_iothread, result, + TEST_JOB_DRAIN_SRC_PARENT); + } } static void test_blockjob_drain_all(void) { - test_blockjob_common(BDRV_DRAIN_ALL); + test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_SUCCESS); } static void test_blockjob_drain(void) { - test_blockjob_common(BDRV_DRAIN); + test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_SUCCESS); } static void test_blockjob_drain_subtree(void) { - test_blockjob_common(BDRV_SUBTREE_DRAIN); + test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_SUCCESS); +} + +static void test_blockjob_error_drain_all(void) +{ + test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_RUN); + test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_PREPARE); +} + +static void test_blockjob_error_drain(void) +{ + test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_RUN); + test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_PREPARE); +} + +static void test_blockjob_error_drain_subtree(void) +{ + test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_RUN); + test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_PREPARE); +} + +static void test_blockjob_iothread_drain_all(void) +{ + test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_SUCCESS); +} + +static void test_blockjob_iothread_drain(void) +{ + test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_SUCCESS); +} + +static void test_blockjob_iothread_drain_subtree(void) +{ + test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_SUCCESS); +} + +static void test_blockjob_iothread_error_drain_all(void) +{ + test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_RUN); + test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_PREPARE); +} + +static void test_blockjob_iothread_error_drain(void) +{ + test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_RUN); + test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_PREPARE); +} + +static void test_blockjob_iothread_error_drain_subtree(void) +{ + test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_RUN); + test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_PREPARE); } @@ -1338,6 +1573,27 @@ int main(int argc, char **argv) g_test_add_func("/bdrv-drain/blockjob/drain_subtree", test_blockjob_drain_subtree); + g_test_add_func("/bdrv-drain/blockjob/error/drain_all", + test_blockjob_error_drain_all); + g_test_add_func("/bdrv-drain/blockjob/error/drain", + test_blockjob_error_drain); + g_test_add_func("/bdrv-drain/blockjob/error/drain_subtree", + test_blockjob_error_drain_subtree); + + g_test_add_func("/bdrv-drain/blockjob/iothread/drain_all", + test_blockjob_iothread_drain_all); + g_test_add_func("/bdrv-drain/blockjob/iothread/drain", + test_blockjob_iothread_drain); + g_test_add_func("/bdrv-drain/blockjob/iothread/drain_subtree", + test_blockjob_iothread_drain_subtree); + + g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_all", + test_blockjob_iothread_error_drain_all); + g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain", + test_blockjob_iothread_error_drain); + g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_subtree", + test_blockjob_iothread_error_drain_subtree); + g_test_add_func("/bdrv-drain/deletion/drain", test_delete_by_drain); g_test_add_func("/bdrv-drain/detach/drain_all", test_detach_by_drain_all); g_test_add_func("/bdrv-drain/detach/drain", test_detach_by_drain); diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c index ef29f35..86606f9 100644 --- a/tests/test-blockjob-txn.c +++ b/tests/test-blockjob-txn.c @@ -24,7 +24,7 @@ typedef struct { int *result; } TestBlockJob; -static void test_block_job_exit(Job *job) +static void test_block_job_clean(Job *job) { BlockJob *bjob = container_of(job, BlockJob, job); BlockDriverState *bs = blk_bs(bjob->blk); @@ -73,7 +73,7 @@ static const BlockJobDriver test_block_job_driver = { .user_resume = block_job_user_resume, .drain = block_job_drain, .run = test_block_job_run, - .exit = test_block_job_exit, + .clean = test_block_job_clean, }, }; diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c index ad4a65b..652d1e8 100644 --- a/tests/test-blockjob.c +++ b/tests/test-blockjob.c @@ -160,15 +160,8 @@ typedef struct CancelJob { BlockBackend *blk; bool should_converge; bool should_complete; - bool completed; } CancelJob; -static void cancel_job_exit(Job *job) -{ - CancelJob *s = container_of(job, CancelJob, common.job); - s->completed = true; -} - static void cancel_job_complete(Job *job, Error **errp) { CancelJob *s = container_of(job, CancelJob, common.job); @@ -201,23 +194,24 @@ static const BlockJobDriver test_cancel_driver = { .user_resume = block_job_user_resume, .drain = block_job_drain, .run = cancel_job_run, - .exit = cancel_job_exit, .complete = cancel_job_complete, }, }; -static CancelJob *create_common(BlockJob **pjob) +static CancelJob *create_common(Job **pjob) { BlockBackend *blk; - BlockJob *job; + Job *job; + BlockJob *bjob; CancelJob *s; blk = create_blk(NULL); - job = mk_job(blk, "Steve", &test_cancel_driver, true, - JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS); - job_ref(&job->job); - assert(job->job.status == JOB_STATUS_CREATED); - s = container_of(job, CancelJob, common); + bjob = mk_job(blk, "Steve", &test_cancel_driver, true, + JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS); + job = &bjob->job; + job_ref(job); + assert(job->status == JOB_STATUS_CREATED); + s = container_of(bjob, CancelJob, common); s->blk = blk; *pjob = job; @@ -229,6 +223,10 @@ static void cancel_common(CancelJob *s) BlockJob *job = &s->common; BlockBackend *blk = s->blk; JobStatus sts = job->job.status; + AioContext *ctx; + + ctx = job->job.aio_context; + aio_context_acquire(ctx); job_cancel_sync(&job->job); if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) { @@ -238,11 +236,13 @@ static void cancel_common(CancelJob *s) assert(job->job.status == JOB_STATUS_NULL); job_unref(&job->job); destroy_blk(blk); + + aio_context_release(ctx); } static void test_cancel_created(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); @@ -251,119 +251,123 @@ static void test_cancel_created(void) static void test_cancel_running(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); - job_start(&job->job); - assert(job->job.status == JOB_STATUS_RUNNING); + job_start(job); + assert(job->status == JOB_STATUS_RUNNING); cancel_common(s); } static void test_cancel_paused(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); - job_start(&job->job); - assert(job->job.status == JOB_STATUS_RUNNING); + job_start(job); + assert(job->status == JOB_STATUS_RUNNING); - job_user_pause(&job->job, &error_abort); - job_enter(&job->job); - assert(job->job.status == JOB_STATUS_PAUSED); + job_user_pause(job, &error_abort); + job_enter(job); + assert(job->status == JOB_STATUS_PAUSED); cancel_common(s); } static void test_cancel_ready(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); - job_start(&job->job); - assert(job->job.status == JOB_STATUS_RUNNING); + job_start(job); + assert(job->status == JOB_STATUS_RUNNING); s->should_converge = true; - job_enter(&job->job); - assert(job->job.status == JOB_STATUS_READY); + job_enter(job); + assert(job->status == JOB_STATUS_READY); cancel_common(s); } static void test_cancel_standby(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); - job_start(&job->job); - assert(job->job.status == JOB_STATUS_RUNNING); + job_start(job); + assert(job->status == JOB_STATUS_RUNNING); s->should_converge = true; - job_enter(&job->job); - assert(job->job.status == JOB_STATUS_READY); + job_enter(job); + assert(job->status == JOB_STATUS_READY); - job_user_pause(&job->job, &error_abort); - job_enter(&job->job); - assert(job->job.status == JOB_STATUS_STANDBY); + job_user_pause(job, &error_abort); + job_enter(job); + assert(job->status == JOB_STATUS_STANDBY); cancel_common(s); } static void test_cancel_pending(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); - job_start(&job->job); - assert(job->job.status == JOB_STATUS_RUNNING); + job_start(job); + assert(job->status == JOB_STATUS_RUNNING); s->should_converge = true; - job_enter(&job->job); - assert(job->job.status == JOB_STATUS_READY); + job_enter(job); + assert(job->status == JOB_STATUS_READY); - job_complete(&job->job, &error_abort); - job_enter(&job->job); - while (!s->completed) { + job_complete(job, &error_abort); + job_enter(job); + while (!job->deferred_to_main_loop) { aio_poll(qemu_get_aio_context(), true); } - assert(job->job.status == JOB_STATUS_PENDING); + assert(job->status == JOB_STATUS_READY); + aio_poll(qemu_get_aio_context(), true); + assert(job->status == JOB_STATUS_PENDING); cancel_common(s); } static void test_cancel_concluded(void) { - BlockJob *job; + Job *job; CancelJob *s; s = create_common(&job); - job_start(&job->job); - assert(job->job.status == JOB_STATUS_RUNNING); + job_start(job); + assert(job->status == JOB_STATUS_RUNNING); s->should_converge = true; - job_enter(&job->job); - assert(job->job.status == JOB_STATUS_READY); + job_enter(job); + assert(job->status == JOB_STATUS_READY); - job_complete(&job->job, &error_abort); - job_enter(&job->job); - while (!s->completed) { + job_complete(job, &error_abort); + job_enter(job); + while (!job->deferred_to_main_loop) { aio_poll(qemu_get_aio_context(), true); } - assert(job->job.status == JOB_STATUS_PENDING); + assert(job->status == JOB_STATUS_READY); + aio_poll(qemu_get_aio_context(), true); + assert(job->status == JOB_STATUS_PENDING); - job_finalize(&job->job, &error_abort); - assert(job->job.status == JOB_STATUS_CONCLUDED); + job_finalize(job, &error_abort); + assert(job->status == JOB_STATUS_CONCLUDED); cancel_common(s); } diff --git a/util/aio-wait.c b/util/aio-wait.c index b8a8f86..b487749 100644 --- a/util/aio-wait.c +++ b/util/aio-wait.c @@ -26,21 +26,22 @@ #include "qemu/main-loop.h" #include "block/aio-wait.h" +AioWait global_aio_wait; + static void dummy_bh_cb(void *opaque) { /* The point is to make AIO_WAIT_WHILE()'s aio_poll() return */ } -void aio_wait_kick(AioWait *wait) +void aio_wait_kick(void) { /* The barrier (or an atomic op) is in the caller. */ - if (atomic_read(&wait->num_waiters)) { + if (atomic_read(&global_aio_wait.num_waiters)) { aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL); } } typedef struct { - AioWait wait; bool done; QEMUBHFunc *cb; void *opaque; @@ -54,7 +55,7 @@ static void aio_wait_bh(void *opaque) data->cb(data->opaque); data->done = true; - aio_wait_kick(&data->wait); + aio_wait_kick(); } void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) @@ -67,5 +68,5 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) assert(qemu_get_current_aio_context() == qemu_get_aio_context()); aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data); - AIO_WAIT_WHILE(&data.wait, ctx, !data.done); + AIO_WAIT_WHILE(ctx, !data.done); } diff --git a/util/async.c b/util/async.c index 05979f8..c10642a 100644 --- a/util/async.c +++ b/util/async.c @@ -400,7 +400,7 @@ static void co_schedule_bh_cb(void *opaque) /* Protected by write barrier in qemu_aio_coroutine_enter */ atomic_set(&co->scheduled, NULL); - qemu_coroutine_enter(co); + qemu_aio_coroutine_enter(ctx, co); aio_context_release(ctx); } } diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c index 1ba4191..2295928 100644 --- a/util/qemu-coroutine.c +++ b/util/qemu-coroutine.c @@ -198,3 +198,8 @@ bool qemu_coroutine_entered(Coroutine *co) { return co->caller; } + +AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co) +{ + return co->ctx; +} |