diff options
-rw-r--r-- | block/backup.c | 2 | ||||
-rw-r--r-- | block/commit.c | 4 | ||||
-rw-r--r-- | block/mirror.c | 22 | ||||
-rw-r--r-- | block/replication.c | 2 | ||||
-rw-r--r-- | block/stream.c | 4 | ||||
-rw-r--r-- | blockdev.c | 8 | ||||
-rw-r--r-- | blockjob.c | 219 | ||||
-rw-r--r-- | include/block/blockjob.h | 40 | ||||
-rw-r--r-- | include/block/blockjob_int.h | 26 | ||||
-rw-r--r-- | include/qemu/job.h | 76 | ||||
-rw-r--r-- | job.c | 137 | ||||
-rw-r--r-- | tests/test-bdrv-drain.c | 38 | ||||
-rw-r--r-- | tests/test-blockjob-txn.c | 12 | ||||
-rw-r--r-- | tests/test-blockjob.c | 14 |
14 files changed, 305 insertions, 299 deletions
diff --git a/block/backup.c b/block/backup.c index 22dd368..7d9aad9 100644 --- a/block/backup.c +++ b/block/backup.c @@ -528,8 +528,8 @@ static const BlockJobDriver backup_job_driver = { .instance_size = sizeof(BackupBlockJob), .job_type = JOB_TYPE_BACKUP, .free = block_job_free, + .start = backup_run, }, - .start = backup_run, .commit = backup_commit, .abort = backup_abort, .clean = backup_clean, diff --git a/block/commit.c b/block/commit.c index d326766..2fbc310 100644 --- a/block/commit.c +++ b/block/commit.c @@ -220,8 +220,8 @@ static const BlockJobDriver commit_job_driver = { .instance_size = sizeof(CommitBlockJob), .job_type = JOB_TYPE_COMMIT, .free = block_job_free, + .start = commit_run, }, - .start = commit_run, }; static int coroutine_fn bdrv_commit_top_preadv(BlockDriverState *bs, @@ -371,7 +371,7 @@ void commit_start(const char *job_id, BlockDriverState *bs, s->on_error = on_error; trace_commit_start(bs, base, top, s); - block_job_start(&s->common); + job_start(&s->common.job); return; fail: diff --git a/block/mirror.c b/block/mirror.c index 90d4ac9..95fc807 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -126,7 +126,7 @@ static void mirror_iteration_done(MirrorOp *op, int ret) g_free(op); if (s->waiting_for_io) { - qemu_coroutine_enter(s->common.co); + qemu_coroutine_enter(s->common.job.co); } } @@ -345,7 +345,7 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) mirror_wait_for_io(s); } - block_job_pause_point(&s->common); + job_pause_point(&s->common.job); /* Find the number of consective dirty chunks following the first dirty * one, and wait for in flight requests in them. */ @@ -597,7 +597,7 @@ static void mirror_throttle(MirrorBlockJob *s) s->last_pause_ns = now; block_job_sleep_ns(&s->common, 0); } else { - block_job_pause_point(&s->common); + job_pause_point(&s->common.job); } } @@ -786,7 +786,7 @@ static void coroutine_fn mirror_run(void *opaque) goto immediate_exit; } - block_job_pause_point(&s->common); + job_pause_point(&s->common.job); cnt = bdrv_get_dirty_count(s->dirty_bitmap); /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is @@ -957,9 +957,9 @@ static void mirror_complete(BlockJob *job, Error **errp) block_job_enter(&s->common); } -static void mirror_pause(BlockJob *job) +static void mirror_pause(Job *job) { - MirrorBlockJob *s = container_of(job, MirrorBlockJob, common); + MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job); mirror_wait_for_all_io(s); } @@ -991,10 +991,10 @@ static const BlockJobDriver mirror_job_driver = { .instance_size = sizeof(MirrorBlockJob), .job_type = JOB_TYPE_MIRROR, .free = block_job_free, + .start = mirror_run, + .pause = mirror_pause, }, - .start = mirror_run, .complete = mirror_complete, - .pause = mirror_pause, .attached_aio_context = mirror_attached_aio_context, .drain = mirror_drain, }; @@ -1004,10 +1004,10 @@ static const BlockJobDriver commit_active_job_driver = { .instance_size = sizeof(MirrorBlockJob), .job_type = JOB_TYPE_COMMIT, .free = block_job_free, + .start = mirror_run, + .pause = mirror_pause, }, - .start = mirror_run, .complete = mirror_complete, - .pause = mirror_pause, .attached_aio_context = mirror_attached_aio_context, .drain = mirror_drain, }; @@ -1244,7 +1244,7 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, } trace_mirror_start(bs, s, opaque); - block_job_start(&s->common); + job_start(&s->common.job); return; fail: diff --git a/block/replication.c b/block/replication.c index 48148b8..9ed6e0f 100644 --- a/block/replication.c +++ b/block/replication.c @@ -576,7 +576,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode, aio_context_release(aio_context); return; } - block_job_start(job); + job_start(&job->job); break; default: aio_context_release(aio_context); diff --git a/block/stream.c b/block/stream.c index 0bba816..6d8b7b6 100644 --- a/block/stream.c +++ b/block/stream.c @@ -213,8 +213,8 @@ static const BlockJobDriver stream_job_driver = { .instance_size = sizeof(StreamBlockJob), .job_type = JOB_TYPE_STREAM, .free = block_job_free, + .start = stream_run, }, - .start = stream_run, }; void stream_start(const char *job_id, BlockDriverState *bs, @@ -262,7 +262,7 @@ void stream_start(const char *job_id, BlockDriverState *bs, s->on_error = on_error; trace_stream_start(bs, base, s); - block_job_start(&s->common); + job_start(&s->common.job); return; fail: @@ -1910,7 +1910,7 @@ static void drive_backup_commit(BlkActionState *common) aio_context_acquire(aio_context); assert(state->job); - block_job_start(state->job); + job_start(&state->job->job); aio_context_release(aio_context); } @@ -2008,7 +2008,7 @@ static void blockdev_backup_commit(BlkActionState *common) aio_context_acquire(aio_context); assert(state->job); - block_job_start(state->job); + job_start(&state->job->job); aio_context_release(aio_context); } @@ -3425,7 +3425,7 @@ void qmp_drive_backup(DriveBackup *arg, Error **errp) BlockJob *job; job = do_drive_backup(arg, NULL, errp); if (job) { - block_job_start(job); + job_start(&job->job); } } @@ -3513,7 +3513,7 @@ void qmp_blockdev_backup(BlockdevBackup *arg, Error **errp) BlockJob *job; job = do_blockdev_backup(arg, NULL, errp); if (job) { - block_job_start(job); + job_start(&job->job); } } @@ -36,30 +36,9 @@ #include "qemu/coroutine.h" #include "qemu/timer.h" -/* Right now, this mutex is only needed to synchronize accesses to job->busy - * and job->sleep_timer, such as concurrent calls to block_job_do_yield and - * block_job_enter. */ -static QemuMutex block_job_mutex; - -static void block_job_lock(void) -{ - qemu_mutex_lock(&block_job_mutex); -} - -static void block_job_unlock(void) -{ - qemu_mutex_unlock(&block_job_mutex); -} - -static void __attribute__((__constructor__)) block_job_init(void) -{ - qemu_mutex_init(&block_job_mutex); -} - static void block_job_event_cancelled(BlockJob *job); static void block_job_event_completed(BlockJob *job, const char *msg); static int block_job_event_pending(BlockJob *job); -static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job)); /* Transactional group of block jobs */ struct BlockJobTxn { @@ -161,33 +140,27 @@ static void block_job_txn_del_job(BlockJob *job) } } -/* Assumes the block_job_mutex is held */ -static bool block_job_timer_pending(BlockJob *job) -{ - return timer_pending(&job->sleep_timer); -} - -/* Assumes the block_job_mutex is held */ -static bool block_job_timer_not_pending(BlockJob *job) +/* Assumes the job_mutex is held */ +static bool job_timer_not_pending(Job *job) { - return !block_job_timer_pending(job); + return !timer_pending(&job->sleep_timer); } static void block_job_pause(BlockJob *job) { - job->pause_count++; + job->job.pause_count++; } static void block_job_resume(BlockJob *job) { - assert(job->pause_count > 0); - job->pause_count--; - if (job->pause_count) { + assert(job->job.pause_count > 0); + job->job.pause_count--; + if (job->job.pause_count) { return; } /* kick only if no timer is pending */ - block_job_enter_cond(job, block_job_timer_not_pending); + job_enter_cond(&job->job, job_timer_not_pending); } static void block_job_attached_aio_context(AioContext *new_context, @@ -208,7 +181,7 @@ void block_job_free(Job *job) block_job_detach_aio_context, bjob); blk_unref(bjob->blk); error_free(bjob->blocker); - assert(!timer_pending(&bjob->sleep_timer)); + assert(!timer_pending(&bjob->job.sleep_timer)); } static void block_job_attached_aio_context(AioContext *new_context, @@ -226,7 +199,7 @@ static void block_job_attached_aio_context(AioContext *new_context, static void block_job_drain(BlockJob *job) { - /* If job is !job->busy this kicks it into the next pause point. */ + /* If job is !job->job.busy this kicks it into the next pause point. */ block_job_enter(job); blk_drain(job->blk); @@ -244,7 +217,7 @@ static void block_job_detach_aio_context(void *opaque) block_job_pause(job); - while (!job->paused && !job->completed) { + while (!job->job.paused && !job->completed) { block_job_drain(job); } @@ -312,29 +285,11 @@ bool block_job_is_internal(BlockJob *job) return (job->job.id == NULL); } -static bool block_job_started(BlockJob *job) -{ - return job->co; -} - const BlockJobDriver *block_job_driver(BlockJob *job) { return job->driver; } -/** - * All jobs must allow a pause point before entering their job proper. This - * ensures that jobs can be paused prior to being started, then resumed later. - */ -static void coroutine_fn block_job_co_entry(void *opaque) -{ - BlockJob *job = opaque; - - assert(job && job->driver && job->driver->start); - block_job_pause_point(job); - job->driver->start(job); -} - static void block_job_sleep_timer_cb(void *opaque) { BlockJob *job = opaque; @@ -342,24 +297,12 @@ static void block_job_sleep_timer_cb(void *opaque) block_job_enter(job); } -void block_job_start(BlockJob *job) -{ - assert(job && !block_job_started(job) && job->paused && - job->driver && job->driver->start); - job->co = qemu_coroutine_create(block_job_co_entry, job); - job->pause_count--; - job->busy = true; - job->paused = false; - job_state_transition(&job->job, JOB_STATUS_RUNNING); - bdrv_coroutine_enter(blk_bs(job->blk), job->co); -} - static void block_job_decommission(BlockJob *job) { assert(job); job->completed = true; - job->busy = false; - job->paused = false; + job->job.busy = false; + job->job.paused = false; job->job.deferred_to_main_loop = true; block_job_txn_del_job(job); job_state_transition(&job->job, JOB_STATUS_NULL); @@ -374,7 +317,7 @@ static void block_job_do_dismiss(BlockJob *job) static void block_job_conclude(BlockJob *job) { job_state_transition(&job->job, JOB_STATUS_CONCLUDED); - if (job->auto_dismiss || !block_job_started(job)) { + if (job->auto_dismiss || !job_started(&job->job)) { block_job_do_dismiss(job); } } @@ -439,7 +382,7 @@ static int block_job_finalize_single(BlockJob *job) } /* Emit events only if we actually started */ - if (block_job_started(job)) { + if (job_started(&job->job)) { if (job_is_cancelled(&job->job)) { block_job_event_cancelled(job); } else { @@ -464,7 +407,7 @@ static void block_job_cancel_async(BlockJob *job, bool force) if (job->user_paused) { /* Do not call block_job_enter here, the caller will handle it. */ job->user_paused = false; - job->pause_count--; + job->job.pause_count--; } job->job.cancelled = true; /* To prevent 'force == false' overriding a previous 'force == true' */ @@ -615,6 +558,12 @@ static void block_job_completed_txn_success(BlockJob *job) } } +/* Assumes the job_mutex is held */ +static bool job_timer_pending(Job *job) +{ + return timer_pending(&job->sleep_timer); +} + void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) { int64_t old_speed = job->speed; @@ -635,7 +584,7 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) } /* kick only if a timer is pending */ - block_job_enter_cond(job, block_job_timer_pending); + job_enter_cond(&job->job, job_timer_pending); } int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n) @@ -654,7 +603,7 @@ void block_job_complete(BlockJob *job, Error **errp) if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) { return; } - if (job->pause_count || job_is_cancelled(&job->job) || + if (job->job.pause_count || job_is_cancelled(&job->job) || !job->driver->complete) { error_setg(errp, "The active block job '%s' cannot be completed", @@ -708,7 +657,7 @@ bool block_job_user_paused(BlockJob *job) void block_job_user_resume(BlockJob *job, Error **errp) { assert(job); - if (!job->user_paused || job->pause_count <= 0) { + if (!job->user_paused || job->job.pause_count <= 0) { error_setg(errp, "Can't resume a job that was not paused"); return; } @@ -727,7 +676,7 @@ void block_job_cancel(BlockJob *job, bool force) return; } block_job_cancel_async(job, force); - if (!block_job_started(job)) { + if (!job_started(&job->job)) { block_job_completed(job, -ECANCELED); } else if (job->job.deferred_to_main_loop) { block_job_completed_txn_abort(job); @@ -797,8 +746,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp) info->type = g_strdup(job_type_str(&job->job)); info->device = g_strdup(job->job.id); info->len = job->len; - info->busy = atomic_read(&job->busy); - info->paused = job->pause_count > 0; + info->busy = atomic_read(&job->job.busy); + info->paused = job->job.pause_count > 0; info->offset = job->offset; info->speed = job->speed; info->io_status = job->iostatus; @@ -915,12 +864,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, job->blk = blk; job->cb = cb; job->opaque = opaque; - job->busy = false; - job->paused = true; - job->pause_count = 1; job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE); job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS); - aio_timer_init(qemu_get_aio_context(), &job->sleep_timer, + aio_timer_init(qemu_get_aio_context(), &job->job.sleep_timer, QEMU_CLOCK_REALTIME, SCALE_NS, block_job_sleep_timer_cb, job); @@ -980,128 +926,41 @@ void block_job_completed(BlockJob *job, int ret) } } -static bool block_job_should_pause(BlockJob *job) -{ - return job->pause_count > 0; -} - -/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds. - * Reentering the job coroutine with block_job_enter() before the timer has - * expired is allowed and cancels the timer. - * - * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be - * called explicitly. */ -static void block_job_do_yield(BlockJob *job, uint64_t ns) -{ - block_job_lock(); - if (ns != -1) { - timer_mod(&job->sleep_timer, ns); - } - job->busy = false; - block_job_unlock(); - qemu_coroutine_yield(); - - /* Set by block_job_enter before re-entering the coroutine. */ - assert(job->busy); -} - -void coroutine_fn block_job_pause_point(BlockJob *job) -{ - assert(job && block_job_started(job)); - - if (!block_job_should_pause(job)) { - return; - } - if (job_is_cancelled(&job->job)) { - return; - } - - if (job->driver->pause) { - job->driver->pause(job); - } - - if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) { - JobStatus status = job->job.status; - job_state_transition(&job->job, status == JOB_STATUS_READY - ? JOB_STATUS_STANDBY - : JOB_STATUS_PAUSED); - job->paused = true; - block_job_do_yield(job, -1); - job->paused = false; - job_state_transition(&job->job, status); - } - - if (job->driver->resume) { - job->driver->resume(job); - } -} - -/* - * Conditionally enter a block_job pending a call to fn() while - * under the block_job_lock critical section. - */ -static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job)) -{ - if (!block_job_started(job)) { - return; - } - if (job->job.deferred_to_main_loop) { - return; - } - - block_job_lock(); - if (job->busy) { - block_job_unlock(); - return; - } - - if (fn && !fn(job)) { - block_job_unlock(); - return; - } - - assert(!job->job.deferred_to_main_loop); - timer_del(&job->sleep_timer); - job->busy = true; - block_job_unlock(); - aio_co_wake(job->co); -} - void block_job_enter(BlockJob *job) { - block_job_enter_cond(job, NULL); + job_enter_cond(&job->job, NULL); } void block_job_sleep_ns(BlockJob *job, int64_t ns) { - assert(job->busy); + assert(job->job.busy); /* Check cancellation *before* setting busy = false, too! */ if (job_is_cancelled(&job->job)) { return; } - if (!block_job_should_pause(job)) { - block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns); + if (!job_should_pause(&job->job)) { + job_do_yield(&job->job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns); } - block_job_pause_point(job); + job_pause_point(&job->job); } void block_job_yield(BlockJob *job) { - assert(job->busy); + assert(job->job.busy); /* Check cancellation *before* setting busy = false, too! */ if (job_is_cancelled(&job->job)) { return; } - if (!block_job_should_pause(job)) { - block_job_do_yield(job, -1); + if (!job_should_pause(&job->job)) { + job_do_yield(&job->job, -1); } - block_job_pause_point(job); + job_pause_point(&job->job); } void block_job_iostatus_reset(BlockJob *job) @@ -1109,7 +968,7 @@ void block_job_iostatus_reset(BlockJob *job) if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) { return; } - assert(job->user_paused && job->pause_count > 0); + assert(job->user_paused && job->job.pause_count > 0); job->iostatus = BLOCK_DEVICE_IO_STATUS_OK; } diff --git a/include/block/blockjob.h b/include/block/blockjob.h index 2a9e865..b60d919 100644 --- a/include/block/blockjob.h +++ b/include/block/blockjob.h @@ -51,43 +51,18 @@ typedef struct BlockJob { BlockBackend *blk; /** - * The coroutine that executes the job. If not NULL, it is - * reentered when busy is false and the job is cancelled. - */ - Coroutine *co; - - /** * Set to true if the job should abort immediately without waiting * for data to be in sync. */ bool force; /** - * Counter for pause request. If non-zero, the block job is either paused, - * or if busy == true will pause itself as soon as possible. - */ - int pause_count; - - /** * Set to true if the job is paused by user. Can be unpaused with the * block-job-resume QMP command. */ bool user_paused; /** - * Set to false by the job while the coroutine has yielded and may be - * re-entered by block_job_enter(). There may still be I/O or event loop - * activity pending. Accessed under block_job_mutex (in blockjob.c). - */ - bool busy; - - /** - * Set to true by the job while it is in a quiescent state, where - * no I/O or event loop activity is pending. - */ - bool paused; - - /** * Set to true when the job is ready to be completed. */ bool ready; @@ -125,12 +100,6 @@ typedef struct BlockJob { /** ret code passed to block_job_completed. */ int ret; - /** - * Timer that is used by @block_job_sleep_ns. Accessed under - * block_job_mutex (in blockjob.c). - */ - QEMUTimer sleep_timer; - /** True if this job should automatically finalize itself */ bool auto_finalize; @@ -208,15 +177,6 @@ void block_job_remove_all_bdrv(BlockJob *job); void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp); /** - * block_job_start: - * @job: A job that has not yet been started. - * - * Begins execution of a block job. - * Takes ownership of one reference to the job object. - */ -void block_job_start(BlockJob *job); - -/** * block_job_cancel: * @job: The job to be canceled. * @force: Quit a job without waiting for data to be in sync. diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h index 0c2f8de..0a614a8 100644 --- a/include/block/blockjob_int.h +++ b/include/block/blockjob_int.h @@ -38,9 +38,6 @@ struct BlockJobDriver { /** Generic JobDriver callbacks and settings */ JobDriver job_driver; - /** Mandatory: Entrypoint for the Coroutine. */ - CoroutineEntry *start; - /** * Optional callback for job types whose completion must be triggered * manually. @@ -85,20 +82,6 @@ struct BlockJobDriver { */ void (*clean)(BlockJob *job); - /** - * If the callback is not NULL, it will be invoked when the job transitions - * into the paused state. Paused jobs must not perform any asynchronous - * I/O or event loop activity. This callback is used to quiesce jobs. - */ - void coroutine_fn (*pause)(BlockJob *job); - - /** - * If the callback is not NULL, it will be invoked when the job transitions - * out of the paused state. Any asynchronous I/O or event loop activity - * should be restarted from this callback. - */ - void coroutine_fn (*resume)(BlockJob *job); - /* * If the callback is not NULL, it will be invoked before the job is * resumed in a new AioContext. This is the place to move any resources @@ -196,15 +179,6 @@ void block_job_early_fail(BlockJob *job); void block_job_completed(BlockJob *job, int ret); /** - * block_job_pause_point: - * @job: The job that is ready to pause. - * - * Pause now if block_job_pause() has been called. Block jobs that perform - * lots of I/O must call this between requests so that the job can be paused. - */ -void coroutine_fn block_job_pause_point(BlockJob *job); - -/** * block_job_enter: * @job: The job to enter. * diff --git a/include/qemu/job.h b/include/qemu/job.h index 933e0ab..9dcff12 100644 --- a/include/qemu/job.h +++ b/include/qemu/job.h @@ -28,6 +28,7 @@ #include "qapi/qapi-types-block-core.h" #include "qemu/queue.h" +#include "qemu/coroutine.h" typedef struct JobDriver JobDriver; @@ -51,6 +52,37 @@ typedef struct Job { AioContext *aio_context; /** + * The coroutine that executes the job. If not NULL, it is reentered when + * busy is false and the job is cancelled. + */ + Coroutine *co; + + /** + * Timer that is used by @block_job_sleep_ns. Accessed under job_mutex (in + * job.c). + */ + QEMUTimer sleep_timer; + + /** + * Counter for pause request. If non-zero, the block job is either paused, + * or if busy == true will pause itself as soon as possible. + */ + int pause_count; + + /** + * Set to false by the job while the coroutine has yielded and may be + * re-entered by block_job_enter(). There may still be I/O or event loop + * activity pending. Accessed under block_job_mutex (in blockjob.c). + */ + bool busy; + + /** + * Set to true by the job while it is in a quiescent state, where + * no I/O or event loop activity is pending. + */ + bool paused; + + /** * Set to true if the job should cancel itself. The flag must * always be tested just before toggling the busy flag from false * to true. After a job has been cancelled, it should only yield @@ -75,6 +107,23 @@ struct JobDriver { /** Enum describing the operation */ JobType job_type; + /** Mandatory: Entrypoint for the Coroutine. */ + CoroutineEntry *start; + + /** + * If the callback is not NULL, it will be invoked when the job transitions + * into the paused state. Paused jobs must not perform any asynchronous + * I/O or event loop activity. This callback is used to quiesce jobs. + */ + void coroutine_fn (*pause)(Job *job); + + /** + * If the callback is not NULL, it will be invoked when the job transitions + * out of the paused state. Any asynchronous I/O or event loop activity + * should be restarted from this callback. + */ + void coroutine_fn (*resume)(Job *job); + /** Called when the job is freed */ void (*free)(Job *job); }; @@ -103,6 +152,30 @@ void job_ref(Job *job); */ void job_unref(Job *job); +/** + * Conditionally enter the job coroutine if the job is ready to run, not + * already busy and fn() returns true. fn() is called while under the job_lock + * critical section. + */ +void job_enter_cond(Job *job, bool(*fn)(Job *job)); + +/** + * @job: A job that has not yet been started. + * + * Begins execution of a job. + * Takes ownership of one reference to the job object. + */ +void job_start(Job *job); + +/** + * @job: The job that is ready to pause. + * + * Pause now if job_pause() has been called. Jobs that perform lots of I/O + * must call this between requests so that the job can be paused. + */ +void coroutine_fn job_pause_point(Job *job); + + /** Returns the JobType of a given Job. */ JobType job_type(const Job *job); @@ -153,5 +226,8 @@ void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque); /* TODO To be removed from the public interface */ void job_state_transition(Job *job, JobStatus s1); +void coroutine_fn job_do_yield(Job *job, uint64_t ns); +bool job_should_pause(Job *job); +bool job_started(Job *job); #endif @@ -60,6 +60,26 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = { [JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0}, }; +/* Right now, this mutex is only needed to synchronize accesses to job->busy + * and job->sleep_timer, such as concurrent calls to job_do_yield and + * job_enter. */ +static QemuMutex job_mutex; + +static void job_lock(void) +{ + qemu_mutex_lock(&job_mutex); +} + +static void job_unlock(void) +{ + qemu_mutex_unlock(&job_mutex); +} + +static void __attribute__((__constructor__)) job_init(void) +{ + qemu_mutex_init(&job_mutex); +} + /* TODO Make static once the whole state machine is in job.c */ void job_state_transition(Job *job, JobStatus s1) { @@ -101,6 +121,16 @@ bool job_is_cancelled(Job *job) return job->cancelled; } +bool job_started(Job *job) +{ + return job->co; +} + +bool job_should_pause(Job *job) +{ + return job->pause_count > 0; +} + Job *job_next(Job *job) { if (!job) { @@ -143,6 +173,9 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, job->id = g_strdup(job_id); job->refcnt = 1; job->aio_context = ctx; + job->busy = false; + job->paused = true; + job->pause_count = 1; job_state_transition(job, JOB_STATUS_CREATED); @@ -172,6 +205,110 @@ void job_unref(Job *job) } } +void job_enter_cond(Job *job, bool(*fn)(Job *job)) +{ + if (!job_started(job)) { + return; + } + if (job->deferred_to_main_loop) { + return; + } + + job_lock(); + if (job->busy) { + job_unlock(); + return; + } + + if (fn && !fn(job)) { + job_unlock(); + return; + } + + assert(!job->deferred_to_main_loop); + timer_del(&job->sleep_timer); + job->busy = true; + job_unlock(); + aio_co_wake(job->co); +} + +/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds. + * Reentering the job coroutine with block_job_enter() before the timer has + * expired is allowed and cancels the timer. + * + * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be + * called explicitly. */ +void coroutine_fn job_do_yield(Job *job, uint64_t ns) +{ + job_lock(); + if (ns != -1) { + timer_mod(&job->sleep_timer, ns); + } + job->busy = false; + job_unlock(); + qemu_coroutine_yield(); + + /* Set by job_enter_cond() before re-entering the coroutine. */ + assert(job->busy); +} + +void coroutine_fn job_pause_point(Job *job) +{ + assert(job && job_started(job)); + + if (!job_should_pause(job)) { + return; + } + if (job_is_cancelled(job)) { + return; + } + + if (job->driver->pause) { + job->driver->pause(job); + } + + if (job_should_pause(job) && !job_is_cancelled(job)) { + JobStatus status = job->status; + job_state_transition(job, status == JOB_STATUS_READY + ? JOB_STATUS_STANDBY + : JOB_STATUS_PAUSED); + job->paused = true; + job_do_yield(job, -1); + job->paused = false; + job_state_transition(job, status); + } + + if (job->driver->resume) { + job->driver->resume(job); + } +} + +/** + * All jobs must allow a pause point before entering their job proper. This + * ensures that jobs can be paused prior to being started, then resumed later. + */ +static void coroutine_fn job_co_entry(void *opaque) +{ + Job *job = opaque; + + assert(job && job->driver && job->driver->start); + job_pause_point(job); + job->driver->start(job); +} + + +void job_start(Job *job) +{ + assert(job && !job_started(job) && job->paused && + job->driver && job->driver->start); + job->co = qemu_coroutine_create(job_co_entry, job); + job->pause_count--; + job->busy = true; + job->paused = false; + job_state_transition(job, JOB_STATUS_RUNNING); + aio_co_enter(job->aio_context, job->co); +} + typedef struct { Job *job; JobDeferToMainLoopFn *fn; diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c index 4f8cba8..c9f2f9b 100644 --- a/tests/test-bdrv-drain.c +++ b/tests/test-bdrv-drain.c @@ -524,8 +524,8 @@ BlockJobDriver test_job_driver = { .job_driver = { .instance_size = sizeof(TestBlockJob), .free = block_job_free, + .start = test_job_start, }, - .start = test_job_start, .complete = test_job_complete, }; @@ -549,47 +549,47 @@ static void test_blockjob_common(enum drain_type drain_type) job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL, 0, 0, NULL, NULL, &error_abort); block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort); - block_job_start(job); + job_start(&job->job); - g_assert_cmpint(job->pause_count, ==, 0); - g_assert_false(job->paused); - g_assert_false(job->busy); /* We're in block_job_sleep_ns() */ + g_assert_cmpint(job->job.pause_count, ==, 0); + g_assert_false(job->job.paused); + g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */ do_drain_begin(drain_type, src); if (drain_type == BDRV_DRAIN_ALL) { /* bdrv_drain_all() drains both src and target */ - g_assert_cmpint(job->pause_count, ==, 2); + g_assert_cmpint(job->job.pause_count, ==, 2); } else { - g_assert_cmpint(job->pause_count, ==, 1); + g_assert_cmpint(job->job.pause_count, ==, 1); } /* XXX We don't wait until the job is actually paused. Is this okay? */ - /* g_assert_true(job->paused); */ - g_assert_false(job->busy); /* The job is paused */ + /* g_assert_true(job->job.paused); */ + g_assert_false(job->job.busy); /* The job is paused */ do_drain_end(drain_type, src); - g_assert_cmpint(job->pause_count, ==, 0); - g_assert_false(job->paused); - g_assert_false(job->busy); /* We're in block_job_sleep_ns() */ + g_assert_cmpint(job->job.pause_count, ==, 0); + g_assert_false(job->job.paused); + g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */ do_drain_begin(drain_type, target); if (drain_type == BDRV_DRAIN_ALL) { /* bdrv_drain_all() drains both src and target */ - g_assert_cmpint(job->pause_count, ==, 2); + g_assert_cmpint(job->job.pause_count, ==, 2); } else { - g_assert_cmpint(job->pause_count, ==, 1); + g_assert_cmpint(job->job.pause_count, ==, 1); } /* XXX We don't wait until the job is actually paused. Is this okay? */ - /* g_assert_true(job->paused); */ - g_assert_false(job->busy); /* The job is paused */ + /* g_assert_true(job->job.paused); */ + g_assert_false(job->job.busy); /* The job is paused */ do_drain_end(drain_type, target); - g_assert_cmpint(job->pause_count, ==, 0); - g_assert_false(job->paused); - g_assert_false(job->busy); /* We're in block_job_sleep_ns() */ + g_assert_cmpint(job->job.pause_count, ==, 0); + g_assert_false(job->job.paused); + g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */ ret = block_job_complete_sync(job, &error_abort); g_assert_cmpint(ret, ==, 0); diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c index c03f966..323e154 100644 --- a/tests/test-blockjob-txn.c +++ b/tests/test-blockjob-txn.c @@ -78,8 +78,8 @@ static const BlockJobDriver test_block_job_driver = { .job_driver = { .instance_size = sizeof(TestBlockJob), .free = block_job_free, + .start = test_block_job_run, }, - .start = test_block_job_run, }; /* Create a block job that completes with a given return code after a given @@ -125,7 +125,7 @@ static void test_single_job(int expected) txn = block_job_txn_new(); job = test_block_job_start(1, true, expected, &result, txn); - block_job_start(job); + job_start(&job->job); if (expected == -ECANCELED) { block_job_cancel(job, false); @@ -165,8 +165,8 @@ static void test_pair_jobs(int expected1, int expected2) txn = block_job_txn_new(); job1 = test_block_job_start(1, true, expected1, &result1, txn); job2 = test_block_job_start(2, true, expected2, &result2, txn); - block_job_start(job1); - block_job_start(job2); + job_start(&job1->job); + job_start(&job2->job); /* Release our reference now to trigger as many nice * use-after-free bugs as possible. @@ -227,8 +227,8 @@ static void test_pair_jobs_fail_cancel_race(void) txn = block_job_txn_new(); job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn); job2 = test_block_job_start(2, false, 0, &result2, txn); - block_job_start(job1); - block_job_start(job2); + job_start(&job1->job); + job_start(&job2->job); block_job_cancel(job1, false); diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c index 5f43bd7..1d18325 100644 --- a/tests/test-blockjob.c +++ b/tests/test-blockjob.c @@ -199,8 +199,8 @@ static const BlockJobDriver test_cancel_driver = { .job_driver = { .instance_size = sizeof(CancelJob), .free = block_job_free, + .start = cancel_job_start, }, - .start = cancel_job_start, .complete = cancel_job_complete, }; @@ -254,7 +254,7 @@ static void test_cancel_running(void) s = create_common(&job); - block_job_start(job); + job_start(&job->job); assert(job->job.status == JOB_STATUS_RUNNING); cancel_common(s); @@ -267,7 +267,7 @@ static void test_cancel_paused(void) s = create_common(&job); - block_job_start(job); + job_start(&job->job); assert(job->job.status == JOB_STATUS_RUNNING); block_job_user_pause(job, &error_abort); @@ -284,7 +284,7 @@ static void test_cancel_ready(void) s = create_common(&job); - block_job_start(job); + job_start(&job->job); assert(job->job.status == JOB_STATUS_RUNNING); s->should_converge = true; @@ -301,7 +301,7 @@ static void test_cancel_standby(void) s = create_common(&job); - block_job_start(job); + job_start(&job->job); assert(job->job.status == JOB_STATUS_RUNNING); s->should_converge = true; @@ -322,7 +322,7 @@ static void test_cancel_pending(void) s = create_common(&job); - block_job_start(job); + job_start(&job->job); assert(job->job.status == JOB_STATUS_RUNNING); s->should_converge = true; @@ -346,7 +346,7 @@ static void test_cancel_concluded(void) s = create_common(&job); - block_job_start(job); + job_start(&job->job); assert(job->job.status == JOB_STATUS_RUNNING); s->should_converge = true; |