aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Hajnoczi <stefanha@redhat.com>2022-10-12 15:57:56 -0400
committerStefan Hajnoczi <stefanha@redhat.com>2022-10-12 15:57:56 -0400
commit7fa24b8d61be040ba58bed6fc1c16fd5fb7b6af0 (patch)
tree001179974c00482016dc5a9f93f43cc9a14d0a75
parentab44ea1059242ff2dbbde44e94468f6c6e5f87be (diff)
parenta7ca2eb488ff149c898f43abe103f8bd8e3ca3c4 (diff)
downloadqemu-7fa24b8d61be040ba58bed6fc1c16fd5fb7b6af0.zip
qemu-7fa24b8d61be040ba58bed6fc1c16fd5fb7b6af0.tar.gz
qemu-7fa24b8d61be040ba58bed6fc1c16fd5fb7b6af0.tar.bz2
Merge tag 'for-upstream' of git://repo.or.cz/qemu/kevin into staging
Block layer patches - job: replace AioContext lock with job_mutex - Fixes to make coroutine_fn annotations more accurate - QAPI schema: Fix incorrect example - Code cleanup # -----BEGIN PGP SIGNATURE----- # # iQJFBAABCAAvFiEE3D3rFZqa+V09dFb+fwmycsiPL9YFAmNAAz8RHGt3b2xmQHJl # ZGhhdC5jb20ACgkQfwmycsiPL9a6zg//QYLx+FYMStb50lS+6VBio8AKOVbwn5zp # ZANoXinMknnxI5wTldjkkM1cBRg27BVjpOHz4XemBtQgT5nBqWq8+Ov31lwASVID # na/L9o4Pa0xmywM777K+edceWk0fpJTLmnFf1Qxan9qB/VSjNFtk+fjwFopoatKg # XbHd6maQtrY8bIOyBsBoZozNaS39E/uPqkP67V6GF09re17f0PBctGHKFkTKZr8w # 2HfyMt8/UIhFet++NFgxppTcvIKfZ20pk4AQ+yYsL+FxWr/cs4leKWl5BSc7thtP # Sm/y0WiEB4nPNo4CSf9sA1Vo8EIGYzBhUVteqYQUF2vSXSzFmZb191fLJRYwp5bQ # QxEmHzPVGqcUHr+jkfXI0yLolWduiKV1ATZ0zW3N41VfzGLYZdSgI2ZhbHJ0/yKO # ZhyC63gye9V6TXxviYIz2V6iOD8QuwJ8X1P0E3yRsGploF1UY/N1lwbmek1XhFn/ # +xn/mrTeV0lu4wKuWRpUfY2C/7SR0Za6MB2GqduRWnbcAonLH3/syAxXSfu2611N # Z1Cf9Wu8Mm0IQz0LbbVvEJZ4yoEPkg/tGH8q6dpau2uTfCb6sSylRxLcXEa5R0UQ # W+wX5GSoTDe4DQKOSaJE7jWV/QwY5diTLHBIvSF8uKAfeCenkDDLowrMvbWafL0X # XTFzpZ/1aA8= # =jMFT # -----END PGP SIGNATURE----- # gpg: Signature made Fri 07 Oct 2022 06:45:19 EDT # gpg: using RSA key DC3DEB159A9AF95D3D7456FE7F09B272C88F2FD6 # gpg: issuer "kwolf@redhat.com" # gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>" [full] # Primary key fingerprint: DC3D EB15 9A9A F95D 3D74 56FE 7F09 B272 C88F 2FD6 * tag 'for-upstream' of git://repo.or.cz/qemu/kevin: (50 commits) file-posix: Remove unused s->discard_zeroes job: remove unused functions blockjob: remove unused functions block_job_query: remove atomic read job.c: enable job lock/unlock and remove Aiocontext locks job.h: categorize JobDriver callbacks that need the AioContext lock blockjob: protect iostatus field in BlockJob struct blockjob: rename notifier callbacks as _locked blockjob.h: categorize fields in struct BlockJob jobs: protect job.aio_context with BQL and job_mutex job: detect change of aiocontext within job coroutine jobs: group together API calls under the same job lock block/mirror.c: use of job helpers in drivers jobs: use job locks also in the unit tests jobs: add job lock in find_* functions blockjob: introduce block_job _locked() APIs job: move and update comments from blockjob.c job.c: add job_lock/unlock while keeping job.h intact aio-wait.h: introduce AIO_WAIT_WHILE_UNLOCKED job.c: API functions not used outside should be static ... Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
-rw-r--r--block.c24
-rw-r--r--block/blkverify.c2
-rw-r--r--block/block-backend.c10
-rw-r--r--block/copy-before-write.c9
-rw-r--r--block/curl.c2
-rw-r--r--block/file-posix.c11
-rw-r--r--block/io.c22
-rw-r--r--block/iscsi.c3
-rw-r--r--block/mirror.c19
-rw-r--r--block/nbd.c11
-rw-r--r--block/nfs.c2
-rw-r--r--block/nvme.c54
-rw-r--r--block/parallels.c5
-rw-r--r--block/qcow2-cluster.c21
-rw-r--r--block/qcow2-refcount.c6
-rw-r--r--block/qcow2.c5
-rw-r--r--block/qcow2.h19
-rw-r--r--block/qed.c4
-rw-r--r--block/quorum.c38
-rw-r--r--block/raw-format.c3
-rw-r--r--block/replication.c3
-rw-r--r--block/throttle.c2
-rw-r--r--block/vmdk.c22
-rw-r--r--blockdev.c129
-rw-r--r--blockjob.c132
-rw-r--r--hw/9pfs/9p.h9
-rw-r--r--include/block/aio-wait.h17
-rw-r--r--include/block/blockjob.h59
-rw-r--r--include/block/nbd.h2
-rw-r--r--include/qemu/coroutine.h4
-rw-r--r--include/qemu/job.h306
-rw-r--r--job-qmp.c92
-rw-r--r--job.c674
-rw-r--r--migration/migration.c3
-rw-r--r--monitor/qmp-cmds.c7
-rw-r--r--qapi/block-core.json10
-rw-r--r--qemu-img.c17
-rw-r--r--tests/unit/test-bdrv-drain.c80
-rw-r--r--tests/unit/test-block-iothread.c8
-rw-r--r--tests/unit/test-blockjob-txn.c24
-rw-r--r--tests/unit/test-blockjob.c136
-rw-r--r--tests/unit/test-coroutine.c2
-rw-r--r--util/qemu-coroutine-lock.c14
-rw-r--r--util/qemu-coroutine.c2
44 files changed, 1237 insertions, 787 deletions
diff --git a/block.c b/block.c
index bc85f46..1fbf6b9 100644
--- a/block.c
+++ b/block.c
@@ -631,9 +631,10 @@ static int64_t create_file_fallback_truncate(BlockBackend *blk,
* Helper function for bdrv_create_file_fallback(): Zero the first
* sector to remove any potentially pre-existing image header.
*/
-static int create_file_fallback_zero_first_sector(BlockBackend *blk,
- int64_t current_size,
- Error **errp)
+static int coroutine_fn
+create_file_fallback_zero_first_sector(BlockBackend *blk,
+ int64_t current_size,
+ Error **errp)
{
int64_t bytes_to_clear;
int ret;
@@ -4980,8 +4981,8 @@ static void bdrv_close(BlockDriverState *bs)
void bdrv_close_all(void)
{
- assert(job_next(NULL) == NULL);
GLOBAL_STATE_CODE();
+ assert(job_next(NULL) == NULL);
/* Drop references from requests still in flight, such as canceled block
* jobs whose AIO context has not been polled yet */
@@ -6167,13 +6168,16 @@ XDbgBlockGraph *bdrv_get_xdbg_block_graph(Error **errp)
}
}
- for (job = block_job_next(NULL); job; job = block_job_next(job)) {
- GSList *el;
+ WITH_JOB_LOCK_GUARD() {
+ for (job = block_job_next_locked(NULL); job;
+ job = block_job_next_locked(job)) {
+ GSList *el;
- xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
- job->job.id);
- for (el = job->nodes; el; el = el->next) {
- xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
+ xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
+ job->job.id);
+ for (el = job->nodes; el; el = el->next) {
+ xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
+ }
}
}
diff --git a/block/blkverify.c b/block/blkverify.c
index e4a37af..020b1ae 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -258,7 +258,7 @@ blkverify_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
return blkverify_co_prwv(bs, &r, offset, bytes, qiov, qiov, flags, true);
}
-static int blkverify_co_flush(BlockDriverState *bs)
+static int coroutine_fn blkverify_co_flush(BlockDriverState *bs)
{
BDRVBlkverifyState *s = bs->opaque;
diff --git a/block/block-backend.c b/block/block-backend.c
index d4a5df2..aa4adf0 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -1546,7 +1546,7 @@ static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset,
return &acb->common;
}
-static void blk_aio_read_entry(void *opaque)
+static void coroutine_fn blk_aio_read_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@@ -1558,7 +1558,7 @@ static void blk_aio_read_entry(void *opaque)
blk_aio_complete(acb);
}
-static void blk_aio_write_entry(void *opaque)
+static void coroutine_fn blk_aio_write_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@@ -1669,7 +1669,7 @@ int coroutine_fn blk_co_ioctl(BlockBackend *blk, unsigned long int req,
return ret;
}
-static void blk_aio_ioctl_entry(void *opaque)
+static void coroutine_fn blk_aio_ioctl_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@@ -1703,7 +1703,7 @@ blk_co_do_pdiscard(BlockBackend *blk, int64_t offset, int64_t bytes)
return bdrv_co_pdiscard(blk->root, offset, bytes);
}
-static void blk_aio_pdiscard_entry(void *opaque)
+static void coroutine_fn blk_aio_pdiscard_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
@@ -1747,7 +1747,7 @@ static int coroutine_fn blk_co_do_flush(BlockBackend *blk)
return bdrv_co_flush(blk_bs(blk));
}
-static void blk_aio_flush_entry(void *opaque)
+static void coroutine_fn blk_aio_flush_entry(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
BlkRwCo *rwco = &acb->rwco;
diff --git a/block/copy-before-write.c b/block/copy-before-write.c
index c24b8dd..afbdd04 100644
--- a/block/copy-before-write.c
+++ b/block/copy-before-write.c
@@ -203,9 +203,9 @@ static int coroutine_fn cbw_co_flush(BlockDriverState *bs)
* It's guaranteed that guest writes will not interact in the region until
* cbw_snapshot_read_unlock() called.
*/
-static BlockReq *cbw_snapshot_read_lock(BlockDriverState *bs,
- int64_t offset, int64_t bytes,
- int64_t *pnum, BdrvChild **file)
+static coroutine_fn BlockReq *
+cbw_snapshot_read_lock(BlockDriverState *bs, int64_t offset, int64_t bytes,
+ int64_t *pnum, BdrvChild **file)
{
BDRVCopyBeforeWriteState *s = bs->opaque;
BlockReq *req = g_new(BlockReq, 1);
@@ -240,7 +240,8 @@ static BlockReq *cbw_snapshot_read_lock(BlockDriverState *bs,
return req;
}
-static void cbw_snapshot_read_unlock(BlockDriverState *bs, BlockReq *req)
+static coroutine_fn void
+cbw_snapshot_read_unlock(BlockDriverState *bs, BlockReq *req)
{
BDRVCopyBeforeWriteState *s = bs->opaque;
diff --git a/block/curl.c b/block/curl.c
index 1e0f609..cba4c4c 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -855,7 +855,7 @@ out_noclean:
return -EINVAL;
}
-static void curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
+static void coroutine_fn curl_setup_preadv(BlockDriverState *bs, CURLAIOCB *acb)
{
CURLState *state;
int running;
diff --git a/block/file-posix.c b/block/file-posix.c
index 66fdb07..23acffb 100644
--- a/block/file-posix.c
+++ b/block/file-posix.c
@@ -154,7 +154,6 @@ typedef struct BDRVRawState {
bool has_discard:1;
bool has_write_zeroes:1;
- bool discard_zeroes:1;
bool use_linux_aio:1;
bool use_linux_io_uring:1;
int page_cache_inconsistent; /* errno from fdatasync failure */
@@ -755,7 +754,6 @@ static int raw_open_common(BlockDriverState *bs, QDict *options,
ret = -EINVAL;
goto fail;
} else {
- s->discard_zeroes = true;
s->has_fallocate = true;
}
} else {
@@ -769,19 +767,12 @@ static int raw_open_common(BlockDriverState *bs, QDict *options,
}
if (S_ISBLK(st.st_mode)) {
-#ifdef BLKDISCARDZEROES
- unsigned int arg;
- if (ioctl(s->fd, BLKDISCARDZEROES, &arg) == 0 && arg) {
- s->discard_zeroes = true;
- }
-#endif
#ifdef __linux__
/* On Linux 3.10, BLKDISCARD leaves stale data in the page cache. Do
* not rely on the contents of discarded blocks unless using O_DIRECT.
* Same for BLKZEROOUT.
*/
if (!(bs->open_flags & BDRV_O_NOCACHE)) {
- s->discard_zeroes = false;
s->has_write_zeroes = false;
}
#endif
@@ -2180,7 +2171,7 @@ static void raw_aio_unplug(BlockDriverState *bs)
#endif
}
-static int raw_co_flush_to_disk(BlockDriverState *bs)
+static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)
{
BDRVRawState *s = bs->opaque;
RawPosixAIOData acb;
diff --git a/block/io.c b/block/io.c
index c3200bc..d300730 100644
--- a/block/io.c
+++ b/block/io.c
@@ -751,11 +751,11 @@ static void coroutine_fn tracked_request_end(BdrvTrackedRequest *req)
/**
* Add an active request to the tracked requests list
*/
-static void tracked_request_begin(BdrvTrackedRequest *req,
- BlockDriverState *bs,
- int64_t offset,
- int64_t bytes,
- enum BdrvTrackedRequestType type)
+static void coroutine_fn tracked_request_begin(BdrvTrackedRequest *req,
+ BlockDriverState *bs,
+ int64_t offset,
+ int64_t bytes,
+ enum BdrvTrackedRequestType type)
{
bdrv_check_request(offset, bytes, &error_abort);
@@ -794,7 +794,7 @@ static bool tracked_request_overlaps(BdrvTrackedRequest *req,
}
/* Called with self->bs->reqs_lock held */
-static BdrvTrackedRequest *
+static coroutine_fn BdrvTrackedRequest *
bdrv_find_conflicting_request(BdrvTrackedRequest *self)
{
BdrvTrackedRequest *req;
@@ -1635,10 +1635,10 @@ static bool bdrv_init_padding(BlockDriverState *bs,
return true;
}
-static int bdrv_padding_rmw_read(BdrvChild *child,
- BdrvTrackedRequest *req,
- BdrvRequestPadding *pad,
- bool zero_middle)
+static coroutine_fn int bdrv_padding_rmw_read(BdrvChild *child,
+ BdrvTrackedRequest *req,
+ BdrvRequestPadding *pad,
+ bool zero_middle)
{
QEMUIOVector local_qiov;
BlockDriverState *bs = child->bs;
@@ -3159,7 +3159,7 @@ out:
return ret;
}
-int bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
+int coroutine_fn bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
{
BlockDriver *drv = bs->drv;
CoroutineIOCompletion co = {
diff --git a/block/iscsi.c b/block/iscsi.c
index 612de12..a316d46 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -290,7 +290,8 @@ iscsi_co_generic_cb(struct iscsi_context *iscsi, int status,
}
}
-static void iscsi_co_init_iscsitask(IscsiLun *iscsilun, struct IscsiTask *iTask)
+static void coroutine_fn
+iscsi_co_init_iscsitask(IscsiLun *iscsilun, struct IscsiTask *iTask)
{
*iTask = (struct IscsiTask) {
.co = qemu_coroutine_self(),
diff --git a/block/mirror.c b/block/mirror.c
index 3c4ab11..80c0109 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -894,6 +894,7 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
BlockDriverState *bs = s->mirror_top_bs->backing->bs;
BlockDriverState *target_bs = blk_bs(s->target);
bool need_drain = true;
+ BlockDeviceIoStatus iostatus;
int64_t length;
int64_t target_length;
BlockDriverInfo bdi;
@@ -1016,8 +1017,11 @@ static int coroutine_fn mirror_run(Job *job, Error **errp)
* We do so every BLKOCK_JOB_SLICE_TIME nanoseconds, or when there is
* an error, or when the source is clean, whichever comes first. */
delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
+ WITH_JOB_LOCK_GUARD() {
+ iostatus = s->common.iostatus;
+ }
if (delta < BLOCK_JOB_SLICE_TIME &&
- s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+ iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
(cnt == 0 && s->in_flight > 0)) {
trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
@@ -1152,8 +1156,10 @@ static void mirror_complete(Job *job, Error **errp)
s->should_complete = true;
/* If the job is paused, it will be re-entered when it is resumed */
- if (!job->paused) {
- job_enter(job);
+ WITH_JOB_LOCK_GUARD() {
+ if (!job->paused) {
+ job_enter_cond_locked(job, NULL);
+ }
}
}
@@ -1173,8 +1179,11 @@ static bool mirror_drained_poll(BlockJob *job)
* from one of our own drain sections, to avoid a deadlock waiting for
* ourselves.
*/
- if (!s->common.job.paused && !job_is_cancelled(&job->job) && !s->in_drain) {
- return true;
+ WITH_JOB_LOCK_GUARD() {
+ if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
+ && !s->in_drain) {
+ return true;
+ }
}
return !!s->in_flight;
diff --git a/block/nbd.c b/block/nbd.c
index 97683cc..494b9d6 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -983,11 +983,12 @@ static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
* nbd_reply_chunk_iter_receive
* The pointer stored in @payload requires g_free() to free it.
*/
-static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
- NBDReplyChunkIter *iter,
- uint64_t handle,
- QEMUIOVector *qiov, NBDReply *reply,
- void **payload)
+static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
+ NBDReplyChunkIter *iter,
+ uint64_t handle,
+ QEMUIOVector *qiov,
+ NBDReply *reply,
+ void **payload)
{
int ret, request_ret;
NBDReply local_reply;
diff --git a/block/nfs.c b/block/nfs.c
index 444c40b..596ebe9 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -223,7 +223,7 @@ static void nfs_process_write(void *arg)
qemu_mutex_unlock(&client->mutex);
}
-static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
+static void coroutine_fn nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
{
*task = (NFSRPC) {
.co = qemu_coroutine_self(),
diff --git a/block/nvme.c b/block/nvme.c
index 01fb28a..2b24f95 100644
--- a/block/nvme.c
+++ b/block/nvme.c
@@ -293,34 +293,42 @@ static void nvme_kick(NVMeQueuePair *q)
q->need_kick = 0;
}
-/* Find a free request element if any, otherwise:
- * a) if in coroutine context, try to wait for one to become available;
- * b) if not in coroutine, return NULL;
- */
-static NVMeRequest *nvme_get_free_req(NVMeQueuePair *q)
+static NVMeRequest *nvme_get_free_req_nofail_locked(NVMeQueuePair *q)
{
NVMeRequest *req;
- qemu_mutex_lock(&q->lock);
-
- while (q->free_req_head == -1) {
- if (qemu_in_coroutine()) {
- trace_nvme_free_req_queue_wait(q->s, q->index);
- qemu_co_queue_wait(&q->free_req_queue, &q->lock);
- } else {
- qemu_mutex_unlock(&q->lock);
- return NULL;
- }
- }
-
req = &q->reqs[q->free_req_head];
q->free_req_head = req->free_req_next;
req->free_req_next = -1;
-
- qemu_mutex_unlock(&q->lock);
return req;
}
+/* Return a free request element if any, otherwise return NULL. */
+static NVMeRequest *nvme_get_free_req_nowait(NVMeQueuePair *q)
+{
+ QEMU_LOCK_GUARD(&q->lock);
+ if (q->free_req_head == -1) {
+ return NULL;
+ }
+ return nvme_get_free_req_nofail_locked(q);
+}
+
+/*
+ * Wait for a free request to become available if necessary, then
+ * return it.
+ */
+static coroutine_fn NVMeRequest *nvme_get_free_req(NVMeQueuePair *q)
+{
+ QEMU_LOCK_GUARD(&q->lock);
+
+ while (q->free_req_head == -1) {
+ trace_nvme_free_req_queue_wait(q->s, q->index);
+ qemu_co_queue_wait(&q->free_req_queue, &q->lock);
+ }
+
+ return nvme_get_free_req_nofail_locked(q);
+}
+
/* With q->lock */
static void nvme_put_free_req_locked(NVMeQueuePair *q, NVMeRequest *req)
{
@@ -506,7 +514,7 @@ static int nvme_admin_cmd_sync(BlockDriverState *bs, NvmeCmd *cmd)
AioContext *aio_context = bdrv_get_aio_context(bs);
NVMeRequest *req;
int ret = -EINPROGRESS;
- req = nvme_get_free_req(q);
+ req = nvme_get_free_req_nowait(q);
if (!req) {
return -EBUSY;
}
@@ -1234,8 +1242,10 @@ static inline bool nvme_qiov_aligned(BlockDriverState *bs,
return true;
}
-static int nvme_co_prw(BlockDriverState *bs, uint64_t offset, uint64_t bytes,
- QEMUIOVector *qiov, bool is_write, int flags)
+static coroutine_fn int nvme_co_prw(BlockDriverState *bs,
+ uint64_t offset, uint64_t bytes,
+ QEMUIOVector *qiov, bool is_write,
+ int flags)
{
BDRVNVMeState *s = bs->opaque;
int r;
diff --git a/block/parallels.c b/block/parallels.c
index a229c06..c1523e7 100644
--- a/block/parallels.c
+++ b/block/parallels.c
@@ -165,8 +165,9 @@ static int64_t block_status(BDRVParallelsState *s, int64_t sector_num,
return start_off;
}
-static int64_t allocate_clusters(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors, int *pnum)
+static coroutine_fn int64_t allocate_clusters(BlockDriverState *bs,
+ int64_t sector_num,
+ int nb_sectors, int *pnum)
{
int ret = 0;
BDRVParallelsState *s = bs->opaque;
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index fd32316..0f29395 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -884,7 +884,7 @@ int qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
return 0;
}
-static int perform_cow(BlockDriverState *bs, QCowL2Meta *m)
+static int coroutine_fn perform_cow(BlockDriverState *bs, QCowL2Meta *m)
{
BDRVQcow2State *s = bs->opaque;
Qcow2COWRegion *start = &m->cow_start;
@@ -1024,7 +1024,8 @@ fail:
return ret;
}
-int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
+int coroutine_fn qcow2_alloc_cluster_link_l2(BlockDriverState *bs,
+ QCowL2Meta *m)
{
BDRVQcow2State *s = bs->opaque;
int i, j = 0, l2_index, ret;
@@ -1397,8 +1398,9 @@ static int count_single_write_clusters(BlockDriverState *bs, int nb_clusters,
* information on cluster allocation may be invalid now. The caller
* must start over anyway, so consider *cur_bytes undefined.
*/
-static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
- uint64_t *cur_bytes, QCowL2Meta **m)
+static int coroutine_fn handle_dependencies(BlockDriverState *bs,
+ uint64_t guest_offset,
+ uint64_t *cur_bytes, QCowL2Meta **m)
{
BDRVQcow2State *s = bs->opaque;
QCowL2Meta *old_alloc;
@@ -1772,9 +1774,10 @@ out:
*
* Return 0 on success and -errno in error cases
*/
-int qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
- unsigned int *bytes, uint64_t *host_offset,
- QCowL2Meta **m)
+int coroutine_fn qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
+ unsigned int *bytes,
+ uint64_t *host_offset,
+ QCowL2Meta **m)
{
BDRVQcow2State *s = bs->opaque;
uint64_t start, remaining;
@@ -2105,8 +2108,8 @@ out:
return ret;
}
-int qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, int flags)
+int coroutine_fn qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, int flags)
{
BDRVQcow2State *s = bs->opaque;
uint64_t end_offset = offset + bytes;
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index c4d9981..1fbb07c 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -1206,7 +1206,7 @@ void qcow2_free_any_cluster(BlockDriverState *bs, uint64_t l2_entry,
}
}
-int coroutine_fn qcow2_write_caches(BlockDriverState *bs)
+int qcow2_write_caches(BlockDriverState *bs)
{
BDRVQcow2State *s = bs->opaque;
int ret;
@@ -1226,7 +1226,7 @@ int coroutine_fn qcow2_write_caches(BlockDriverState *bs)
return 0;
}
-int coroutine_fn qcow2_flush_caches(BlockDriverState *bs)
+int qcow2_flush_caches(BlockDriverState *bs)
{
int ret = qcow2_write_caches(bs);
if (ret < 0) {
@@ -3706,7 +3706,7 @@ int64_t qcow2_get_last_cluster(BlockDriverState *bs, int64_t size)
return -EIO;
}
-int qcow2_detect_metadata_preallocation(BlockDriverState *bs)
+int coroutine_fn qcow2_detect_metadata_preallocation(BlockDriverState *bs)
{
BDRVQcow2State *s = bs->opaque;
int64_t i, end_cluster, cluster_count = 0, threshold;
diff --git a/block/qcow2.c b/block/qcow2.c
index 6c8c8b2..b57f7cc 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -2448,7 +2448,7 @@ static bool merge_cow(uint64_t offset, unsigned bytes,
* Return 1 if the COW regions read as zeroes, 0 if not, < 0 on error.
* Note that returning 0 does not guarantee non-zero data.
*/
-static int is_zero_cow(BlockDriverState *bs, QCowL2Meta *m)
+static int coroutine_fn is_zero_cow(BlockDriverState *bs, QCowL2Meta *m)
{
/*
* This check is designed for optimization shortcut so it must be
@@ -2466,7 +2466,8 @@ static int is_zero_cow(BlockDriverState *bs, QCowL2Meta *m)
m->cow_end.nb_bytes);
}
-static int handle_alloc_space(BlockDriverState *bs, QCowL2Meta *l2meta)
+static int coroutine_fn handle_alloc_space(BlockDriverState *bs,
+ QCowL2Meta *l2meta)
{
BDRVQcow2State *s = bs->opaque;
QCowL2Meta *m;
diff --git a/block/qcow2.h b/block/qcow2.h
index ba436a8..3e7c5e8 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -874,8 +874,8 @@ void qcow2_free_any_cluster(BlockDriverState *bs, uint64_t l2_entry,
int qcow2_update_snapshot_refcount(BlockDriverState *bs,
int64_t l1_table_offset, int l1_size, int addend);
-int coroutine_fn qcow2_flush_caches(BlockDriverState *bs);
-int coroutine_fn qcow2_write_caches(BlockDriverState *bs);
+int qcow2_flush_caches(BlockDriverState *bs);
+int qcow2_write_caches(BlockDriverState *bs);
int qcow2_check_refcounts(BlockDriverState *bs, BdrvCheckResult *res,
BdrvCheckMode fix);
@@ -895,7 +895,7 @@ int qcow2_change_refcount_order(BlockDriverState *bs, int refcount_order,
void *cb_opaque, Error **errp);
int qcow2_shrink_reftable(BlockDriverState *bs);
int64_t qcow2_get_last_cluster(BlockDriverState *bs, int64_t size);
-int qcow2_detect_metadata_preallocation(BlockDriverState *bs);
+int coroutine_fn qcow2_detect_metadata_preallocation(BlockDriverState *bs);
/* qcow2-cluster.c functions */
int qcow2_grow_l1_table(BlockDriverState *bs, uint64_t min_size,
@@ -908,9 +908,9 @@ int qcow2_encrypt_sectors(BDRVQcow2State *s, int64_t sector_num,
int qcow2_get_host_offset(BlockDriverState *bs, uint64_t offset,
unsigned int *bytes, uint64_t *host_offset,
QCow2SubclusterType *subcluster_type);
-int qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
- unsigned int *bytes, uint64_t *host_offset,
- QCowL2Meta **m);
+int coroutine_fn qcow2_alloc_host_offset(BlockDriverState *bs, uint64_t offset,
+ unsigned int *bytes,
+ uint64_t *host_offset, QCowL2Meta **m);
int qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
uint64_t offset,
int compressed_size,
@@ -918,13 +918,14 @@ int qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
void qcow2_parse_compressed_l2_entry(BlockDriverState *bs, uint64_t l2_entry,
uint64_t *coffset, int *csize);
-int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m);
+int coroutine_fn qcow2_alloc_cluster_link_l2(BlockDriverState *bs,
+ QCowL2Meta *m);
void qcow2_alloc_cluster_abort(BlockDriverState *bs, QCowL2Meta *m);
int qcow2_cluster_discard(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, enum qcow2_discard_type type,
bool full_discard);
-int qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, int flags);
+int coroutine_fn qcow2_subcluster_zeroize(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, int flags);
int qcow2_expand_zero_clusters(BlockDriverState *bs,
BlockDriverAmendStatusCB *status_cb,
diff --git a/block/qed.c b/block/qed.c
index 324ca0e..bda00e6 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -254,7 +254,7 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
return l2_table;
}
-static bool qed_plug_allocating_write_reqs(BDRVQEDState *s)
+static bool coroutine_fn qed_plug_allocating_write_reqs(BDRVQEDState *s)
{
qemu_co_mutex_lock(&s->table_lock);
@@ -273,7 +273,7 @@ static bool qed_plug_allocating_write_reqs(BDRVQEDState *s)
return true;
}
-static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
+static void coroutine_fn qed_unplug_allocating_write_reqs(BDRVQEDState *s)
{
qemu_co_mutex_lock(&s->table_lock);
assert(s->allocating_write_reqs_plugged);
diff --git a/block/quorum.c b/block/quorum.c
index f33f30d..f9e6539 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -161,11 +161,10 @@ static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
return a->l == b->l;
}
-static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
- QEMUIOVector *qiov,
- uint64_t offset,
- uint64_t bytes,
- int flags)
+static QuorumAIOCB *coroutine_fn quorum_aio_get(BlockDriverState *bs,
+ QEMUIOVector *qiov,
+ uint64_t offset, uint64_t bytes,
+ int flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
@@ -233,8 +232,6 @@ static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
return false;
}
-static int read_fifo_child(QuorumAIOCB *acb);
-
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
{
int i;
@@ -273,7 +270,7 @@ static void quorum_report_bad_versions(BDRVQuorumState *s,
}
}
-static void quorum_rewrite_entry(void *opaque)
+static void coroutine_fn quorum_rewrite_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
@@ -574,7 +571,7 @@ free_exit:
quorum_free_vote_list(&acb->votes);
}
-static void read_quorum_children_entry(void *opaque)
+static void coroutine_fn read_quorum_children_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
@@ -602,7 +599,7 @@ static void read_quorum_children_entry(void *opaque)
}
}
-static int read_quorum_children(QuorumAIOCB *acb)
+static int coroutine_fn read_quorum_children(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
int i;
@@ -643,7 +640,7 @@ static int read_quorum_children(QuorumAIOCB *acb)
return acb->vote_ret;
}
-static int read_fifo_child(QuorumAIOCB *acb)
+static int coroutine_fn read_fifo_child(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
int n, ret;
@@ -664,8 +661,10 @@ static int read_fifo_child(QuorumAIOCB *acb)
return ret;
}
-static int quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
- QEMUIOVector *qiov, BdrvRequestFlags flags)
+static int coroutine_fn quorum_co_preadv(BlockDriverState *bs,
+ int64_t offset, int64_t bytes,
+ QEMUIOVector *qiov,
+ BdrvRequestFlags flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
@@ -684,7 +683,7 @@ static int quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
return ret;
}
-static void write_quorum_entry(void *opaque)
+static void coroutine_fn write_quorum_entry(void *opaque)
{
QuorumCo *co = opaque;
QuorumAIOCB *acb = co->acb;
@@ -715,9 +714,9 @@ static void write_quorum_entry(void *opaque)
}
}
-static int quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
- int64_t bytes, QEMUIOVector *qiov,
- BdrvRequestFlags flags)
+static int coroutine_fn quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
+ int64_t bytes, QEMUIOVector *qiov,
+ BdrvRequestFlags flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
@@ -746,8 +745,9 @@ static int quorum_co_pwritev(BlockDriverState *bs, int64_t offset,
return ret;
}
-static int quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
- int64_t bytes, BdrvRequestFlags flags)
+static int coroutine_fn quorum_co_pwrite_zeroes(BlockDriverState *bs,
+ int64_t offset, int64_t bytes,
+ BdrvRequestFlags flags)
{
return quorum_co_pwritev(bs, offset, bytes, NULL,
diff --git a/block/raw-format.c b/block/raw-format.c
index c7278e3..f337ac7 100644
--- a/block/raw-format.c
+++ b/block/raw-format.c
@@ -411,7 +411,8 @@ static void raw_lock_medium(BlockDriverState *bs, bool locked)
bdrv_lock_medium(bs->file->bs, locked);
}
-static int raw_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
+static int coroutine_fn raw_co_ioctl(BlockDriverState *bs,
+ unsigned long int req, void *buf)
{
BDRVRawState *s = bs->opaque;
if (s->offset || s->has_size) {
diff --git a/block/replication.c b/block/replication.c
index 55c8f89..c67f931 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -142,6 +142,7 @@ static void replication_close(BlockDriverState *bs)
{
BDRVReplicationState *s = bs->opaque;
Job *commit_job;
+ GLOBAL_STATE_CODE();
if (s->stage == BLOCK_REPLICATION_RUNNING) {
replication_stop(s->rs, false, NULL);
@@ -726,7 +727,9 @@ static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
* disk, secondary disk in backup_job_completed().
*/
if (s->backup_job) {
+ aio_context_release(aio_context);
job_cancel_sync(&s->backup_job->job, true);
+ aio_context_acquire(aio_context);
}
if (!failover) {
diff --git a/block/throttle.c b/block/throttle.c
index 6e8d52f..ddd4505 100644
--- a/block/throttle.c
+++ b/block/throttle.c
@@ -162,7 +162,7 @@ static int coroutine_fn throttle_co_pwritev_compressed(BlockDriverState *bs,
BDRV_REQ_WRITE_COMPRESSED);
}
-static int throttle_co_flush(BlockDriverState *bs)
+static int coroutine_fn throttle_co_flush(BlockDriverState *bs)
{
return bdrv_co_flush(bs->file->bs);
}
diff --git a/block/vmdk.c b/block/vmdk.c
index fe07a54..f7d8856 100644
--- a/block/vmdk.c
+++ b/block/vmdk.c
@@ -1787,10 +1787,11 @@ static int coroutine_fn vmdk_co_block_status(BlockDriverState *bs,
return ret;
}
-static int vmdk_write_extent(VmdkExtent *extent, int64_t cluster_offset,
- int64_t offset_in_cluster, QEMUIOVector *qiov,
- uint64_t qiov_offset, uint64_t n_bytes,
- uint64_t offset)
+static int coroutine_fn
+vmdk_write_extent(VmdkExtent *extent, int64_t cluster_offset,
+ int64_t offset_in_cluster, QEMUIOVector *qiov,
+ uint64_t qiov_offset, uint64_t n_bytes,
+ uint64_t offset)
{
int ret;
VmdkGrainMarker *data = NULL;
@@ -1868,9 +1869,10 @@ static int vmdk_write_extent(VmdkExtent *extent, int64_t cluster_offset,
return ret;
}
-static int vmdk_read_extent(VmdkExtent *extent, int64_t cluster_offset,
- int64_t offset_in_cluster, QEMUIOVector *qiov,
- int bytes)
+static int coroutine_fn
+vmdk_read_extent(VmdkExtent *extent, int64_t cluster_offset,
+ int64_t offset_in_cluster, QEMUIOVector *qiov,
+ int bytes)
{
int ret;
int cluster_bytes, buf_bytes;
@@ -2015,9 +2017,9 @@ fail:
*
* Returns: error code with 0 for success.
*/
-static int vmdk_pwritev(BlockDriverState *bs, uint64_t offset,
- uint64_t bytes, QEMUIOVector *qiov,
- bool zeroed, bool zero_dry_run)
+static int coroutine_fn vmdk_pwritev(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov,
+ bool zeroed, bool zero_dry_run)
{
BDRVVmdkState *s = bs->opaque;
VmdkExtent *extent = NULL;
diff --git a/blockdev.c b/blockdev.c
index 392d947..a32bafc 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -150,14 +150,12 @@ void blockdev_mark_auto_del(BlockBackend *blk)
return;
}
- for (job = block_job_next(NULL); job; job = block_job_next(job)) {
- if (block_job_has_bdrv(job, blk_bs(blk))) {
- AioContext *aio_context = job->job.aio_context;
- aio_context_acquire(aio_context);
-
- job_cancel(&job->job, false);
+ JOB_LOCK_GUARD();
- aio_context_release(aio_context);
+ for (job = block_job_next_locked(NULL); job;
+ job = block_job_next_locked(job)) {
+ if (block_job_has_bdrv(job, blk_bs(blk))) {
+ job_cancel_locked(&job->job, false);
}
}
@@ -1844,14 +1842,7 @@ static void drive_backup_abort(BlkActionState *common)
DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common);
if (state->job) {
- AioContext *aio_context;
-
- aio_context = bdrv_get_aio_context(state->bs);
- aio_context_acquire(aio_context);
-
job_cancel_sync(&state->job->job, true);
-
- aio_context_release(aio_context);
}
}
@@ -1945,14 +1936,7 @@ static void blockdev_backup_abort(BlkActionState *common)
BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common);
if (state->job) {
- AioContext *aio_context;
-
- aio_context = bdrv_get_aio_context(state->bs);
- aio_context_acquire(aio_context);
-
job_cancel_sync(&state->job->job, true);
-
- aio_context_release(aio_context);
}
}
@@ -3313,17 +3297,16 @@ out:
aio_context_release(aio_context);
}
-/* Get a block job using its ID and acquire its AioContext */
-static BlockJob *find_block_job(const char *id, AioContext **aio_context,
- Error **errp)
+/*
+ * Get a block job using its ID. Called with job_mutex held.
+ */
+static BlockJob *find_block_job_locked(const char *id, Error **errp)
{
BlockJob *job;
assert(id != NULL);
- *aio_context = NULL;
-
- job = block_job_get(id);
+ job = block_job_get_locked(id);
if (!job) {
error_set(errp, ERROR_CLASS_DEVICE_NOT_ACTIVE,
@@ -3331,30 +3314,30 @@ static BlockJob *find_block_job(const char *id, AioContext **aio_context,
return NULL;
}
- *aio_context = block_job_get_aio_context(job);
- aio_context_acquire(*aio_context);
-
return job;
}
void qmp_block_job_set_speed(const char *device, int64_t speed, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job;
+
+ JOB_LOCK_GUARD();
+ job = find_block_job_locked(device, errp);
if (!job) {
return;
}
- block_job_set_speed(job, speed, errp);
- aio_context_release(aio_context);
+ block_job_set_speed_locked(job, speed, errp);
}
void qmp_block_job_cancel(const char *device,
bool has_force, bool force, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job;
+
+ JOB_LOCK_GUARD();
+ job = find_block_job_locked(device, errp);
if (!job) {
return;
@@ -3364,97 +3347,94 @@ void qmp_block_job_cancel(const char *device,
force = false;
}
- if (job_user_paused(&job->job) && !force) {
+ if (job_user_paused_locked(&job->job) && !force) {
error_setg(errp, "The block job for device '%s' is currently paused",
device);
- goto out;
+ return;
}
trace_qmp_block_job_cancel(job);
- job_user_cancel(&job->job, force, errp);
-out:
- aio_context_release(aio_context);
+ job_user_cancel_locked(&job->job, force, errp);
}
void qmp_block_job_pause(const char *device, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job;
+
+ JOB_LOCK_GUARD();
+ job = find_block_job_locked(device, errp);
if (!job) {
return;
}
trace_qmp_block_job_pause(job);
- job_user_pause(&job->job, errp);
- aio_context_release(aio_context);
+ job_user_pause_locked(&job->job, errp);
}
void qmp_block_job_resume(const char *device, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job;
+
+ JOB_LOCK_GUARD();
+ job = find_block_job_locked(device, errp);
if (!job) {
return;
}
trace_qmp_block_job_resume(job);
- job_user_resume(&job->job, errp);
- aio_context_release(aio_context);
+ job_user_resume_locked(&job->job, errp);
}
void qmp_block_job_complete(const char *device, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(device, &aio_context, errp);
+ BlockJob *job;
+
+ JOB_LOCK_GUARD();
+ job = find_block_job_locked(device, errp);
if (!job) {
return;
}
trace_qmp_block_job_complete(job);
- job_complete(&job->job, errp);
- aio_context_release(aio_context);
+ job_complete_locked(&job->job, errp);
}
void qmp_block_job_finalize(const char *id, Error **errp)
{
- AioContext *aio_context;
- BlockJob *job = find_block_job(id, &aio_context, errp);
+ BlockJob *job;
+
+ JOB_LOCK_GUARD();
+ job = find_block_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_block_job_finalize(job);
- job_ref(&job->job);
- job_finalize(&job->job, errp);
+ job_ref_locked(&job->job);
+ job_finalize_locked(&job->job, errp);
- /*
- * Job's context might have changed via job_finalize (and job_txn_apply
- * automatically acquires the new one), so make sure we release the correct
- * one.
- */
- aio_context = block_job_get_aio_context(job);
- job_unref(&job->job);
- aio_context_release(aio_context);
+ job_unref_locked(&job->job);
}
void qmp_block_job_dismiss(const char *id, Error **errp)
{
- AioContext *aio_context;
- BlockJob *bjob = find_block_job(id, &aio_context, errp);
+ BlockJob *bjob;
Job *job;
+ JOB_LOCK_GUARD();
+ bjob = find_block_job_locked(id, errp);
+
if (!bjob) {
return;
}
trace_qmp_block_job_dismiss(bjob);
job = &bjob->job;
- job_dismiss(&job, errp);
- aio_context_release(aio_context);
+ job_dismiss_locked(&job, errp);
}
void qmp_change_backing_file(const char *device,
@@ -3731,17 +3711,16 @@ BlockJobInfoList *qmp_query_block_jobs(Error **errp)
BlockJobInfoList *head = NULL, **tail = &head;
BlockJob *job;
- for (job = block_job_next(NULL); job; job = block_job_next(job)) {
+ JOB_LOCK_GUARD();
+
+ for (job = block_job_next_locked(NULL); job;
+ job = block_job_next_locked(job)) {
BlockJobInfo *value;
- AioContext *aio_context;
if (block_job_is_internal(job)) {
continue;
}
- aio_context = block_job_get_aio_context(job);
- aio_context_acquire(aio_context);
- value = block_job_query(job, errp);
- aio_context_release(aio_context);
+ value = block_job_query_locked(job, errp);
if (!value) {
qapi_free_BlockJobInfoList(head);
return NULL;
diff --git a/blockjob.c b/blockjob.c
index 4868453..bdf20a0 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -36,21 +36,6 @@
#include "qemu/main-loop.h"
#include "qemu/timer.h"
-/*
- * The block job API is composed of two categories of functions.
- *
- * The first includes functions used by the monitor. The monitor is
- * peculiar in that it accesses the block job list with block_job_get, and
- * therefore needs consistency across block_job_get and the actual operation
- * (e.g. block_job_set_speed). The consistency is achieved with
- * aio_context_acquire/release. These functions are declared in blockjob.h.
- *
- * The second includes functions used by the block job drivers and sometimes
- * by the core block layer. These do not care about locking, because the
- * whole coroutine runs under the AioContext lock, and are declared in
- * blockjob_int.h.
- */
-
static bool is_block_job(Job *job)
{
return job_type(job) == JOB_TYPE_BACKUP ||
@@ -59,21 +44,21 @@ static bool is_block_job(Job *job)
job_type(job) == JOB_TYPE_STREAM;
}
-BlockJob *block_job_next(BlockJob *bjob)
+BlockJob *block_job_next_locked(BlockJob *bjob)
{
Job *job = bjob ? &bjob->job : NULL;
GLOBAL_STATE_CODE();
do {
- job = job_next(job);
+ job = job_next_locked(job);
} while (job && !is_block_job(job));
return job ? container_of(job, BlockJob, job) : NULL;
}
-BlockJob *block_job_get(const char *id)
+BlockJob *block_job_get_locked(const char *id)
{
- Job *job = job_get(id);
+ Job *job = job_get_locked(id);
GLOBAL_STATE_CODE();
if (job && is_block_job(job)) {
@@ -83,6 +68,12 @@ BlockJob *block_job_get(const char *id)
}
}
+BlockJob *block_job_get(const char *id)
+{
+ JOB_LOCK_GUARD();
+ return block_job_get_locked(id);
+}
+
void block_job_free(Job *job)
{
BlockJob *bjob = container_of(job, BlockJob, job);
@@ -114,8 +105,10 @@ static bool child_job_drained_poll(BdrvChild *c)
/* An inactive or completed job doesn't have any pending requests. Jobs
* with !job->busy are either already paused or have a pause point after
* being reentered, so no job driver code will run before they pause. */
- if (!job->busy || job_is_completed(job)) {
- return false;
+ WITH_JOB_LOCK_GUARD() {
+ if (!job->busy || job_is_completed_locked(job)) {
+ return false;
+ }
}
/* Otherwise, assume that it isn't fully stopped yet, but allow the job to
@@ -163,12 +156,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
}
- job->job.aio_context = ctx;
+ job_set_aio_context(&job->job, ctx);
}
static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
{
BlockJob *job = c->opaque;
+ GLOBAL_STATE_CODE();
return job->job.aio_context;
}
@@ -250,7 +244,8 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
return 0;
}
-static void block_job_on_idle(Notifier *n, void *opaque)
+/* Called with job_mutex lock held. */
+static void block_job_on_idle_locked(Notifier *n, void *opaque)
{
aio_wait_kick();
}
@@ -271,14 +266,14 @@ static bool job_timer_pending(Job *job)
return timer_pending(&job->sleep_timer);
}
-bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
+bool block_job_set_speed_locked(BlockJob *job, int64_t speed, Error **errp)
{
const BlockJobDriver *drv = block_job_driver(job);
int64_t old_speed = job->speed;
GLOBAL_STATE_CODE();
- if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp) < 0) {
+ if (job_apply_verb_locked(&job->job, JOB_VERB_SET_SPEED, errp) < 0) {
return false;
}
if (speed < 0) {
@@ -292,7 +287,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
job->speed = speed;
if (drv->set_speed) {
+ job_unlock();
drv->set_speed(job, speed);
+ job_lock();
}
if (speed && speed <= old_speed) {
@@ -300,18 +297,24 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
}
/* kick only if a timer is pending */
- job_enter_cond(&job->job, job_timer_pending);
+ job_enter_cond_locked(&job->job, job_timer_pending);
return true;
}
+static bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
+{
+ JOB_LOCK_GUARD();
+ return block_job_set_speed_locked(job, speed, errp);
+}
+
int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
IO_CODE();
return ratelimit_calculate_delay(&job->limit, n);
}
-BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
+BlockJobInfo *block_job_query_locked(BlockJob *job, Error **errp)
{
BlockJobInfo *info;
uint64_t progress_current, progress_total;
@@ -329,13 +332,13 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
info = g_new0(BlockJobInfo, 1);
info->type = g_strdup(job_type_str(&job->job));
info->device = g_strdup(job->job.id);
- info->busy = qatomic_read(&job->job.busy);
+ info->busy = job->job.busy;
info->paused = job->job.pause_count > 0;
info->offset = progress_current;
info->len = progress_total;
info->speed = job->speed;
info->io_status = job->iostatus;
- info->ready = job_is_ready(&job->job),
+ info->ready = job_is_ready_locked(&job->job),
info->status = job->job.status;
info->auto_finalize = job->job.auto_finalize;
info->auto_dismiss = job->job.auto_dismiss;
@@ -348,7 +351,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
return info;
}
-static void block_job_iostatus_set_err(BlockJob *job, int error)
+/* Called with job lock held */
+static void block_job_iostatus_set_err_locked(BlockJob *job, int error)
{
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
@@ -356,7 +360,8 @@ static void block_job_iostatus_set_err(BlockJob *job, int error)
}
}
-static void block_job_event_cancelled(Notifier *n, void *opaque)
+/* Called with job_mutex lock held. */
+static void block_job_event_cancelled_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
uint64_t progress_current, progress_total;
@@ -375,7 +380,8 @@ static void block_job_event_cancelled(Notifier *n, void *opaque)
job->speed);
}
-static void block_job_event_completed(Notifier *n, void *opaque)
+/* Called with job_mutex lock held. */
+static void block_job_event_completed_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
const char *msg = NULL;
@@ -401,7 +407,8 @@ static void block_job_event_completed(Notifier *n, void *opaque)
msg);
}
-static void block_job_event_pending(Notifier *n, void *opaque)
+/* Called with job_mutex lock held. */
+static void block_job_event_pending_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
@@ -413,7 +420,8 @@ static void block_job_event_pending(Notifier *n, void *opaque)
job->job.id);
}
-static void block_job_event_ready(Notifier *n, void *opaque)
+/* Called with job_mutex lock held. */
+static void block_job_event_ready_locked(Notifier *n, void *opaque)
{
BlockJob *job = opaque;
uint64_t progress_current, progress_total;
@@ -433,11 +441,6 @@ static void block_job_event_ready(Notifier *n, void *opaque)
}
-/*
- * API for block job drivers and the block layer. These functions are
- * declared in blockjob_int.h.
- */
-
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
JobTxn *txn, BlockDriverState *bs, uint64_t perm,
uint64_t shared_perm, int64_t speed, int flags,
@@ -463,19 +466,21 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
ratelimit_init(&job->limit);
- job->finalize_cancelled_notifier.notify = block_job_event_cancelled;
- job->finalize_completed_notifier.notify = block_job_event_completed;
- job->pending_notifier.notify = block_job_event_pending;
- job->ready_notifier.notify = block_job_event_ready;
- job->idle_notifier.notify = block_job_on_idle;
-
- notifier_list_add(&job->job.on_finalize_cancelled,
- &job->finalize_cancelled_notifier);
- notifier_list_add(&job->job.on_finalize_completed,
- &job->finalize_completed_notifier);
- notifier_list_add(&job->job.on_pending, &job->pending_notifier);
- notifier_list_add(&job->job.on_ready, &job->ready_notifier);
- notifier_list_add(&job->job.on_idle, &job->idle_notifier);
+ job->finalize_cancelled_notifier.notify = block_job_event_cancelled_locked;
+ job->finalize_completed_notifier.notify = block_job_event_completed_locked;
+ job->pending_notifier.notify = block_job_event_pending_locked;
+ job->ready_notifier.notify = block_job_event_ready_locked;
+ job->idle_notifier.notify = block_job_on_idle_locked;
+
+ WITH_JOB_LOCK_GUARD() {
+ notifier_list_add(&job->job.on_finalize_cancelled,
+ &job->finalize_cancelled_notifier);
+ notifier_list_add(&job->job.on_finalize_completed,
+ &job->finalize_completed_notifier);
+ notifier_list_add(&job->job.on_pending, &job->pending_notifier);
+ notifier_list_add(&job->job.on_ready, &job->ready_notifier);
+ notifier_list_add(&job->job.on_idle, &job->idle_notifier);
+ }
error_setg(&job->blocker, "block device is in use by block job: %s",
job_type_str(&job->job));
@@ -498,7 +503,7 @@ fail:
return NULL;
}
-void block_job_iostatus_reset(BlockJob *job)
+void block_job_iostatus_reset_locked(BlockJob *job)
{
GLOBAL_STATE_CODE();
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
@@ -508,6 +513,12 @@ void block_job_iostatus_reset(BlockJob *job)
job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}
+static void block_job_iostatus_reset(BlockJob *job)
+{
+ JOB_LOCK_GUARD();
+ block_job_iostatus_reset_locked(job);
+}
+
void block_job_user_resume(Job *job)
{
BlockJob *bjob = container_of(job, BlockJob, job);
@@ -546,12 +557,17 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
action);
}
if (action == BLOCK_ERROR_ACTION_STOP) {
- if (!job->job.user_paused) {
- job_pause(&job->job);
- /* make the pause user visible, which will be resumed from QMP. */
- job->job.user_paused = true;
+ WITH_JOB_LOCK_GUARD() {
+ if (!job->job.user_paused) {
+ job_pause_locked(&job->job);
+ /*
+ * make the pause user visible, which will be
+ * resumed from QMP.
+ */
+ job->job.user_paused = true;
+ }
+ block_job_iostatus_set_err_locked(job, error);
}
- block_job_iostatus_set_err(job, error);
}
return action;
}
diff --git a/hw/9pfs/9p.h b/hw/9pfs/9p.h
index 994f952..a523ac3 100644
--- a/hw/9pfs/9p.h
+++ b/hw/9pfs/9p.h
@@ -424,21 +424,24 @@ typedef struct V9fsGetlock
extern int open_fd_hw;
extern int total_open_fd;
-static inline void v9fs_path_write_lock(V9fsState *s)
+static inline void coroutine_fn
+v9fs_path_write_lock(V9fsState *s)
{
if (s->ctx.export_flags & V9FS_PATHNAME_FSCONTEXT) {
qemu_co_rwlock_wrlock(&s->rename_lock);
}
}
-static inline void v9fs_path_read_lock(V9fsState *s)
+static inline void coroutine_fn
+v9fs_path_read_lock(V9fsState *s)
{
if (s->ctx.export_flags & V9FS_PATHNAME_FSCONTEXT) {
qemu_co_rwlock_rdlock(&s->rename_lock);
}
}
-static inline void v9fs_path_unlock(V9fsState *s)
+static inline void coroutine_fn
+v9fs_path_unlock(V9fsState *s)
{
if (s->ctx.export_flags & V9FS_PATHNAME_FSCONTEXT) {
qemu_co_rwlock_unlock(&s->rename_lock);
diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
index 54840f8..dd9a7f6 100644
--- a/include/block/aio-wait.h
+++ b/include/block/aio-wait.h
@@ -59,10 +59,13 @@ typedef struct {
extern AioWait global_aio_wait;
/**
- * AIO_WAIT_WHILE:
+ * AIO_WAIT_WHILE_INTERNAL:
* @ctx: the aio context, or NULL if multiple aio contexts (for which the
* caller does not hold a lock) are involved in the polling condition.
* @cond: wait while this conditional expression is true
+ * @unlock: whether to unlock and then lock again @ctx. This apples
+ * only when waiting for another AioContext from the main loop.
+ * Otherwise it's ignored.
*
* Wait while a condition is true. Use this to implement synchronous
* operations that require event loop activity.
@@ -75,7 +78,7 @@ extern AioWait global_aio_wait;
* wait on conditions between two IOThreads since that could lead to deadlock,
* go via the main loop instead.
*/
-#define AIO_WAIT_WHILE(ctx, cond) ({ \
+#define AIO_WAIT_WHILE_INTERNAL(ctx, cond, unlock) ({ \
bool waited_ = false; \
AioWait *wait_ = &global_aio_wait; \
AioContext *ctx_ = (ctx); \
@@ -92,11 +95,11 @@ extern AioWait global_aio_wait;
assert(qemu_get_current_aio_context() == \
qemu_get_aio_context()); \
while ((cond)) { \
- if (ctx_) { \
+ if (unlock && ctx_) { \
aio_context_release(ctx_); \
} \
aio_poll(qemu_get_aio_context(), true); \
- if (ctx_) { \
+ if (unlock && ctx_) { \
aio_context_acquire(ctx_); \
} \
waited_ = true; \
@@ -105,6 +108,12 @@ extern AioWait global_aio_wait;
qatomic_dec(&wait_->num_waiters); \
waited_; })
+#define AIO_WAIT_WHILE(ctx, cond) \
+ AIO_WAIT_WHILE_INTERNAL(ctx, cond, true)
+
+#define AIO_WAIT_WHILE_UNLOCKED(ctx, cond) \
+ AIO_WAIT_WHILE_INTERNAL(ctx, cond, false)
+
/**
* aio_wait_kick:
* Wake up the main thread if it is waiting on AIO_WAIT_WHILE(). During
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 6525e16..03032b2 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -40,21 +40,38 @@ typedef struct BlockJobDriver BlockJobDriver;
* Long-running operation on a BlockDriverState.
*/
typedef struct BlockJob {
- /** Data belonging to the generic Job infrastructure */
+ /**
+ * Data belonging to the generic Job infrastructure.
+ * Protected by job mutex.
+ */
Job job;
- /** Status that is published by the query-block-jobs QMP API */
+ /**
+ * Status that is published by the query-block-jobs QMP API.
+ * Protected by job mutex.
+ */
BlockDeviceIoStatus iostatus;
- /** Speed that was set with @block_job_set_speed. */
+ /**
+ * Speed that was set with @block_job_set_speed.
+ * Always modified and read under QEMU global mutex (GLOBAL_STATE_CODE).
+ */
int64_t speed;
- /** Rate limiting data structure for implementing @speed. */
+ /**
+ * Rate limiting data structure for implementing @speed.
+ * RateLimit API is thread-safe.
+ */
RateLimit limit;
- /** Block other operations when block job is running */
+ /**
+ * Block other operations when block job is running.
+ * Always modified and read under QEMU global mutex (GLOBAL_STATE_CODE).
+ */
Error *blocker;
+ /** All notifiers are set once in block_job_create() and never modified. */
+
/** Called when a cancelled job is finalised. */
Notifier finalize_cancelled_notifier;
@@ -70,7 +87,10 @@ typedef struct BlockJob {
/** Called when the job coroutine yields or terminates */
Notifier idle_notifier;
- /** BlockDriverStates that are involved in this block job */
+ /**
+ * BlockDriverStates that are involved in this block job.
+ * Always modified and read under QEMU global mutex (GLOBAL_STATE_CODE).
+ */
GSList *nodes;
} BlockJob;
@@ -82,15 +102,16 @@ typedef struct BlockJob {
*/
/**
- * block_job_next:
+ * block_job_next_locked:
* @job: A block job, or %NULL.
*
* Get the next element from the list of block jobs after @job, or the
* first one if @job is %NULL.
*
* Returns the requested job, or %NULL if there are no more jobs left.
+ * Called with job lock held.
*/
-BlockJob *block_job_next(BlockJob *job);
+BlockJob *block_job_next_locked(BlockJob *job);
/**
* block_job_get:
@@ -99,9 +120,13 @@ BlockJob *block_job_next(BlockJob *job);
* Get the block job identified by @id (which must not be %NULL).
*
* Returns the requested job, or %NULL if it doesn't exist.
+ * Called with job lock *not* held.
*/
BlockJob *block_job_get(const char *id);
+/* Same as block_job_get(), but called with job lock held. */
+BlockJob *block_job_get_locked(const char *id);
+
/**
* block_job_add_bdrv:
* @job: A block job
@@ -135,32 +160,38 @@ void block_job_remove_all_bdrv(BlockJob *job);
bool block_job_has_bdrv(BlockJob *job, BlockDriverState *bs);
/**
- * block_job_set_speed:
+ * block_job_set_speed_locked:
* @job: The job to set the speed for.
* @speed: The new value
* @errp: Error object.
*
* Set a rate-limiting parameter for the job; the actual meaning may
* vary depending on the job type.
+ *
+ * Called with job lock held, but might release it temporarily.
*/
-bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
+bool block_job_set_speed_locked(BlockJob *job, int64_t speed, Error **errp);
/**
- * block_job_query:
+ * block_job_query_locked:
* @job: The job to get information about.
*
* Return information about a job.
+ *
+ * Called with job lock held.
*/
-BlockJobInfo *block_job_query(BlockJob *job, Error **errp);
+BlockJobInfo *block_job_query_locked(BlockJob *job, Error **errp);
/**
- * block_job_iostatus_reset:
+ * block_job_iostatus_reset_locked:
* @job: The job whose I/O status should be reset.
*
* Reset I/O status on @job and on BlockDriverState objects it uses,
* other than job->blk.
+ *
+ * Called with job lock held.
*/
-void block_job_iostatus_reset(BlockJob *job);
+void block_job_iostatus_reset_locked(BlockJob *job);
/*
* block_job_get_aio_context:
diff --git a/include/block/nbd.h b/include/block/nbd.h
index c74b7a9..4ede3b2 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -424,6 +424,6 @@ QIOChannel *coroutine_fn
nbd_co_establish_connection(NBDClientConnection *conn, NBDExportInfo *info,
bool blocking, Error **errp);
-void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
+void nbd_co_establish_connection_cancel(NBDClientConnection *conn);
#endif
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 08c5bb3..aae33cc 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -92,12 +92,12 @@ void coroutine_fn qemu_coroutine_yield(void);
/**
* Get the AioContext of the given coroutine
*/
-AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co);
+AioContext *qemu_coroutine_get_aio_context(Coroutine *co);
/**
* Get the currently executing coroutine
*/
-Coroutine *coroutine_fn qemu_coroutine_self(void);
+Coroutine *qemu_coroutine_self(void);
/**
* Return whether or not currently inside a coroutine
diff --git a/include/qemu/job.h b/include/qemu/job.h
index c105b31..e502787 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -40,27 +40,62 @@ typedef struct JobTxn JobTxn;
* Long-running operation.
*/
typedef struct Job {
+
+ /* Fields set at initialization (job_create), and never modified */
+
/** The ID of the job. May be NULL for internal jobs. */
char *id;
- /** The type of this job. */
+ /**
+ * The type of this job.
+ * All callbacks are called with job_mutex *not* held.
+ */
const JobDriver *driver;
- /** Reference count of the block job */
- int refcnt;
-
- /** Current state; See @JobStatus for details. */
- JobStatus status;
-
- /** AioContext to run the job coroutine in */
- AioContext *aio_context;
-
/**
* The coroutine that executes the job. If not NULL, it is reentered when
* busy is false and the job is cancelled.
+ * Initialized in job_start()
*/
Coroutine *co;
+ /** True if this job should automatically finalize itself */
+ bool auto_finalize;
+
+ /** True if this job should automatically dismiss itself */
+ bool auto_dismiss;
+
+ /**
+ * The completion function that will be called when the job completes.
+ * Called with AioContext lock held, since many callback implementations
+ * use bdrv_* functions that require to hold the lock.
+ */
+ BlockCompletionFunc *cb;
+
+ /** The opaque value that is passed to the completion function. */
+ void *opaque;
+
+ /* ProgressMeter API is thread-safe */
+ ProgressMeter progress;
+
+ /**
+ * AioContext to run the job coroutine in.
+ * The job Aiocontext can be read when holding *either*
+ * the BQL (so we are in the main loop) or the job_mutex.
+ * It can only be written when we hold *both* BQL
+ * and the job_mutex.
+ */
+ AioContext *aio_context;
+
+
+ /** Protected by job_mutex */
+
+ /** Reference count of the block job */
+ int refcnt;
+
+ /** Current state; See @JobStatus for details. */
+ JobStatus status;
+
/**
* Timer that is used by @job_sleep_ns. Accessed under job_mutex (in
* job.c).
@@ -76,7 +111,7 @@ typedef struct Job {
/**
* Set to false by the job while the coroutine has yielded and may be
* re-entered by job_enter(). There may still be I/O or event loop activity
- * pending. Accessed under block_job_mutex (in blockjob.c).
+ * pending. Accessed under job_mutex.
*
* When the job is deferred to the main loop, busy is true as long as the
* bottom half is still pending.
@@ -112,14 +147,6 @@ typedef struct Job {
/** Set to true when the job has deferred work to the main loop. */
bool deferred_to_main_loop;
- /** True if this job should automatically finalize itself */
- bool auto_finalize;
-
- /** True if this job should automatically dismiss itself */
- bool auto_dismiss;
-
- ProgressMeter progress;
-
/**
* Return code from @run and/or @prepare callback(s).
* Not final until the job has reached the CONCLUDED status.
@@ -134,12 +161,6 @@ typedef struct Job {
*/
Error *err;
- /** The completion function that will be called when the job completes. */
- BlockCompletionFunc *cb;
-
- /** The opaque value that is passed to the completion function. */
- void *opaque;
-
/** Notifiers called when a cancelled job is finalised */
NotifierList on_finalize_cancelled;
@@ -167,6 +188,7 @@ typedef struct Job {
/**
* Callbacks and other information about a Job driver.
+ * All callbacks are invoked with job_mutex *not* held.
*/
struct JobDriver {
@@ -242,6 +264,9 @@ struct JobDriver {
*
* This callback will not be invoked if the job has already failed.
* If it fails, abort and then clean will be called.
+ *
+ * Called with AioContext lock held, since many callbacs implementations
+ * use bdrv_* functions that require to hold the lock.
*/
int (*prepare)(Job *job);
@@ -252,6 +277,9 @@ struct JobDriver {
*
* All jobs will complete with a call to either .commit() or .abort() but
* never both.
+ *
+ * Called with AioContext lock held, since many callback implementations
+ * use bdrv_* functions that require to hold the lock.
*/
void (*commit)(Job *job);
@@ -262,6 +290,9 @@ struct JobDriver {
*
* All jobs will complete with a call to either .commit() or .abort() but
* never both.
+ *
+ * Called with AioContext lock held, since many callback implementations
+ * use bdrv_* functions that require to hold the lock.
*/
void (*abort)(Job *job);
@@ -270,6 +301,9 @@ struct JobDriver {
* .commit() or .abort(). Regardless of which callback is invoked after
* completion, .clean() will always be called, even if the job does not
* belong to a transaction group.
+ *
+ * Called with AioContext lock held, since many callbacs implementations
+ * use bdrv_* functions that require to hold the lock.
*/
void (*clean)(Job *job);
@@ -284,11 +318,18 @@ struct JobDriver {
* READY).
* (If the callback is NULL, the job is assumed to terminate
* without I/O.)
+ *
+ * Called with AioContext lock held, since many callback implementations
+ * use bdrv_* functions that require to hold the lock.
*/
bool (*cancel)(Job *job, bool force);
- /** Called when the job is freed */
+ /**
+ * Called when the job is freed.
+ * Called with AioContext lock held, since many callback implementations
+ * use bdrv_* functions that require to hold the lock.
+ */
void (*free)(Job *job);
};
@@ -303,6 +344,30 @@ typedef enum JobCreateFlags {
JOB_MANUAL_DISMISS = 0x04,
} JobCreateFlags;
+extern QemuMutex job_mutex;
+
+#define JOB_LOCK_GUARD() QEMU_LOCK_GUARD(&job_mutex)
+
+#define WITH_JOB_LOCK_GUARD() WITH_QEMU_LOCK_GUARD(&job_mutex)
+
+/**
+ * job_lock:
+ *
+ * Take the mutex protecting the list of jobs and their status.
+ * Most functions called by the monitor need to call job_lock
+ * and job_unlock manually. On the other hand, function called
+ * by the block jobs themselves and by the block layer will take the
+ * lock for you.
+ */
+void job_lock(void);
+
+/**
+ * job_unlock:
+ *
+ * Release the mutex protecting the list of jobs and their status.
+ */
+void job_unlock(void);
+
/**
* Allocate and return a new job transaction. Jobs can be added to the
* transaction using job_txn_add_job().
@@ -319,23 +384,20 @@ JobTxn *job_txn_new(void);
/**
* Release a reference that was previously acquired with job_txn_add_job or
* job_txn_new. If it's the last reference to the object, it will be freed.
+ *
+ * Called with job lock *not* held.
*/
void job_txn_unref(JobTxn *txn);
-/**
- * @txn: The transaction (may be NULL)
- * @job: Job to add to the transaction
- *
- * Add @job to the transaction. The @job must not already be in a transaction.
- * The caller must call either job_txn_unref() or job_completed() to release
- * the reference that is automatically grabbed here.
- *
- * If @txn is NULL, the function does nothing.
+/*
+ * Same as job_txn_unref(), but called with job lock held.
+ * Might release the lock temporarily.
*/
-void job_txn_add_job(JobTxn *txn, Job *job);
+void job_txn_unref_locked(JobTxn *txn);
/**
* Create a new long-running job and return it.
+ * Called with job_mutex *not* held.
*
* @job_id: The id of the newly-created job, or %NULL for internal jobs
* @driver: The class object for the newly-created job.
@@ -353,20 +415,27 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
/**
* Add a reference to Job refcnt, it will be decreased with job_unref, and then
* be freed if it comes to be the last reference.
+ *
+ * Called with job lock held.
*/
-void job_ref(Job *job);
+void job_ref_locked(Job *job);
/**
- * Release a reference that was previously acquired with job_ref() or
+ * Release a reference that was previously acquired with job_ref_locked() or
* job_create(). If it's the last reference to the object, it will be freed.
+ *
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ * Called with job lock held.
*/
-void job_unref(Job *job);
+void job_unref_locked(Job *job);
/**
* @job: The job that has made progress
* @done: How much progress the job made since the last call
*
* Updates the progress counter of the job.
+ *
+ * May be called with mutex held or not held.
*/
void job_progress_update(Job *job, uint64_t done);
@@ -377,6 +446,8 @@ void job_progress_update(Job *job, uint64_t done);
*
* Sets the expected end value of the progress counter of a job so that a
* completion percentage can be calculated when the progress is updated.
+ *
+ * May be called with mutex held or not held.
*/
void job_progress_set_remaining(Job *job, uint64_t remaining);
@@ -392,27 +463,27 @@ void job_progress_set_remaining(Job *job, uint64_t remaining);
* length before, and job_progress_update() afterwards.
* (So the operation acts as a parenthesis in regards to the main job
* operation running in background.)
+ *
+ * May be called with mutex held or not held.
*/
void job_progress_increase_remaining(Job *job, uint64_t delta);
-/** To be called when a cancelled job is finalised. */
-void job_event_cancelled(Job *job);
-
-/** To be called when a successfully completed job is finalised. */
-void job_event_completed(Job *job);
-
/**
* Conditionally enter the job coroutine if the job is ready to run, not
* already busy and fn() returns true. fn() is called while under the job_lock
* critical section.
+ *
+ * Called with job lock held, but might release it temporarily.
*/
-void job_enter_cond(Job *job, bool(*fn)(Job *job));
+void job_enter_cond_locked(Job *job, bool(*fn)(Job *job));
/**
* @job: A job that has not yet been started.
*
* Begins execution of a job.
* Takes ownership of one reference to the job object.
+ *
+ * Called with job_mutex *not* held.
*/
void job_start(Job *job);
@@ -420,6 +491,7 @@ void job_start(Job *job);
* @job: The job to enter.
*
* Continue the specified job by entering the coroutine.
+ * Called with job_mutex *not* held.
*/
void job_enter(Job *job);
@@ -428,6 +500,8 @@ void job_enter(Job *job);
*
* Pause now if job_pause() has been called. Jobs that perform lots of I/O
* must call this between requests so that the job can be paused.
+ *
+ * Called with job_mutex *not* held.
*/
void coroutine_fn job_pause_point(Job *job);
@@ -435,8 +509,9 @@ void coroutine_fn job_pause_point(Job *job);
* @job: The job that calls the function.
*
* Yield the job coroutine.
+ * Called with job_mutex *not* held.
*/
-void job_yield(Job *job);
+void coroutine_fn job_yield(Job *job);
/**
* @job: The job that calls the function.
@@ -445,10 +520,11 @@ void job_yield(Job *job);
* Put the job to sleep (assuming that it wasn't canceled) for @ns
* %QEMU_CLOCK_REALTIME nanoseconds. Canceling the job will immediately
* interrupt the wait.
+ *
+ * Called with job_mutex *not* held.
*/
void coroutine_fn job_sleep_ns(Job *job, int64_t ns);
-
/** Returns the JobType of a given Job. */
JobType job_type(const Job *job);
@@ -458,88 +534,138 @@ const char *job_type_str(const Job *job);
/** Returns true if the job should not be visible to the management layer. */
bool job_is_internal(Job *job);
-/** Returns whether the job is being cancelled. */
+/**
+ * Returns whether the job is being cancelled.
+ * Called with job_mutex *not* held.
+ */
bool job_is_cancelled(Job *job);
+/* Same as job_is_cancelled(), but called with job lock held. */
+bool job_is_cancelled_locked(Job *job);
+
/**
* Returns whether the job is scheduled for cancellation (at an
* indefinite point).
+ * Called with job_mutex *not* held.
*/
bool job_cancel_requested(Job *job);
-/** Returns whether the job is in a completed state. */
-bool job_is_completed(Job *job);
+/**
+ * Returns whether the job is in a completed state.
+ * Called with job lock held.
+ */
+bool job_is_completed_locked(Job *job);
-/** Returns whether the job is ready to be completed. */
+/**
+ * Returns whether the job is ready to be completed.
+ * Called with job_mutex *not* held.
+ */
bool job_is_ready(Job *job);
+/* Same as job_is_ready(), but called with job lock held. */
+bool job_is_ready_locked(Job *job);
+
/**
* Request @job to pause at the next pause point. Must be paired with
* job_resume(). If the job is supposed to be resumed by user action, call
- * job_user_pause() instead.
+ * job_user_pause_locked() instead.
+ *
+ * Called with job lock *not* held.
*/
void job_pause(Job *job);
-/** Resumes a @job paused with job_pause. */
+/* Same as job_pause(), but called with job lock held. */
+void job_pause_locked(Job *job);
+
+/** Resumes a @job paused with job_pause. Called with job lock *not* held. */
void job_resume(Job *job);
+/*
+ * Same as job_resume(), but called with job lock held.
+ * Might release the lock temporarily.
+ */
+void job_resume_locked(Job *job);
+
/**
* Asynchronously pause the specified @job.
* Do not allow a resume until a matching call to job_user_resume.
+ * Called with job lock held.
*/
-void job_user_pause(Job *job, Error **errp);
+void job_user_pause_locked(Job *job, Error **errp);
-/** Returns true if the job is user-paused. */
-bool job_user_paused(Job *job);
+/**
+ * Returns true if the job is user-paused.
+ * Called with job lock held.
+ */
+bool job_user_paused_locked(Job *job);
/**
* Resume the specified @job.
- * Must be paired with a preceding job_user_pause.
+ * Must be paired with a preceding job_user_pause_locked.
+ * Called with job lock held, but might release it temporarily.
*/
-void job_user_resume(Job *job, Error **errp);
+void job_user_resume_locked(Job *job, Error **errp);
/**
* Get the next element from the list of block jobs after @job, or the
* first one if @job is %NULL.
*
* Returns the requested job, or %NULL if there are no more jobs left.
+ * Called with job lock *not* held.
*/
Job *job_next(Job *job);
+/* Same as job_next(), but called with job lock held. */
+Job *job_next_locked(Job *job);
+
/**
* Get the job identified by @id (which must not be %NULL).
*
* Returns the requested job, or %NULL if it doesn't exist.
+ * Called with job lock held.
*/
-Job *job_get(const char *id);
+Job *job_get_locked(const char *id);
/**
* Check whether the verb @verb can be applied to @job in its current state.
* Returns 0 if the verb can be applied; otherwise errp is set and -EPERM
* returned.
+ *
+ * Called with job lock held.
*/
-int job_apply_verb(Job *job, JobVerb verb, Error **errp);
+int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp);
-/** The @job could not be started, free it. */
+/**
+ * The @job could not be started, free it.
+ * Called with job_mutex *not* held.
+ */
void job_early_fail(Job *job);
-/** Moves the @job from RUNNING to READY */
+/**
+ * Moves the @job from RUNNING to READY.
+ * Called with job_mutex *not* held.
+ */
void job_transition_to_ready(Job *job);
-/** Asynchronously complete the specified @job. */
-void job_complete(Job *job, Error **errp);
+/**
+ * Asynchronously complete the specified @job.
+ * Called with job lock held, but might release it temporarily.
+ */
+void job_complete_locked(Job *job, Error **errp);
/**
* Asynchronously cancel the specified @job. If @force is true, the job should
* be cancelled immediately without waiting for a consistent state.
+ * Called with job lock held.
*/
-void job_cancel(Job *job, bool force);
+void job_cancel_locked(Job *job, bool force);
/**
- * Cancels the specified job like job_cancel(), but may refuse to do so if the
- * operation isn't meaningful in the current state of the job.
+ * Cancels the specified job like job_cancel_locked(), but may refuse
+ * to do so if the operation isn't meaningful in the current state of the job.
+ * Called with job lock held.
*/
-void job_user_cancel(Job *job, bool force, Error **errp);
+void job_user_cancel_locked(Job *job, bool force, Error **errp);
/**
* Synchronously cancel the @job. The completion callback is called
@@ -550,16 +676,23 @@ void job_user_cancel(Job *job, bool force, Error **errp);
* Returns the return value from the job if the job actually completed
* during the call, or -ECANCELED if it was canceled.
*
- * Callers must hold the AioContext lock of job->aio_context.
+ * Called with job_lock *not* held.
*/
int job_cancel_sync(Job *job, bool force);
-/** Synchronously force-cancels all jobs using job_cancel_sync(). */
+/* Same as job_cancel_sync, but called with job lock held. */
+int job_cancel_sync_locked(Job *job, bool force);
+
+/**
+ * Synchronously force-cancels all jobs using job_cancel_sync_locked().
+ *
+ * Called with job_lock *not* held.
+ */
void job_cancel_sync_all(void);
/**
* @job: The job to be completed.
- * @errp: Error object which may be set by job_complete(); this is not
+ * @errp: Error object which may be set by job_complete_locked(); this is not
* necessarily set on every error, the job return value has to be
* checked as well.
*
@@ -568,10 +701,9 @@ void job_cancel_sync_all(void);
* function).
*
* Returns the return value from the job.
- *
- * Callers must hold the AioContext lock of job->aio_context.
+ * Called with job_lock held.
*/
-int job_complete_sync(Job *job, Error **errp);
+int job_complete_sync_locked(Job *job, Error **errp);
/**
* For a @job that has finished its work and is pending awaiting explicit
@@ -580,14 +712,18 @@ int job_complete_sync(Job *job, Error **errp);
* FIXME: Make the below statement universally true:
* For jobs that support the manual workflow mode, all graph changes that occur
* as a result will occur after this command and before a successful reply.
+ *
+ * Called with job lock held.
*/
-void job_finalize(Job *job, Error **errp);
+void job_finalize_locked(Job *job, Error **errp);
/**
* Remove the concluded @job from the query list and resets the passed pointer
* to %NULL. Returns an error if the job is not actually concluded.
+ *
+ * Called with job lock held.
*/
-void job_dismiss(Job **job, Error **errp);
+void job_dismiss_locked(Job **job, Error **errp);
/**
* Synchronously finishes the given @job. If @finish is given, it is called to
@@ -596,8 +732,20 @@ void job_dismiss(Job **job, Error **errp);
* Returns 0 if the job is successfully completed, -ECANCELED if the job was
* cancelled before completing, and -errno in other error cases.
*
- * Callers must hold the AioContext lock of job->aio_context.
+ * Called with job_lock held, but might release it temporarily.
+ */
+int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
+ Error **errp);
+
+/**
+ * Sets the @job->aio_context.
+ * Called with job_mutex *not* held.
+ *
+ * This function must run in the main thread to protect against
+ * concurrent read in job_finish_sync_locked(), takes the job_mutex
+ * lock to protect against the read in job_do_yield_locked(), and must
+ * be called when the job is quiescent.
*/
-int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp);
+void job_set_aio_context(Job *job, AioContext *ctx);
#endif
diff --git a/job-qmp.c b/job-qmp.c
index 829a28a..d498fc8 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -29,119 +29,117 @@
#include "qapi/error.h"
#include "trace/trace-root.h"
-/* Get a job using its ID and acquire its AioContext */
-static Job *find_job(const char *id, AioContext **aio_context, Error **errp)
+/*
+ * Get a job using its ID. Called with job_mutex held.
+ */
+static Job *find_job_locked(const char *id, Error **errp)
{
Job *job;
- *aio_context = NULL;
-
- job = job_get(id);
+ job = job_get_locked(id);
if (!job) {
error_setg(errp, "Job not found");
return NULL;
}
- *aio_context = job->aio_context;
- aio_context_acquire(*aio_context);
-
return job;
}
void qmp_job_cancel(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job;
+
+ JOB_LOCK_GUARD();
+ job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_cancel(job);
- job_user_cancel(job, true, errp);
- aio_context_release(aio_context);
+ job_user_cancel_locked(job, true, errp);
}
void qmp_job_pause(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job;
+
+ JOB_LOCK_GUARD();
+ job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_pause(job);
- job_user_pause(job, errp);
- aio_context_release(aio_context);
+ job_user_pause_locked(job, errp);
}
void qmp_job_resume(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job;
+
+ JOB_LOCK_GUARD();
+ job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_resume(job);
- job_user_resume(job, errp);
- aio_context_release(aio_context);
+ job_user_resume_locked(job, errp);
}
void qmp_job_complete(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job;
+
+ JOB_LOCK_GUARD();
+ job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_complete(job);
- job_complete(job, errp);
- aio_context_release(aio_context);
+ job_complete_locked(job, errp);
}
void qmp_job_finalize(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job;
+
+ JOB_LOCK_GUARD();
+ job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_finalize(job);
- job_ref(job);
- job_finalize(job, errp);
-
- /*
- * Job's context might have changed via job_finalize (and job_txn_apply
- * automatically acquires the new one), so make sure we release the correct
- * one.
- */
- aio_context = job->aio_context;
- job_unref(job);
- aio_context_release(aio_context);
+ job_ref_locked(job);
+ job_finalize_locked(job, errp);
+
+ job_unref_locked(job);
}
void qmp_job_dismiss(const char *id, Error **errp)
{
- AioContext *aio_context;
- Job *job = find_job(id, &aio_context, errp);
+ Job *job;
+
+ JOB_LOCK_GUARD();
+ job = find_job_locked(id, errp);
if (!job) {
return;
}
trace_qmp_job_dismiss(job);
- job_dismiss(&job, errp);
- aio_context_release(aio_context);
+ job_dismiss_locked(&job, errp);
}
-static JobInfo *job_query_single(Job *job, Error **errp)
+/* Called with job_mutex held. */
+static JobInfo *job_query_single_locked(Job *job, Error **errp)
{
JobInfo *info;
uint64_t progress_current;
@@ -171,17 +169,15 @@ JobInfoList *qmp_query_jobs(Error **errp)
JobInfoList *head = NULL, **tail = &head;
Job *job;
- for (job = job_next(NULL); job; job = job_next(job)) {
+ JOB_LOCK_GUARD();
+
+ for (job = job_next_locked(NULL); job; job = job_next_locked(job)) {
JobInfo *value;
- AioContext *aio_context;
if (job_is_internal(job)) {
continue;
}
- aio_context = job->aio_context;
- aio_context_acquire(aio_context);
- value = job_query_single(job, errp);
- aio_context_release(aio_context);
+ value = job_query_single_locked(job, errp);
if (!value) {
qapi_free_JobInfoList(head);
return NULL;
diff --git a/job.c b/job.c
index 075c6f3..78feae0 100644
--- a/job.c
+++ b/job.c
@@ -32,6 +32,27 @@
#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"
+/*
+ * The job API is composed of two categories of functions.
+ *
+ * The first includes functions used by the monitor. The monitor is
+ * peculiar in that it accesses the job list with job_get, and
+ * therefore needs consistency across job_get and the actual operation
+ * (e.g. job_user_cancel). To achieve this consistency, the caller
+ * calls job_lock/job_unlock itself around the whole operation.
+ *
+ *
+ * The second includes functions used by the job drivers and sometimes
+ * by the core block layer. These delegate the locking to the callee instead.
+ */
+
+/*
+ * job_mutex protects the jobs list, but also makes the
+ * struct job fields thread-safe.
+ */
+QemuMutex job_mutex;
+
+/* Protected by job_mutex */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
/* Job State Transition Table */
@@ -74,17 +95,12 @@ struct JobTxn {
int refcnt;
};
-/* Right now, this mutex is only needed to synchronize accesses to job->busy
- * and job->sleep_timer, such as concurrent calls to job_do_yield and
- * job_enter. */
-static QemuMutex job_mutex;
-
-static void job_lock(void)
+void job_lock(void)
{
qemu_mutex_lock(&job_mutex);
}
-static void job_unlock(void)
+void job_unlock(void)
{
qemu_mutex_unlock(&job_mutex);
}
@@ -102,19 +118,38 @@ JobTxn *job_txn_new(void)
return txn;
}
-static void job_txn_ref(JobTxn *txn)
+/* Called with job_mutex held. */
+static void job_txn_ref_locked(JobTxn *txn)
{
txn->refcnt++;
}
-void job_txn_unref(JobTxn *txn)
+void job_txn_unref_locked(JobTxn *txn)
{
if (txn && --txn->refcnt == 0) {
g_free(txn);
}
}
-void job_txn_add_job(JobTxn *txn, Job *job)
+void job_txn_unref(JobTxn *txn)
+{
+ JOB_LOCK_GUARD();
+ job_txn_unref_locked(txn);
+}
+
+/**
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction. The @job must not already be in a transaction.
+ * The caller must call either job_txn_unref() or job_completed() to release
+ * the reference that is automatically grabbed here.
+ *
+ * If @txn is NULL, the function does nothing.
+ *
+ * Called with job_mutex held.
+ */
+static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
if (!txn) {
return;
@@ -124,21 +159,22 @@ void job_txn_add_job(JobTxn *txn, Job *job)
job->txn = txn;
QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
- job_txn_ref(txn);
+ job_txn_ref_locked(txn);
}
-static void job_txn_del_job(Job *job)
+/* Called with job_mutex held. */
+static void job_txn_del_job_locked(Job *job)
{
if (job->txn) {
QLIST_REMOVE(job, txn_list);
- job_txn_unref(job->txn);
+ job_txn_unref_locked(job->txn);
job->txn = NULL;
}
}
-static int job_txn_apply(Job *job, int fn(Job *))
+/* Called with job_mutex held, but releases it temporarily. */
+static int job_txn_apply_locked(Job *job, int fn(Job *))
{
- AioContext *inner_ctx;
Job *other_job, *next;
JobTxn *txn = job->txn;
int rc = 0;
@@ -149,25 +185,16 @@ static int job_txn_apply(Job *job, int fn(Job *))
* we need to release it here to avoid holding the lock twice - which would
* break AIO_WAIT_WHILE from within fn.
*/
- job_ref(job);
- aio_context_release(job->aio_context);
+ job_ref_locked(job);
QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
- inner_ctx = other_job->aio_context;
- aio_context_acquire(inner_ctx);
rc = fn(other_job);
- aio_context_release(inner_ctx);
if (rc) {
break;
}
}
- /*
- * Note that job->aio_context might have been changed by calling fn, so we
- * can't use a local variable to cache it.
- */
- aio_context_acquire(job->aio_context);
- job_unref(job);
+ job_unref_locked(job);
return rc;
}
@@ -176,7 +203,8 @@ bool job_is_internal(Job *job)
return (job->id == NULL);
}
-static void job_state_transition(Job *job, JobStatus s1)
+/* Called with job_mutex held. */
+static void job_state_transition_locked(Job *job, JobStatus s1)
{
JobStatus s0 = job->status;
assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
@@ -191,7 +219,7 @@ static void job_state_transition(Job *job, JobStatus s1)
}
}
-int job_apply_verb(Job *job, JobVerb verb, Error **errp)
+int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
{
JobStatus s0 = job->status;
assert(verb >= 0 && verb < JOB_VERB__MAX);
@@ -215,19 +243,32 @@ const char *job_type_str(const Job *job)
return JobType_str(job_type(job));
}
-bool job_is_cancelled(Job *job)
+bool job_is_cancelled_locked(Job *job)
{
/* force_cancel may be true only if cancelled is true, too */
assert(job->cancelled || !job->force_cancel);
return job->force_cancel;
}
-bool job_cancel_requested(Job *job)
+bool job_is_cancelled(Job *job)
+{
+ JOB_LOCK_GUARD();
+ return job_is_cancelled_locked(job);
+}
+
+/* Called with job_mutex held. */
+static bool job_cancel_requested_locked(Job *job)
{
return job->cancelled;
}
-bool job_is_ready(Job *job)
+bool job_cancel_requested(Job *job)
+{
+ JOB_LOCK_GUARD();
+ return job_cancel_requested_locked(job);
+}
+
+bool job_is_ready_locked(Job *job)
{
switch (job->status) {
case JOB_STATUS_UNDEFINED:
@@ -249,7 +290,13 @@ bool job_is_ready(Job *job)
return false;
}
-bool job_is_completed(Job *job)
+bool job_is_ready(Job *job)
+{
+ JOB_LOCK_GUARD();
+ return job_is_ready_locked(job);
+}
+
+bool job_is_completed_locked(Job *job)
{
switch (job->status) {
case JOB_STATUS_UNDEFINED:
@@ -271,17 +318,24 @@ bool job_is_completed(Job *job)
return false;
}
-static bool job_started(Job *job)
+static bool job_is_completed(Job *job)
+{
+ JOB_LOCK_GUARD();
+ return job_is_completed_locked(job);
+}
+
+static bool job_started_locked(Job *job)
{
return job->co;
}
-static bool job_should_pause(Job *job)
+/* Called with job_mutex held. */
+static bool job_should_pause_locked(Job *job)
{
return job->pause_count > 0;
}
-Job *job_next(Job *job)
+Job *job_next_locked(Job *job)
{
if (!job) {
return QLIST_FIRST(&jobs);
@@ -289,7 +343,13 @@ Job *job_next(Job *job)
return QLIST_NEXT(job, job_list);
}
-Job *job_get(const char *id)
+Job *job_next(Job *job)
+{
+ JOB_LOCK_GUARD();
+ return job_next_locked(job);
+}
+
+Job *job_get_locked(const char *id)
{
Job *job;
@@ -302,6 +362,18 @@ Job *job_get(const char *id)
return NULL;
}
+void job_set_aio_context(Job *job, AioContext *ctx)
+{
+ /* protect against read in job_finish_sync_locked and job_start */
+ GLOBAL_STATE_CODE();
+ /* protect against read in job_do_yield_locked */
+ JOB_LOCK_GUARD();
+ /* ensure the job is quiescent while the AioContext is changed */
+ assert(job->paused || job_is_completed_locked(job));
+ job->aio_context = ctx;
+}
+
+/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
Job *job = opaque;
@@ -315,6 +387,8 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
{
Job *job;
+ JOB_LOCK_GUARD();
+
if (job_id) {
if (flags & JOB_INTERNAL) {
error_setg(errp, "Cannot specify job ID for internal job");
@@ -324,7 +398,7 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
error_setg(errp, "Invalid job ID '%s'", job_id);
return NULL;
}
- if (job_get(job_id)) {
+ if (job_get_locked(job_id)) {
error_setg(errp, "Job ID '%s' already in use", job_id);
return NULL;
}
@@ -354,7 +428,7 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
notifier_list_init(&job->on_ready);
notifier_list_init(&job->on_idle);
- job_state_transition(job, JOB_STATUS_CREATED);
+ job_state_transition_locked(job, JOB_STATUS_CREATED);
aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
QEMU_CLOCK_REALTIME, SCALE_NS,
job_sleep_timer_cb, job);
@@ -365,21 +439,21 @@ void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
* consolidating the job management logic */
if (!txn) {
txn = job_txn_new();
- job_txn_add_job(txn, job);
- job_txn_unref(txn);
+ job_txn_add_job_locked(txn, job);
+ job_txn_unref_locked(txn);
} else {
- job_txn_add_job(txn, job);
+ job_txn_add_job_locked(txn, job);
}
return job;
}
-void job_ref(Job *job)
+void job_ref_locked(Job *job)
{
++job->refcnt;
}
-void job_unref(Job *job)
+void job_unref_locked(Job *job)
{
GLOBAL_STATE_CODE();
@@ -389,7 +463,13 @@ void job_unref(Job *job)
assert(!job->txn);
if (job->driver->free) {
+ AioContext *aio_context = job->aio_context;
+ job_unlock();
+ /* FIXME: aiocontext lock is required because cb calls blk_unref */
+ aio_context_acquire(aio_context);
job->driver->free(job);
+ aio_context_release(aio_context);
+ job_lock();
}
QLIST_REMOVE(job, job_list);
@@ -416,48 +496,56 @@ void job_progress_increase_remaining(Job *job, uint64_t delta)
progress_increase_remaining(&job->progress, delta);
}
-void job_event_cancelled(Job *job)
+/**
+ * To be called when a cancelled job is finalised.
+ * Called with job_mutex held.
+ */
+static void job_event_cancelled_locked(Job *job)
{
notifier_list_notify(&job->on_finalize_cancelled, job);
}
-void job_event_completed(Job *job)
+/**
+ * To be called when a successfully completed job is finalised.
+ * Called with job_mutex held.
+ */
+static void job_event_completed_locked(Job *job)
{
notifier_list_notify(&job->on_finalize_completed, job);
}
-static void job_event_pending(Job *job)
+/* Called with job_mutex held. */
+static void job_event_pending_locked(Job *job)
{
notifier_list_notify(&job->on_pending, job);
}
-static void job_event_ready(Job *job)
+/* Called with job_mutex held. */
+static void job_event_ready_locked(Job *job)
{
notifier_list_notify(&job->on_ready, job);
}
-static void job_event_idle(Job *job)
+/* Called with job_mutex held. */
+static void job_event_idle_locked(Job *job)
{
notifier_list_notify(&job->on_idle, job);
}
-void job_enter_cond(Job *job, bool(*fn)(Job *job))
+void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
- if (!job_started(job)) {
+ if (!job_started_locked(job)) {
return;
}
if (job->deferred_to_main_loop) {
return;
}
- job_lock();
if (job->busy) {
- job_unlock();
return;
}
if (fn && !fn(job)) {
- job_unlock();
return;
}
@@ -465,12 +553,14 @@ void job_enter_cond(Job *job, bool(*fn)(Job *job))
timer_del(&job->sleep_timer);
job->busy = true;
job_unlock();
- aio_co_enter(job->aio_context, job->co);
+ aio_co_wake(job->co);
+ job_lock();
}
void job_enter(Job *job)
{
- job_enter_cond(job, NULL);
+ JOB_LOCK_GUARD();
+ job_enter_cond_locked(job, NULL);
}
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
@@ -478,100 +568,137 @@ void job_enter(Job *job)
* is allowed and cancels the timer.
*
* If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
- * called explicitly. */
-static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
+ * called explicitly.
+ *
+ * Called with job_mutex held, but releases it temporarily.
+ */
+static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
- job_lock();
+ AioContext *next_aio_context;
+
if (ns != -1) {
timer_mod(&job->sleep_timer, ns);
}
job->busy = false;
- job_event_idle(job);
+ job_event_idle_locked(job);
job_unlock();
qemu_coroutine_yield();
+ job_lock();
+
+ next_aio_context = job->aio_context;
+ /*
+ * Coroutine has resumed, but in the meanwhile the job AioContext
+ * might have changed via bdrv_try_set_aio_context(), so we need to move
+ * the coroutine too in the new aiocontext.
+ */
+ while (qemu_get_current_aio_context() != next_aio_context) {
+ job_unlock();
+ aio_co_reschedule_self(next_aio_context);
+ job_lock();
+ next_aio_context = job->aio_context;
+ }
- /* Set by job_enter_cond() before re-entering the coroutine. */
+ /* Set by job_enter_cond_locked() before re-entering the coroutine. */
assert(job->busy);
}
-void coroutine_fn job_pause_point(Job *job)
+/* Called with job_mutex held, but releases it temporarily. */
+static void coroutine_fn job_pause_point_locked(Job *job)
{
- assert(job && job_started(job));
+ assert(job && job_started_locked(job));
- if (!job_should_pause(job)) {
+ if (!job_should_pause_locked(job)) {
return;
}
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
return;
}
if (job->driver->pause) {
+ job_unlock();
job->driver->pause(job);
+ job_lock();
}
- if (job_should_pause(job) && !job_is_cancelled(job)) {
+ if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
JobStatus status = job->status;
- job_state_transition(job, status == JOB_STATUS_READY
- ? JOB_STATUS_STANDBY
- : JOB_STATUS_PAUSED);
+ job_state_transition_locked(job, status == JOB_STATUS_READY
+ ? JOB_STATUS_STANDBY
+ : JOB_STATUS_PAUSED);
job->paused = true;
- job_do_yield(job, -1);
+ job_do_yield_locked(job, -1);
job->paused = false;
- job_state_transition(job, status);
+ job_state_transition_locked(job, status);
}
if (job->driver->resume) {
+ job_unlock();
job->driver->resume(job);
+ job_lock();
}
}
-void job_yield(Job *job)
+void coroutine_fn job_pause_point(Job *job)
{
+ JOB_LOCK_GUARD();
+ job_pause_point_locked(job);
+}
+
+void coroutine_fn job_yield(Job *job)
+{
+ JOB_LOCK_GUARD();
assert(job->busy);
/* Check cancellation *before* setting busy = false, too! */
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
return;
}
- if (!job_should_pause(job)) {
- job_do_yield(job, -1);
+ if (!job_should_pause_locked(job)) {
+ job_do_yield_locked(job, -1);
}
- job_pause_point(job);
+ job_pause_point_locked(job);
}
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
+ JOB_LOCK_GUARD();
assert(job->busy);
/* Check cancellation *before* setting busy = false, too! */
- if (job_is_cancelled(job)) {
+ if (job_is_cancelled_locked(job)) {
return;
}
- if (!job_should_pause(job)) {
- job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
+ if (!job_should_pause_locked(job)) {
+ job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
}
- job_pause_point(job);
+ job_pause_point_locked(job);
}
-/* Assumes the block_job_mutex is held */
-static bool job_timer_not_pending(Job *job)
+/* Assumes the job_mutex is held */
+static bool job_timer_not_pending_locked(Job *job)
{
return !timer_pending(&job->sleep_timer);
}
-void job_pause(Job *job)
+void job_pause_locked(Job *job)
{
job->pause_count++;
if (!job->paused) {
- job_enter(job);
+ job_enter_cond_locked(job, NULL);
}
}
-void job_resume(Job *job)
+void job_pause(Job *job)
+{
+ JOB_LOCK_GUARD();
+ job_pause_locked(job);
+}
+
+void job_resume_locked(Job *job)
{
assert(job->pause_count > 0);
job->pause_count--;
@@ -580,12 +707,18 @@ void job_resume(Job *job)
}
/* kick only if no timer is pending */
- job_enter_cond(job, job_timer_not_pending);
+ job_enter_cond_locked(job, job_timer_not_pending_locked);
}
-void job_user_pause(Job *job, Error **errp)
+void job_resume(Job *job)
{
- if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
+ JOB_LOCK_GUARD();
+ job_resume_locked(job);
+}
+
+void job_user_pause_locked(Job *job, Error **errp)
+{
+ if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
return;
}
if (job->user_paused) {
@@ -593,15 +726,15 @@ void job_user_pause(Job *job, Error **errp)
return;
}
job->user_paused = true;
- job_pause(job);
+ job_pause_locked(job);
}
-bool job_user_paused(Job *job)
+bool job_user_paused_locked(Job *job)
{
return job->user_paused;
}
-void job_user_resume(Job *job, Error **errp)
+void job_user_resume_locked(Job *job, Error **errp)
{
assert(job);
GLOBAL_STATE_CODE();
@@ -609,66 +742,72 @@ void job_user_resume(Job *job, Error **errp)
error_setg(errp, "Can't resume a job that was not paused");
return;
}
- if (job_apply_verb(job, JOB_VERB_RESUME, errp)) {
+ if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
return;
}
if (job->driver->user_resume) {
+ job_unlock();
job->driver->user_resume(job);
+ job_lock();
}
job->user_paused = false;
- job_resume(job);
+ job_resume_locked(job);
}
-static void job_do_dismiss(Job *job)
+/* Called with job_mutex held, but releases it temporarily. */
+static void job_do_dismiss_locked(Job *job)
{
assert(job);
job->busy = false;
job->paused = false;
job->deferred_to_main_loop = true;
- job_txn_del_job(job);
+ job_txn_del_job_locked(job);
- job_state_transition(job, JOB_STATUS_NULL);
- job_unref(job);
+ job_state_transition_locked(job, JOB_STATUS_NULL);
+ job_unref_locked(job);
}
-void job_dismiss(Job **jobptr, Error **errp)
+void job_dismiss_locked(Job **jobptr, Error **errp)
{
Job *job = *jobptr;
/* similarly to _complete, this is QMP-interface only. */
assert(job->id);
- if (job_apply_verb(job, JOB_VERB_DISMISS, errp)) {
+ if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
return;
}
- job_do_dismiss(job);
+ job_do_dismiss_locked(job);
*jobptr = NULL;
}
void job_early_fail(Job *job)
{
+ JOB_LOCK_GUARD();
assert(job->status == JOB_STATUS_CREATED);
- job_do_dismiss(job);
+ job_do_dismiss_locked(job);
}
-static void job_conclude(Job *job)
+/* Called with job_mutex held. */
+static void job_conclude_locked(Job *job)
{
- job_state_transition(job, JOB_STATUS_CONCLUDED);
- if (job->auto_dismiss || !job_started(job)) {
- job_do_dismiss(job);
+ job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
+ if (job->auto_dismiss || !job_started_locked(job)) {
+ job_do_dismiss_locked(job);
}
}
-static void job_update_rc(Job *job)
+/* Called with job_mutex held. */
+static void job_update_rc_locked(Job *job)
{
- if (!job->ret && job_is_cancelled(job)) {
+ if (!job->ret && job_is_cancelled_locked(job)) {
job->ret = -ECANCELED;
}
if (job->ret) {
if (!job->err) {
error_setg(&job->err, "%s", strerror(-job->ret));
}
- job_state_transition(job, JOB_STATUS_ABORTING);
+ job_state_transition_locked(job, JOB_STATUS_ABORTING);
}
}
@@ -698,14 +837,25 @@ static void job_clean(Job *job)
}
}
-static int job_finalize_single(Job *job)
+/*
+ * Called with job_mutex held, but releases it temporarily.
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ */
+static int job_finalize_single_locked(Job *job)
{
- assert(job_is_completed(job));
+ int job_ret;
+ AioContext *ctx = job->aio_context;
+
+ assert(job_is_completed_locked(job));
/* Ensure abort is called for late-transactional failures */
- job_update_rc(job);
+ job_update_rc_locked(job);
- if (!job->ret) {
+ job_ret = job->ret;
+ job_unlock();
+ aio_context_acquire(ctx);
+
+ if (!job_ret) {
job_commit(job);
} else {
job_abort(job);
@@ -713,28 +863,40 @@ static int job_finalize_single(Job *job)
job_clean(job);
if (job->cb) {
- job->cb(job->opaque, job->ret);
+ job->cb(job->opaque, job_ret);
}
+ aio_context_release(ctx);
+ job_lock();
+
/* Emit events only if we actually started */
- if (job_started(job)) {
- if (job_is_cancelled(job)) {
- job_event_cancelled(job);
+ if (job_started_locked(job)) {
+ if (job_is_cancelled_locked(job)) {
+ job_event_cancelled_locked(job);
} else {
- job_event_completed(job);
+ job_event_completed_locked(job);
}
}
- job_txn_del_job(job);
- job_conclude(job);
+ job_txn_del_job_locked(job);
+ job_conclude_locked(job);
return 0;
}
-static void job_cancel_async(Job *job, bool force)
+/*
+ * Called with job_mutex held, but releases it temporarily.
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ */
+static void job_cancel_async_locked(Job *job, bool force)
{
+ AioContext *ctx = job->aio_context;
GLOBAL_STATE_CODE();
if (job->driver->cancel) {
+ job_unlock();
+ aio_context_acquire(ctx);
force = job->driver->cancel(job, force);
+ aio_context_release(ctx);
+ job_lock();
} else {
/* No .cancel() means the job will behave as if force-cancelled */
force = true;
@@ -743,7 +905,9 @@ static void job_cancel_async(Job *job, bool force)
if (job->user_paused) {
/* Do not call job_enter here, the caller will handle it. */
if (job->driver->user_resume) {
+ job_unlock();
job->driver->user_resume(job);
+ job_lock();
}
job->user_paused = false;
assert(job->pause_count > 0);
@@ -764,9 +928,12 @@ static void job_cancel_async(Job *job, bool force)
}
}
-static void job_completed_txn_abort(Job *job)
+/*
+ * Called with job_mutex held, but releases it temporarily.
+ * Takes AioContext lock internally to invoke a job->driver callback.
+ */
+static void job_completed_txn_abort_locked(Job *job)
{
- AioContext *ctx;
JobTxn *txn = job->txn;
Job *other_job;
@@ -777,178 +944,164 @@ static void job_completed_txn_abort(Job *job)
return;
}
txn->aborting = true;
- job_txn_ref(txn);
+ job_txn_ref_locked(txn);
- /*
- * We can only hold the single job's AioContext lock while calling
- * job_finalize_single() because the finalization callbacks can involve
- * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
- * Note that the job's AioContext may change when it is finalized.
- */
- job_ref(job);
- aio_context_release(job->aio_context);
+ job_ref_locked(job);
/* Other jobs are effectively cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (other_job != job) {
- ctx = other_job->aio_context;
- aio_context_acquire(ctx);
/*
* This is a transaction: If one job failed, no result will matter.
* Therefore, pass force=true to terminate all other jobs as quickly
* as possible.
*/
- job_cancel_async(other_job, true);
- aio_context_release(ctx);
+ job_cancel_async_locked(other_job, true);
}
}
while (!QLIST_EMPTY(&txn->jobs)) {
other_job = QLIST_FIRST(&txn->jobs);
- /*
- * The job's AioContext may change, so store it in @ctx so we
- * release the same context that we have acquired before.
- */
- ctx = other_job->aio_context;
- aio_context_acquire(ctx);
- if (!job_is_completed(other_job)) {
- assert(job_cancel_requested(other_job));
- job_finish_sync(other_job, NULL, NULL);
+ if (!job_is_completed_locked(other_job)) {
+ assert(job_cancel_requested_locked(other_job));
+ job_finish_sync_locked(other_job, NULL, NULL);
}
- job_finalize_single(other_job);
- aio_context_release(ctx);
+ job_finalize_single_locked(other_job);
}
- /*
- * Use job_ref()/job_unref() so we can read the AioContext here
- * even if the job went away during job_finalize_single().
- */
- aio_context_acquire(job->aio_context);
- job_unref(job);
-
- job_txn_unref(txn);
+ job_unref_locked(job);
+ job_txn_unref_locked(txn);
}
-static int job_prepare(Job *job)
+/* Called with job_mutex held, but releases it temporarily */
+static int job_prepare_locked(Job *job)
{
+ int ret;
+ AioContext *ctx = job->aio_context;
+
GLOBAL_STATE_CODE();
+
if (job->ret == 0 && job->driver->prepare) {
- job->ret = job->driver->prepare(job);
- job_update_rc(job);
+ job_unlock();
+ aio_context_acquire(ctx);
+ ret = job->driver->prepare(job);
+ aio_context_release(ctx);
+ job_lock();
+ job->ret = ret;
+ job_update_rc_locked(job);
}
+
return job->ret;
}
-static int job_needs_finalize(Job *job)
+/* Called with job_mutex held */
+static int job_needs_finalize_locked(Job *job)
{
return !job->auto_finalize;
}
-static void job_do_finalize(Job *job)
+/* Called with job_mutex held */
+static void job_do_finalize_locked(Job *job)
{
int rc;
assert(job && job->txn);
/* prepare the transaction to complete */
- rc = job_txn_apply(job, job_prepare);
+ rc = job_txn_apply_locked(job, job_prepare_locked);
if (rc) {
- job_completed_txn_abort(job);
+ job_completed_txn_abort_locked(job);
} else {
- job_txn_apply(job, job_finalize_single);
+ job_txn_apply_locked(job, job_finalize_single_locked);
}
}
-void job_finalize(Job *job, Error **errp)
+void job_finalize_locked(Job *job, Error **errp)
{
assert(job && job->id);
- if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
+ if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
return;
}
- job_do_finalize(job);
+ job_do_finalize_locked(job);
}
-static int job_transition_to_pending(Job *job)
+/* Called with job_mutex held. */
+static int job_transition_to_pending_locked(Job *job)
{
- job_state_transition(job, JOB_STATUS_PENDING);
+ job_state_transition_locked(job, JOB_STATUS_PENDING);
if (!job->auto_finalize) {
- job_event_pending(job);
+ job_event_pending_locked(job);
}
return 0;
}
void job_transition_to_ready(Job *job)
{
- job_state_transition(job, JOB_STATUS_READY);
- job_event_ready(job);
+ JOB_LOCK_GUARD();
+ job_state_transition_locked(job, JOB_STATUS_READY);
+ job_event_ready_locked(job);
}
-static void job_completed_txn_success(Job *job)
+/* Called with job_mutex held. */
+static void job_completed_txn_success_locked(Job *job)
{
JobTxn *txn = job->txn;
Job *other_job;
- job_state_transition(job, JOB_STATUS_WAITING);
+ job_state_transition_locked(job, JOB_STATUS_WAITING);
/*
* Successful completion, see if there are other running jobs in this
* txn.
*/
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
- if (!job_is_completed(other_job)) {
+ if (!job_is_completed_locked(other_job)) {
return;
}
assert(other_job->ret == 0);
}
- job_txn_apply(job, job_transition_to_pending);
+ job_txn_apply_locked(job, job_transition_to_pending_locked);
/* If no jobs need manual finalization, automatically do so */
- if (job_txn_apply(job, job_needs_finalize) == 0) {
- job_do_finalize(job);
+ if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
+ job_do_finalize_locked(job);
}
}
-static void job_completed(Job *job)
+/* Called with job_mutex held. */
+static void job_completed_locked(Job *job)
{
- assert(job && job->txn && !job_is_completed(job));
+ assert(job && job->txn && !job_is_completed_locked(job));
- job_update_rc(job);
+ job_update_rc_locked(job);
trace_job_completed(job, job->ret);
if (job->ret) {
- job_completed_txn_abort(job);
+ job_completed_txn_abort_locked(job);
} else {
- job_completed_txn_success(job);
+ job_completed_txn_success_locked(job);
}
}
-/** Useful only as a type shim for aio_bh_schedule_oneshot. */
+/**
+ * Useful only as a type shim for aio_bh_schedule_oneshot.
+ * Called with job_mutex *not* held.
+ */
static void job_exit(void *opaque)
{
Job *job = (Job *)opaque;
- AioContext *ctx;
-
- job_ref(job);
- aio_context_acquire(job->aio_context);
+ JOB_LOCK_GUARD();
+ job_ref_locked(job);
/* This is a lie, we're not quiescent, but still doing the completion
* callbacks. However, completion callbacks tend to involve operations that
* drain block nodes, and if .drained_poll still returned true, we would
* deadlock. */
job->busy = false;
- job_event_idle(job);
-
- job_completed(job);
+ job_event_idle_locked(job);
- /*
- * Note that calling job_completed can move the job to a different
- * aio_context, so we cannot cache from above. job_txn_apply takes care of
- * acquiring the new lock, and we ref/unref to avoid job_completed freeing
- * the job underneath us.
- */
- ctx = job->aio_context;
- job_unref(job);
- aio_context_release(ctx);
+ job_completed_locked(job);
+ job_unref_locked(job);
}
/**
@@ -958,37 +1111,47 @@ static void job_exit(void *opaque)
static void coroutine_fn job_co_entry(void *opaque)
{
Job *job = opaque;
+ int ret;
assert(job && job->driver && job->driver->run);
- assert(job->aio_context == qemu_get_current_aio_context());
- job_pause_point(job);
- job->ret = job->driver->run(job, &job->err);
- job->deferred_to_main_loop = true;
- job->busy = true;
+ WITH_JOB_LOCK_GUARD() {
+ assert(job->aio_context == qemu_get_current_aio_context());
+ job_pause_point_locked(job);
+ }
+ ret = job->driver->run(job, &job->err);
+ WITH_JOB_LOCK_GUARD() {
+ job->ret = ret;
+ job->deferred_to_main_loop = true;
+ job->busy = true;
+ }
aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}
void job_start(Job *job)
{
- assert(job && !job_started(job) && job->paused &&
- job->driver && job->driver->run);
- job->co = qemu_coroutine_create(job_co_entry, job);
- job->pause_count--;
- job->busy = true;
- job->paused = false;
- job_state_transition(job, JOB_STATUS_RUNNING);
+ assert(qemu_in_main_thread());
+
+ WITH_JOB_LOCK_GUARD() {
+ assert(job && !job_started_locked(job) && job->paused &&
+ job->driver && job->driver->run);
+ job->co = qemu_coroutine_create(job_co_entry, job);
+ job->pause_count--;
+ job->busy = true;
+ job->paused = false;
+ job_state_transition_locked(job, JOB_STATUS_RUNNING);
+ }
aio_co_enter(job->aio_context, job->co);
}
-void job_cancel(Job *job, bool force)
+void job_cancel_locked(Job *job, bool force)
{
if (job->status == JOB_STATUS_CONCLUDED) {
- job_do_dismiss(job);
+ job_do_dismiss_locked(job);
return;
}
- job_cancel_async(job, force);
- if (!job_started(job)) {
- job_completed(job);
+ job_cancel_async_locked(job, force);
+ if (!job_started_locked(job)) {
+ job_completed_locked(job);
} else if (job->deferred_to_main_loop) {
/*
* job_cancel_async() ignores soft-cancel requests for jobs
@@ -1000,102 +1163,117 @@ void job_cancel(Job *job, bool force)
* choose to call job_is_cancelled() to show that we invoke
* job_completed_txn_abort() only for force-cancelled jobs.)
*/
- if (job_is_cancelled(job)) {
- job_completed_txn_abort(job);
+ if (job_is_cancelled_locked(job)) {
+ job_completed_txn_abort_locked(job);
}
} else {
- job_enter(job);
+ job_enter_cond_locked(job, NULL);
}
}
-void job_user_cancel(Job *job, bool force, Error **errp)
+void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
- if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
+ if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
return;
}
- job_cancel(job, force);
+ job_cancel_locked(job, force);
}
-/* A wrapper around job_cancel() taking an Error ** parameter so it may be
- * used with job_finish_sync() without the need for (rather nasty) function
- * pointer casts there. */
-static void job_cancel_err(Job *job, Error **errp)
+/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
+ * be used with job_finish_sync_locked() without the need for (rather nasty)
+ * function pointer casts there.
+ *
+ * Called with job_mutex held.
+ */
+static void job_cancel_err_locked(Job *job, Error **errp)
{
- job_cancel(job, false);
+ job_cancel_locked(job, false);
}
/**
* Same as job_cancel_err(), but force-cancel.
+ * Called with job_mutex held.
*/
-static void job_force_cancel_err(Job *job, Error **errp)
+static void job_force_cancel_err_locked(Job *job, Error **errp)
{
- job_cancel(job, true);
+ job_cancel_locked(job, true);
}
-int job_cancel_sync(Job *job, bool force)
+int job_cancel_sync_locked(Job *job, bool force)
{
if (force) {
- return job_finish_sync(job, &job_force_cancel_err, NULL);
+ return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
} else {
- return job_finish_sync(job, &job_cancel_err, NULL);
+ return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
}
}
+int job_cancel_sync(Job *job, bool force)
+{
+ JOB_LOCK_GUARD();
+ return job_cancel_sync_locked(job, force);
+}
+
void job_cancel_sync_all(void)
{
Job *job;
- AioContext *aio_context;
+ JOB_LOCK_GUARD();
- while ((job = job_next(NULL))) {
- aio_context = job->aio_context;
- aio_context_acquire(aio_context);
- job_cancel_sync(job, true);
- aio_context_release(aio_context);
+ while ((job = job_next_locked(NULL))) {
+ job_cancel_sync_locked(job, true);
}
}
-int job_complete_sync(Job *job, Error **errp)
+int job_complete_sync_locked(Job *job, Error **errp)
{
- return job_finish_sync(job, job_complete, errp);
+ return job_finish_sync_locked(job, job_complete_locked, errp);
}
-void job_complete(Job *job, Error **errp)
+void job_complete_locked(Job *job, Error **errp)
{
/* Should not be reachable via external interface for internal jobs */
assert(job->id);
GLOBAL_STATE_CODE();
- if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
+ if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
return;
}
- if (job_cancel_requested(job) || !job->driver->complete) {
+ if (job_cancel_requested_locked(job) || !job->driver->complete) {
error_setg(errp, "The active block job '%s' cannot be completed",
job->id);
return;
}
+ job_unlock();
job->driver->complete(job, errp);
+ job_lock();
}
-int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
+int job_finish_sync_locked(Job *job,
+ void (*finish)(Job *, Error **errp),
+ Error **errp)
{
Error *local_err = NULL;
int ret;
+ GLOBAL_STATE_CODE();
- job_ref(job);
+ job_ref_locked(job);
if (finish) {
finish(job, &local_err);
}
if (local_err) {
error_propagate(errp, local_err);
- job_unref(job);
+ job_unref_locked(job);
return -EBUSY;
}
- AIO_WAIT_WHILE(job->aio_context,
- (job_enter(job), !job_is_completed(job)));
+ job_unlock();
+ AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
+ (job_enter(job), !job_is_completed(job)));
+ job_lock();
- ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
- job_unref(job);
+ ret = (job_is_cancelled_locked(job) && job->ret == 0)
+ ? -ECANCELED : job->ret;
+ job_unref_locked(job);
return ret;
}
diff --git a/migration/migration.c b/migration/migration.c
index bb8bbdd..739bb68 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -574,7 +574,8 @@ static void process_incoming_migration_bh(void *opaque)
migration_incoming_state_destroy();
}
-static void process_incoming_migration_co(void *opaque)
+static void coroutine_fn
+process_incoming_migration_co(void *opaque)
{
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyState ps;
diff --git a/monitor/qmp-cmds.c b/monitor/qmp-cmds.c
index 7314cd8..81c8fda 100644
--- a/monitor/qmp-cmds.c
+++ b/monitor/qmp-cmds.c
@@ -135,8 +135,11 @@ void qmp_cont(Error **errp)
blk_iostatus_reset(blk);
}
- for (job = block_job_next(NULL); job; job = block_job_next(job)) {
- block_job_iostatus_reset(job);
+ WITH_JOB_LOCK_GUARD() {
+ for (job = block_job_next_locked(NULL); job;
+ job = block_job_next_locked(job)) {
+ block_job_iostatus_reset_locked(job);
+ }
}
/* Continuing after completed migration. Images have been inactivated to
diff --git a/qapi/block-core.json b/qapi/block-core.json
index f21fa23..882b266 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -1541,8 +1541,8 @@
# -> { "execute": "blockdev-add",
# "arguments": { "driver": "qcow2",
# "node-name": "node1534",
-# "data-file": { "driver": "file",
-# "filename": "hd1.qcow2" },
+# "file": { "driver": "file",
+# "filename": "hd1.qcow2" },
# "backing": null } }
#
# <- { "return": {} }
@@ -4378,7 +4378,7 @@
# "arguments": {
# "driver": "qcow2",
# "node-name": "test1",
-# "data-file": {
+# "file": {
# "driver": "file",
# "filename": "test.qcow2"
# }
@@ -4395,7 +4395,7 @@
# "cache": {
# "direct": true
# },
-# "data-file": {
+# "file": {
# "driver": "file",
# "filename": "/tmp/test.qcow2"
# },
@@ -4477,7 +4477,7 @@
# "arguments": {
# "driver": "qcow2",
# "node-name": "node0",
-# "data-file": {
+# "file": {
# "driver": "file",
# "filename": "test.qcow2"
# }
diff --git a/qemu-img.c b/qemu-img.c
index cab9776..ace3adf 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -911,10 +911,11 @@ static void run_block_job(BlockJob *job, Error **errp)
AioContext *aio_context = block_job_get_aio_context(job);
int ret = 0;
- aio_context_acquire(aio_context);
- job_ref(&job->job);
+ job_lock();
+ job_ref_locked(&job->job);
do {
float progress = 0.0f;
+ job_unlock();
aio_poll(aio_context, true);
progress_get_snapshot(&job->job.progress, &progress_current,
@@ -923,15 +924,17 @@ static void run_block_job(BlockJob *job, Error **errp)
progress = (float)progress_current / progress_total * 100.f;
}
qemu_progress_print(progress, 0);
- } while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
+ job_lock();
+ } while (!job_is_ready_locked(&job->job) &&
+ !job_is_completed_locked(&job->job));
- if (!job_is_completed(&job->job)) {
- ret = job_complete_sync(&job->job, errp);
+ if (!job_is_completed_locked(&job->job)) {
+ ret = job_complete_sync_locked(&job->job, errp);
} else {
ret = job->job.ret;
}
- job_unref(&job->job);
- aio_context_release(aio_context);
+ job_unref_locked(&job->job);
+ job_unlock();
/* publish completion progress only when success */
if (!ret) {
diff --git a/tests/unit/test-bdrv-drain.c b/tests/unit/test-bdrv-drain.c
index 36be84a..4924ceb 100644
--- a/tests/unit/test-bdrv-drain.c
+++ b/tests/unit/test-bdrv-drain.c
@@ -930,9 +930,9 @@ static void test_blockjob_common_drain_node(enum drain_type drain_type,
tjob->prepare_ret = -EIO;
break;
}
+ aio_context_release(ctx);
job_start(&job->job);
- aio_context_release(ctx);
if (use_iothread) {
/* job_co_entry() is run in the I/O thread, wait for the actual job
@@ -943,63 +943,85 @@ static void test_blockjob_common_drain_node(enum drain_type drain_type,
}
}
- g_assert_cmpint(job->job.pause_count, ==, 0);
- g_assert_false(job->job.paused);
- g_assert_true(tjob->running);
- g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ WITH_JOB_LOCK_GUARD() {
+ g_assert_cmpint(job->job.pause_count, ==, 0);
+ g_assert_false(job->job.paused);
+ g_assert_true(tjob->running);
+ g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ }
do_drain_begin_unlocked(drain_type, drain_bs);
- if (drain_type == BDRV_DRAIN_ALL) {
- /* bdrv_drain_all() drains both src and target */
- g_assert_cmpint(job->job.pause_count, ==, 2);
- } else {
- g_assert_cmpint(job->job.pause_count, ==, 1);
+ WITH_JOB_LOCK_GUARD() {
+ if (drain_type == BDRV_DRAIN_ALL) {
+ /* bdrv_drain_all() drains both src and target */
+ g_assert_cmpint(job->job.pause_count, ==, 2);
+ } else {
+ g_assert_cmpint(job->job.pause_count, ==, 1);
+ }
+ g_assert_true(job->job.paused);
+ g_assert_false(job->job.busy); /* The job is paused */
}
- g_assert_true(job->job.paused);
- g_assert_false(job->job.busy); /* The job is paused */
do_drain_end_unlocked(drain_type, drain_bs);
if (use_iothread) {
- /* paused is reset in the I/O thread, wait for it */
+ /*
+ * Here we are waiting for the paused status to change,
+ * so don't bother protecting the read every time.
+ *
+ * paused is reset in the I/O thread, wait for it
+ */
while (job->job.paused) {
aio_poll(qemu_get_aio_context(), false);
}
}
- g_assert_cmpint(job->job.pause_count, ==, 0);
- g_assert_false(job->job.paused);
- g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ WITH_JOB_LOCK_GUARD() {
+ g_assert_cmpint(job->job.pause_count, ==, 0);
+ g_assert_false(job->job.paused);
+ g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ }
do_drain_begin_unlocked(drain_type, target);
- if (drain_type == BDRV_DRAIN_ALL) {
- /* bdrv_drain_all() drains both src and target */
- g_assert_cmpint(job->job.pause_count, ==, 2);
- } else {
- g_assert_cmpint(job->job.pause_count, ==, 1);
+ WITH_JOB_LOCK_GUARD() {
+ if (drain_type == BDRV_DRAIN_ALL) {
+ /* bdrv_drain_all() drains both src and target */
+ g_assert_cmpint(job->job.pause_count, ==, 2);
+ } else {
+ g_assert_cmpint(job->job.pause_count, ==, 1);
+ }
+ g_assert_true(job->job.paused);
+ g_assert_false(job->job.busy); /* The job is paused */
}
- g_assert_true(job->job.paused);
- g_assert_false(job->job.busy); /* The job is paused */
do_drain_end_unlocked(drain_type, target);
if (use_iothread) {
- /* paused is reset in the I/O thread, wait for it */
+ /*
+ * Here we are waiting for the paused status to change,
+ * so don't bother protecting the read every time.
+ *
+ * paused is reset in the I/O thread, wait for it
+ */
while (job->job.paused) {
aio_poll(qemu_get_aio_context(), false);
}
}
- g_assert_cmpint(job->job.pause_count, ==, 0);
- g_assert_false(job->job.paused);
- g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ WITH_JOB_LOCK_GUARD() {
+ g_assert_cmpint(job->job.pause_count, ==, 0);
+ g_assert_false(job->job.paused);
+ g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
+ }
- aio_context_acquire(ctx);
- ret = job_complete_sync(&job->job, &error_abort);
+ WITH_JOB_LOCK_GUARD() {
+ ret = job_complete_sync_locked(&job->job, &error_abort);
+ }
g_assert_cmpint(ret, ==, (result == TEST_JOB_SUCCESS ? 0 : -EIO));
+ aio_context_acquire(ctx);
if (use_iothread) {
blk_set_aio_context(blk_src, qemu_get_aio_context(), &error_abort);
assert(blk_get_aio_context(blk_target) == qemu_get_aio_context());
diff --git a/tests/unit/test-block-iothread.c b/tests/unit/test-block-iothread.c
index 8b55ecc..def0709 100644
--- a/tests/unit/test-block-iothread.c
+++ b/tests/unit/test-block-iothread.c
@@ -582,8 +582,10 @@ static void test_attach_blockjob(void)
aio_poll(qemu_get_aio_context(), false);
}
+ WITH_JOB_LOCK_GUARD() {
+ job_complete_sync_locked(&tjob->common.job, &error_abort);
+ }
aio_context_acquire(ctx);
- job_complete_sync(&tjob->common.job, &error_abort);
blk_set_aio_context(blk, qemu_get_aio_context(), &error_abort);
aio_context_release(ctx);
@@ -757,7 +759,9 @@ static void test_propagate_mirror(void)
BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
false, "filter_node", MIRROR_COPY_MODE_BACKGROUND,
&error_abort);
- job = job_get("job0");
+ WITH_JOB_LOCK_GUARD() {
+ job = job_get_locked("job0");
+ }
filter = bdrv_find_node("filter_node");
/* Change the AioContext of src */
diff --git a/tests/unit/test-blockjob-txn.c b/tests/unit/test-blockjob-txn.c
index c69028b..d3b0bb2 100644
--- a/tests/unit/test-blockjob-txn.c
+++ b/tests/unit/test-blockjob-txn.c
@@ -116,8 +116,10 @@ static void test_single_job(int expected)
job = test_block_job_start(1, true, expected, &result, txn);
job_start(&job->job);
- if (expected == -ECANCELED) {
- job_cancel(&job->job, false);
+ WITH_JOB_LOCK_GUARD() {
+ if (expected == -ECANCELED) {
+ job_cancel_locked(&job->job, false);
+ }
}
while (result == -EINPROGRESS) {
@@ -160,13 +162,15 @@ static void test_pair_jobs(int expected1, int expected2)
/* Release our reference now to trigger as many nice
* use-after-free bugs as possible.
*/
- job_txn_unref(txn);
+ WITH_JOB_LOCK_GUARD() {
+ job_txn_unref_locked(txn);
- if (expected1 == -ECANCELED) {
- job_cancel(&job1->job, false);
- }
- if (expected2 == -ECANCELED) {
- job_cancel(&job2->job, false);
+ if (expected1 == -ECANCELED) {
+ job_cancel_locked(&job1->job, false);
+ }
+ if (expected2 == -ECANCELED) {
+ job_cancel_locked(&job2->job, false);
+ }
}
while (result1 == -EINPROGRESS || result2 == -EINPROGRESS) {
@@ -219,7 +223,9 @@ static void test_pair_jobs_fail_cancel_race(void)
job_start(&job1->job);
job_start(&job2->job);
- job_cancel(&job1->job, false);
+ WITH_JOB_LOCK_GUARD() {
+ job_cancel_locked(&job1->job, false);
+ }
/* Now make job2 finish before the main loop kicks jobs. This simulates
* the race between a pending kick and another job completing.
diff --git a/tests/unit/test-blockjob.c b/tests/unit/test-blockjob.c
index 4c9e1bf..c0426bd 100644
--- a/tests/unit/test-blockjob.c
+++ b/tests/unit/test-blockjob.c
@@ -211,8 +211,11 @@ static CancelJob *create_common(Job **pjob)
bjob = mk_job(blk, "Steve", &test_cancel_driver, true,
JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS);
job = &bjob->job;
- job_ref(job);
- assert(job->status == JOB_STATUS_CREATED);
+ WITH_JOB_LOCK_GUARD() {
+ job_ref_locked(job);
+ assert(job->status == JOB_STATUS_CREATED);
+ }
+
s = container_of(bjob, CancelJob, common);
s->blk = blk;
@@ -225,21 +228,22 @@ static void cancel_common(CancelJob *s)
BlockJob *job = &s->common;
BlockBackend *blk = s->blk;
JobStatus sts = job->job.status;
- AioContext *ctx;
-
- ctx = job->job.aio_context;
- aio_context_acquire(ctx);
+ AioContext *ctx = job->job.aio_context;
job_cancel_sync(&job->job, true);
- if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
- Job *dummy = &job->job;
- job_dismiss(&dummy, &error_abort);
+ WITH_JOB_LOCK_GUARD() {
+ if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
+ Job *dummy = &job->job;
+ job_dismiss_locked(&dummy, &error_abort);
+ }
+ assert(job->job.status == JOB_STATUS_NULL);
+ job_unref_locked(&job->job);
}
- assert(job->job.status == JOB_STATUS_NULL);
- job_unref(&job->job);
- destroy_blk(blk);
+ aio_context_acquire(ctx);
+ destroy_blk(blk);
aio_context_release(ctx);
+
}
static void test_cancel_created(void)
@@ -251,6 +255,13 @@ static void test_cancel_created(void)
cancel_common(s);
}
+static void assert_job_status_is(Job *job, int status)
+{
+ WITH_JOB_LOCK_GUARD() {
+ assert(job->status == status);
+ }
+}
+
static void test_cancel_running(void)
{
Job *job;
@@ -259,7 +270,7 @@ static void test_cancel_running(void)
s = create_common(&job);
job_start(job);
- assert(job->status == JOB_STATUS_RUNNING);
+ assert_job_status_is(job, JOB_STATUS_RUNNING);
cancel_common(s);
}
@@ -272,11 +283,12 @@ static void test_cancel_paused(void)
s = create_common(&job);
job_start(job);
- assert(job->status == JOB_STATUS_RUNNING);
-
- job_user_pause(job, &error_abort);
+ WITH_JOB_LOCK_GUARD() {
+ assert(job->status == JOB_STATUS_RUNNING);
+ job_user_pause_locked(job, &error_abort);
+ }
job_enter(job);
- assert(job->status == JOB_STATUS_PAUSED);
+ assert_job_status_is(job, JOB_STATUS_PAUSED);
cancel_common(s);
}
@@ -289,11 +301,11 @@ static void test_cancel_ready(void)
s = create_common(&job);
job_start(job);
- assert(job->status == JOB_STATUS_RUNNING);
+ assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
- assert(job->status == JOB_STATUS_READY);
+ assert_job_status_is(job, JOB_STATUS_READY);
cancel_common(s);
}
@@ -306,15 +318,16 @@ static void test_cancel_standby(void)
s = create_common(&job);
job_start(job);
- assert(job->status == JOB_STATUS_RUNNING);
+ assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
- assert(job->status == JOB_STATUS_READY);
-
- job_user_pause(job, &error_abort);
+ WITH_JOB_LOCK_GUARD() {
+ assert(job->status == JOB_STATUS_READY);
+ job_user_pause_locked(job, &error_abort);
+ }
job_enter(job);
- assert(job->status == JOB_STATUS_STANDBY);
+ assert_job_status_is(job, JOB_STATUS_STANDBY);
cancel_common(s);
}
@@ -327,20 +340,21 @@ static void test_cancel_pending(void)
s = create_common(&job);
job_start(job);
- assert(job->status == JOB_STATUS_RUNNING);
+ assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
- assert(job->status == JOB_STATUS_READY);
-
- job_complete(job, &error_abort);
+ WITH_JOB_LOCK_GUARD() {
+ assert(job->status == JOB_STATUS_READY);
+ job_complete_locked(job, &error_abort);
+ }
job_enter(job);
while (!job->deferred_to_main_loop) {
aio_poll(qemu_get_aio_context(), true);
}
- assert(job->status == JOB_STATUS_READY);
+ assert_job_status_is(job, JOB_STATUS_READY);
aio_poll(qemu_get_aio_context(), true);
- assert(job->status == JOB_STATUS_PENDING);
+ assert_job_status_is(job, JOB_STATUS_PENDING);
cancel_common(s);
}
@@ -353,25 +367,26 @@ static void test_cancel_concluded(void)
s = create_common(&job);
job_start(job);
- assert(job->status == JOB_STATUS_RUNNING);
+ assert_job_status_is(job, JOB_STATUS_RUNNING);
s->should_converge = true;
job_enter(job);
- assert(job->status == JOB_STATUS_READY);
-
- job_complete(job, &error_abort);
+ WITH_JOB_LOCK_GUARD() {
+ assert(job->status == JOB_STATUS_READY);
+ job_complete_locked(job, &error_abort);
+ }
job_enter(job);
while (!job->deferred_to_main_loop) {
aio_poll(qemu_get_aio_context(), true);
}
- assert(job->status == JOB_STATUS_READY);
+ assert_job_status_is(job, JOB_STATUS_READY);
aio_poll(qemu_get_aio_context(), true);
- assert(job->status == JOB_STATUS_PENDING);
+ assert_job_status_is(job, JOB_STATUS_PENDING);
- aio_context_acquire(job->aio_context);
- job_finalize(job, &error_abort);
- aio_context_release(job->aio_context);
- assert(job->status == JOB_STATUS_CONCLUDED);
+ WITH_JOB_LOCK_GUARD() {
+ job_finalize_locked(job, &error_abort);
+ assert(job->status == JOB_STATUS_CONCLUDED);
+ }
cancel_common(s);
}
@@ -417,7 +432,7 @@ static const BlockJobDriver test_yielding_driver = {
};
/*
- * Test that job_complete() works even on jobs that are in a paused
+ * Test that job_complete_locked() works even on jobs that are in a paused
* state (i.e., STANDBY).
*
* To do this, run YieldingJob in an IO thread, get it into the READY
@@ -425,7 +440,7 @@ static const BlockJobDriver test_yielding_driver = {
* acquire the context so the job will not be entered and will thus
* remain on STANDBY.
*
- * job_complete() should still work without error.
+ * job_complete_locked() should still work without error.
*
* Note that on the QMP interface, it is impossible to lock an IO
* thread before a drained section ends. In practice, the
@@ -459,37 +474,44 @@ static void test_complete_in_standby(void)
bjob = mk_job(blk, "job", &test_yielding_driver, true,
JOB_MANUAL_FINALIZE | JOB_MANUAL_DISMISS);
job = &bjob->job;
- assert(job->status == JOB_STATUS_CREATED);
+ assert_job_status_is(job, JOB_STATUS_CREATED);
/* Wait for the job to become READY */
job_start(job);
- aio_context_acquire(ctx);
- AIO_WAIT_WHILE(ctx, job->status != JOB_STATUS_READY);
- aio_context_release(ctx);
+ /*
+ * Here we are waiting for the status to change, so don't bother
+ * protecting the read every time.
+ */
+ AIO_WAIT_WHILE_UNLOCKED(ctx, job->status != JOB_STATUS_READY);
/* Begin the drained section, pausing the job */
bdrv_drain_all_begin();
- assert(job->status == JOB_STATUS_STANDBY);
+ assert_job_status_is(job, JOB_STATUS_STANDBY);
+
/* Lock the IO thread to prevent the job from being run */
aio_context_acquire(ctx);
/* This will schedule the job to resume it */
bdrv_drain_all_end();
+ aio_context_release(ctx);
- /* But the job cannot run, so it will remain on standby */
- assert(job->status == JOB_STATUS_STANDBY);
+ WITH_JOB_LOCK_GUARD() {
+ /* But the job cannot run, so it will remain on standby */
+ assert(job->status == JOB_STATUS_STANDBY);
- /* Even though the job is on standby, this should work */
- job_complete(job, &error_abort);
+ /* Even though the job is on standby, this should work */
+ job_complete_locked(job, &error_abort);
- /* The test is done now, clean up. */
- job_finish_sync(job, NULL, &error_abort);
- assert(job->status == JOB_STATUS_PENDING);
+ /* The test is done now, clean up. */
+ job_finish_sync_locked(job, NULL, &error_abort);
+ assert(job->status == JOB_STATUS_PENDING);
- job_finalize(job, &error_abort);
- assert(job->status == JOB_STATUS_CONCLUDED);
+ job_finalize_locked(job, &error_abort);
+ assert(job->status == JOB_STATUS_CONCLUDED);
- job_dismiss(&job, &error_abort);
+ job_dismiss_locked(&job, &error_abort);
+ }
+ aio_context_acquire(ctx);
destroy_blk(blk);
aio_context_release(ctx);
iothread_join(iothread);
diff --git a/tests/unit/test-coroutine.c b/tests/unit/test-coroutine.c
index aa77a3b..e16b80c 100644
--- a/tests/unit/test-coroutine.c
+++ b/tests/unit/test-coroutine.c
@@ -610,7 +610,7 @@ static void perf_baseline(void)
g_test_message("Function call %u iterations: %f s", maxcycles, duration);
}
-static __attribute__((noinline)) void perf_cost_func(void *opaque)
+static __attribute__((noinline)) void coroutine_fn perf_cost_func(void *opaque)
{
qemu_coroutine_yield();
}
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 9ad24ab..15c82d9 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -135,7 +135,7 @@ typedef struct CoWaitRecord {
QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;
-static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
+static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
w->co = qemu_coroutine_self();
QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
@@ -332,7 +332,7 @@ void qemu_co_rwlock_init(CoRwlock *lock)
}
/* Releases the internal CoMutex. */
-static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
+static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
{
CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
Coroutine *co = NULL;
@@ -365,7 +365,7 @@ static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
}
}
-void qemu_co_rwlock_rdlock(CoRwlock *lock)
+void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
@@ -390,7 +390,7 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
self->locks_held++;
}
-void qemu_co_rwlock_unlock(CoRwlock *lock)
+void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
@@ -408,7 +408,7 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
qemu_co_rwlock_maybe_wake_one(lock);
}
-void qemu_co_rwlock_downgrade(CoRwlock *lock)
+void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
{
qemu_co_mutex_lock(&lock->mutex);
assert(lock->owners == -1);
@@ -418,7 +418,7 @@ void qemu_co_rwlock_downgrade(CoRwlock *lock)
qemu_co_rwlock_maybe_wake_one(lock);
}
-void qemu_co_rwlock_wrlock(CoRwlock *lock)
+void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
@@ -438,7 +438,7 @@ void qemu_co_rwlock_wrlock(CoRwlock *lock)
self->locks_held++;
}
-void qemu_co_rwlock_upgrade(CoRwlock *lock)
+void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
{
qemu_co_mutex_lock(&lock->mutex);
assert(lock->owners > 0);
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index 4a8bd63..356b746 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -213,7 +213,7 @@ bool qemu_coroutine_entered(Coroutine *co)
return co->caller;
}
-AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
+AioContext *qemu_coroutine_get_aio_context(Coroutine *co)
{
return co->ctx;
}