diff options
author | Anthony Liguori <aliguori@us.ibm.com> | 2013-03-15 10:47:21 -0500 |
---|---|---|
committer | Anthony Liguori <aliguori@us.ibm.com> | 2013-03-15 10:47:21 -0500 |
commit | dc0b0616f726956001be09e9a65a6e0b0bd939db (patch) | |
tree | 9b5e0b67bafa20803a8a70b1e04dbbf1cf762a84 | |
parent | d4d7682484f339d70355b165a15f8f5e83638e40 (diff) | |
parent | 3618a094022e984d4e045c6db21aed961b7c6fc9 (diff) | |
download | qemu-dc0b0616f726956001be09e9a65a6e0b0bd939db.zip qemu-dc0b0616f726956001be09e9a65a6e0b0bd939db.tar.gz qemu-dc0b0616f726956001be09e9a65a6e0b0bd939db.tar.bz2 |
Merge remote-tracking branch 'stefanha/block' into staging
# By Stefan Hajnoczi (14) and others
# Via Stefan Hajnoczi
* stefanha/block: (28 commits)
blockdev: Fix up copyright and permission notice
qemu-iotests: use -nographic in test case 007
qemu-iotests: add tests for rebasing zero clusters
dataplane: fix hang introduced by AioContext transition
coroutine: use AioContext for CoQueue BH
threadpool: drop global thread pool
block: add bdrv_get_aio_context()
aio: add a ThreadPool instance to AioContext
threadpool: add thread_pool_new() and thread_pool_free()
threadpool: move globals into struct ThreadPool
main-loop: add qemu_get_aio_context()
sheepdog: set io_flush handler in do_co_req
sheepdog: use non-blocking fd in coroutine context
qcow2: make is_allocated return true for zero clusters
qcow2: drop unnecessary flush in qcow2_update_snapshot_refcount()
qcow2: drop flush in update_cluster_refcount()
qcow2: flush in qcow2_update_snapshot_refcount()
qcow2: set L2 cache dependency in qcow2_alloc_bytes()
qcow2: flush refcount cache correctly in qcow2_write_snapshots()
qcow2: flush refcount cache correctly in alloc_refcount_block()
...
49 files changed, 702 insertions, 233 deletions
@@ -24,6 +24,7 @@ #include "qemu-common.h" #include "block/aio.h" +#include "block/thread-pool.h" #include "qemu/main-loop.h" /***********************************************************/ @@ -172,6 +173,7 @@ aio_ctx_finalize(GSource *source) { AioContext *ctx = (AioContext *) source; + thread_pool_free(ctx->thread_pool); aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL); event_notifier_cleanup(&ctx->notifier); g_array_free(ctx->pollfds, TRUE); @@ -190,6 +192,14 @@ GSource *aio_get_g_source(AioContext *ctx) return &ctx->source; } +ThreadPool *aio_get_thread_pool(AioContext *ctx) +{ + if (!ctx->thread_pool) { + ctx->thread_pool = thread_pool_new(ctx); + } + return ctx->thread_pool; +} + void aio_notify(AioContext *ctx) { event_notifier_set(&ctx->notifier); @@ -200,6 +210,7 @@ AioContext *aio_context_new(void) AioContext *ctx; ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); + ctx->thread_pool = NULL; event_notifier_init(&ctx->notifier, false); aio_set_event_notifier(ctx, &ctx->notifier, (EventNotifierHandler *) @@ -665,15 +665,18 @@ static int bdrv_open_flags(BlockDriverState *bs, int flags) /* * Common part for opening disk images and files + * + * Removes all processed options from *options. */ static int bdrv_open_common(BlockDriverState *bs, BlockDriverState *file, - const char *filename, + const char *filename, QDict *options, int flags, BlockDriver *drv) { int ret, open_flags; assert(drv != NULL); assert(bs->file == NULL); + assert(options == NULL || bs->options != options); trace_bdrv_open_common(bs, filename, flags, drv->format_name); @@ -710,7 +713,7 @@ static int bdrv_open_common(BlockDriverState *bs, BlockDriverState *file, } else { assert(file != NULL); bs->file = file; - ret = drv->bdrv_open(bs, open_flags); + ret = drv->bdrv_open(bs, options, open_flags); } if (ret < 0) { @@ -752,7 +755,7 @@ int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags) } bs = bdrv_new(""); - ret = bdrv_open_common(bs, NULL, filename, flags, drv); + ret = bdrv_open_common(bs, NULL, filename, NULL, flags, drv); if (ret < 0) { bdrv_delete(bs); return ret; @@ -788,7 +791,8 @@ int bdrv_open_backing_file(BlockDriverState *bs) /* backing files always opened read-only */ back_flags = bs->open_flags & ~(BDRV_O_RDWR | BDRV_O_SNAPSHOT); - ret = bdrv_open(bs->backing_hd, backing_filename, back_flags, back_drv); + ret = bdrv_open(bs->backing_hd, backing_filename, NULL, + back_flags, back_drv); if (ret < 0) { bdrv_delete(bs->backing_hd); bs->backing_hd = NULL; @@ -800,15 +804,29 @@ int bdrv_open_backing_file(BlockDriverState *bs) /* * Opens a disk image (raw, qcow2, vmdk, ...) + * + * options is a QDict of options to pass to the block drivers, or NULL for an + * empty set of options. The reference to the QDict belongs to the block layer + * after the call (even on failure), so if the caller intends to reuse the + * dictionary, it needs to use QINCREF() before calling bdrv_open. */ -int bdrv_open(BlockDriverState *bs, const char *filename, int flags, - BlockDriver *drv) +int bdrv_open(BlockDriverState *bs, const char *filename, QDict *options, + int flags, BlockDriver *drv) { int ret; /* TODO: extra byte is a hack to ensure MAX_PATH space on Windows. */ char tmp_filename[PATH_MAX + 1]; BlockDriverState *file = NULL; + /* NULL means an empty set of options */ + if (options == NULL) { + options = qdict_new(); + } + + bs->options = options; + options = qdict_clone_shallow(options); + + /* For snapshot=on, create a temporary qcow2 overlay */ if (flags & BDRV_O_SNAPSHOT) { BlockDriverState *bs1; int64_t total_size; @@ -822,10 +840,10 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, /* if there is a backing file, use it */ bs1 = bdrv_new(""); - ret = bdrv_open(bs1, filename, 0, drv); + ret = bdrv_open(bs1, filename, NULL, 0, drv); if (ret < 0) { bdrv_delete(bs1); - return ret; + goto fail; } total_size = bdrv_getlength(bs1) & BDRV_SECTOR_MASK; @@ -836,15 +854,17 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, ret = get_tmp_filename(tmp_filename, sizeof(tmp_filename)); if (ret < 0) { - return ret; + goto fail; } /* Real path is meaningless for protocols */ - if (is_protocol) + if (is_protocol) { snprintf(backing_filename, sizeof(backing_filename), "%s", filename); - else if (!realpath(filename, backing_filename)) - return -errno; + } else if (!realpath(filename, backing_filename)) { + ret = -errno; + goto fail; + } bdrv_qcow2 = bdrv_find_format("qcow2"); options = parse_option_parameters("", bdrv_qcow2->create_options, NULL); @@ -859,7 +879,7 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, ret = bdrv_create(bdrv_qcow2, tmp_filename, options); free_option_parameters(options); if (ret < 0) { - return ret; + goto fail; } filename = tmp_filename; @@ -874,7 +894,7 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, ret = bdrv_file_open(&file, filename, bdrv_open_flags(bs, flags)); if (ret < 0) { - return ret; + goto fail; } /* Find the right image format driver */ @@ -887,7 +907,7 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, } /* Open the image */ - ret = bdrv_open_common(bs, file, filename, flags, drv); + ret = bdrv_open_common(bs, file, filename, options, flags, drv); if (ret < 0) { goto unlink_and_fail; } @@ -901,11 +921,22 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, if ((flags & BDRV_O_NO_BACKING) == 0) { ret = bdrv_open_backing_file(bs); if (ret < 0) { - bdrv_close(bs); - return ret; + goto close_and_fail; } } + /* Check if any unknown options were used */ + if (qdict_size(options) != 0) { + const QDictEntry *entry = qdict_first(options); + qerror_report(ERROR_CLASS_GENERIC_ERROR, "Block format '%s' used by " + "device '%s' doesn't support the option '%s'", + drv->format_name, bs->device_name, entry->key); + + ret = -EINVAL; + goto close_and_fail; + } + QDECREF(options); + if (!bdrv_key_required(bs)) { bdrv_dev_change_media_cb(bs, true); } @@ -924,6 +955,15 @@ unlink_and_fail: if (bs->is_temporary) { unlink(filename); } +fail: + QDECREF(bs->options); + QDECREF(options); + bs->options = NULL; + return ret; + +close_and_fail: + bdrv_close(bs); + QDECREF(options); return ret; } @@ -1193,6 +1233,8 @@ void bdrv_close(BlockDriverState *bs) bs->valid_key = 0; bs->sg = 0; bs->growable = 0; + QDECREF(bs->options); + bs->options = NULL; if (bs->file != NULL) { bdrv_delete(bs->file); @@ -3190,7 +3232,7 @@ int bdrv_snapshot_goto(BlockDriverState *bs, if (bs->file) { drv->bdrv_close(bs); ret = bdrv_snapshot_goto(bs->file, snapshot_id); - open_ret = drv->bdrv_open(bs, bs->open_flags); + open_ret = drv->bdrv_open(bs, NULL, bs->open_flags); if (open_ret < 0) { bdrv_delete(bs->file); bs->drv = NULL; @@ -4594,7 +4636,8 @@ void bdrv_img_create(const char *filename, const char *fmt, bs = bdrv_new(""); - ret = bdrv_open(bs, backing_file->value.s, back_flags, backing_drv); + ret = bdrv_open(bs, backing_file->value.s, NULL, back_flags, + backing_drv); if (ret < 0) { error_setg_errno(errp, -ret, "Could not open '%s'", backing_file->value.s); @@ -4638,3 +4681,9 @@ out: bdrv_delete(bs); } } + +AioContext *bdrv_get_aio_context(BlockDriverState *bs) +{ + /* Currently BlockDriverState always uses the main loop AioContext */ + return qemu_get_aio_context(); +} diff --git a/block/blkverify.c b/block/blkverify.c index a7dd459..2086d97 100644 --- a/block/blkverify.c +++ b/block/blkverify.c @@ -98,7 +98,7 @@ static int blkverify_open(BlockDriverState *bs, const char *filename, int flags) /* Open the test file */ s->test_file = bdrv_new(""); - ret = bdrv_open(s->test_file, filename, flags, NULL); + ret = bdrv_open(s->test_file, filename, NULL, flags, NULL); if (ret < 0) { bdrv_delete(s->test_file); s->test_file = NULL; diff --git a/block/bochs.c b/block/bochs.c index a6eb33d..d7078c0 100644 --- a/block/bochs.c +++ b/block/bochs.c @@ -108,7 +108,7 @@ static int bochs_probe(const uint8_t *buf, int buf_size, const char *filename) return 0; } -static int bochs_open(BlockDriverState *bs, int flags) +static int bochs_open(BlockDriverState *bs, QDict *options, int flags) { BDRVBochsState *s = bs->opaque; int i; diff --git a/block/cloop.c b/block/cloop.c index 8fe13e9..6ea7cf4 100644 --- a/block/cloop.c +++ b/block/cloop.c @@ -53,7 +53,7 @@ static int cloop_probe(const uint8_t *buf, int buf_size, const char *filename) return 0; } -static int cloop_open(BlockDriverState *bs, int flags) +static int cloop_open(BlockDriverState *bs, QDict *options, int flags) { BDRVCloopState *s = bs->opaque; uint32_t offsets_size, max_compressed_block_size = 1, i; diff --git a/block/cow.c b/block/cow.c index 4baf904..d73e08c 100644 --- a/block/cow.c +++ b/block/cow.c @@ -58,7 +58,7 @@ static int cow_probe(const uint8_t *buf, int buf_size, const char *filename) return 0; } -static int cow_open(BlockDriverState *bs, int flags) +static int cow_open(BlockDriverState *bs, QDict *options, int flags) { BDRVCowState *s = bs->opaque; struct cow_header_v2 cow_header; diff --git a/block/dmg.c b/block/dmg.c index 6d85801..c1066df 100644 --- a/block/dmg.c +++ b/block/dmg.c @@ -85,7 +85,7 @@ static int read_uint32(BlockDriverState *bs, int64_t offset, uint32_t *result) return 0; } -static int dmg_open(BlockDriverState *bs, int flags) +static int dmg_open(BlockDriverState *bs, QDict *options, int flags) { BDRVDMGState *s = bs->opaque; uint64_t info_begin,info_end,last_in_offset,last_out_offset; diff --git a/block/parallels.c b/block/parallels.c index 8688f6c..18b3ac0 100644 --- a/block/parallels.c +++ b/block/parallels.c @@ -68,7 +68,7 @@ static int parallels_probe(const uint8_t *buf, int buf_size, const char *filenam return 0; } -static int parallels_open(BlockDriverState *bs, int flags) +static int parallels_open(BlockDriverState *bs, QDict *options, int flags) { BDRVParallelsState *s = bs->opaque; int i; diff --git a/block/qcow.c b/block/qcow.c index a7135ee..f6750a5 100644 --- a/block/qcow.c +++ b/block/qcow.c @@ -92,7 +92,7 @@ static int qcow_probe(const uint8_t *buf, int buf_size, const char *filename) return 0; } -static int qcow_open(BlockDriverState *bs, int flags) +static int qcow_open(BlockDriverState *bs, QDict *options, int flags) { BDRVQcowState *s = bs->opaque; int len, i, shift, ret; diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c index 56fccf9..d72d063 100644 --- a/block/qcow2-cluster.c +++ b/block/qcow2-cluster.c @@ -454,6 +454,9 @@ int qcow2_get_cluster_offset(BlockDriverState *bs, uint64_t offset, *cluster_offset &= L2E_COMPRESSED_OFFSET_SIZE_MASK; break; case QCOW2_CLUSTER_ZERO: + if (s->qcow_version < 3) { + return -EIO; + } c = count_contiguous_clusters(nb_clusters, s->cluster_size, &l2_table[l2_index], 0, QCOW_OFLAG_COMPRESSED | QCOW_OFLAG_ZERO); @@ -668,7 +671,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m) } /* Update L2 table. */ - if (s->compatible_features & QCOW2_COMPAT_LAZY_REFCOUNTS) { + if (s->use_lazy_refcounts) { qcow2_mark_dirty(bs); } if (qcow2_need_accurate_refcounts(s)) { diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c index 55543ed..9bfb390 100644 --- a/block/qcow2-refcount.c +++ b/block/qcow2-refcount.c @@ -201,7 +201,10 @@ static int alloc_refcount_block(BlockDriverState *bs, *refcount_block = NULL; /* We write to the refcount table, so we might depend on L2 tables */ - qcow2_cache_flush(bs, s->l2_table_cache); + ret = qcow2_cache_flush(bs, s->l2_table_cache); + if (ret < 0) { + return ret; + } /* Allocate the refcount block itself and mark it as used */ int64_t new_block = alloc_clusters_noref(bs, s->cluster_size); @@ -237,7 +240,10 @@ static int alloc_refcount_block(BlockDriverState *bs, goto fail_block; } - bdrv_flush(bs->file); + ret = qcow2_cache_flush(bs, s->refcount_block_cache); + if (ret < 0) { + goto fail_block; + } /* Initialize the new refcount block only after updating its refcount, * update_refcount uses the refcount cache itself */ @@ -526,8 +532,6 @@ static int update_cluster_refcount(BlockDriverState *bs, return ret; } - bdrv_flush(bs->file); - return get_refcount(bs, cluster_index); } @@ -663,7 +667,11 @@ int64_t qcow2_alloc_bytes(BlockDriverState *bs, int size) } } - bdrv_flush(bs->file); + /* The cluster refcount was incremented, either by qcow2_alloc_clusters() + * or explicitly by update_cluster_refcount(). Refcount blocks must be + * flushed before the caller's L2 table updates. + */ + qcow2_cache_set_dependency(bs, s->l2_table_cache, s->refcount_block_cache); return offset; } @@ -782,10 +790,6 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs, if (ret < 0) { goto fail; } - - /* TODO Flushing once for the whole function should - * be enough */ - bdrv_flush(bs->file); } /* compressed clusters are never modified */ refcount = 2; @@ -841,7 +845,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs, } } - ret = 0; + ret = bdrv_flush(bs); fail: if (l2_table) { qcow2_cache_put(bs, s->l2_table_cache, (void**) &l2_table); diff --git a/block/qcow2-snapshot.c b/block/qcow2-snapshot.c index eb8fcd5..992a5c8 100644 --- a/block/qcow2-snapshot.c +++ b/block/qcow2-snapshot.c @@ -180,11 +180,14 @@ static int qcow2_write_snapshots(BlockDriverState *bs) /* Allocate space for the new snapshot list */ snapshots_offset = qcow2_alloc_clusters(bs, snapshots_size); - bdrv_flush(bs->file); offset = snapshots_offset; if (offset < 0) { return offset; } + ret = bdrv_flush(bs); + if (ret < 0) { + return ret; + } /* Write all snapshots to the new list */ for(i = 0; i < s->nb_snapshots; i++) { @@ -378,11 +381,6 @@ int qcow2_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) goto fail; } - ret = bdrv_flush(bs); - if (ret < 0) { - goto fail; - } - /* Append the new snapshot to the snapshot list */ new_snapshot_list = g_malloc((s->nb_snapshots + 1) * sizeof(QCowSnapshot)); if (s->snapshots) { diff --git a/block/qcow2.c b/block/qcow2.c index 7610e56..1f99866 100644 --- a/block/qcow2.c +++ b/block/qcow2.c @@ -285,11 +285,26 @@ static int qcow2_check(BlockDriverState *bs, BdrvCheckResult *result, return ret; } -static int qcow2_open(BlockDriverState *bs, int flags) +static QemuOptsList qcow2_runtime_opts = { + .name = "qcow2", + .head = QTAILQ_HEAD_INITIALIZER(qcow2_runtime_opts.head), + .desc = { + { + .name = "lazy_refcounts", + .type = QEMU_OPT_BOOL, + .help = "Postpone refcount updates", + }, + { /* end of list */ } + }, +}; + +static int qcow2_open(BlockDriverState *bs, QDict *options, int flags) { BDRVQcowState *s = bs->opaque; int len, i, ret = 0; QCowHeader header; + QemuOpts *opts; + Error *local_err = NULL; uint64_t ext_end; ret = bdrv_pread(bs->file, 0, &header, sizeof(header)); @@ -495,6 +510,28 @@ static int qcow2_open(BlockDriverState *bs, int flags) } } + /* Enable lazy_refcounts according to image and command line options */ + opts = qemu_opts_create_nofail(&qcow2_runtime_opts); + qemu_opts_absorb_qdict(opts, options, &local_err); + if (error_is_set(&local_err)) { + qerror_report_err(local_err); + error_free(local_err); + ret = -EINVAL; + goto fail; + } + + s->use_lazy_refcounts = qemu_opt_get_bool(opts, "lazy_refcounts", + (s->compatible_features & QCOW2_COMPAT_LAZY_REFCOUNTS)); + + qemu_opts_del(opts); + + if (s->use_lazy_refcounts && s->qcow_version < 3) { + qerror_report(ERROR_CLASS_GENERIC_ERROR, "Lazy refcounts require " + "a qcow2 image with at least qemu 1.1 compatibility level"); + ret = -EINVAL; + goto fail; + } + #ifdef DEBUG_ALLOC { BdrvCheckResult result = {0}; @@ -584,7 +621,7 @@ static int coroutine_fn qcow2_co_is_allocated(BlockDriverState *bs, *pnum = 0; } - return (cluster_offset != 0); + return (cluster_offset != 0) || (ret == QCOW2_CLUSTER_ZERO); } /* handle reading after the end of the backing file */ @@ -665,10 +702,6 @@ static coroutine_fn int qcow2_co_readv(BlockDriverState *bs, int64_t sector_num, break; case QCOW2_CLUSTER_ZERO: - if (s->qcow_version < 3) { - ret = -EIO; - goto fail; - } qemu_iovec_memset(&hd_qiov, 0, 0, 512 * cur_nr_sectors); break; @@ -912,7 +945,7 @@ static void qcow2_invalidate_cache(BlockDriverState *bs) qcow2_close(bs); memset(s, 0, sizeof(BDRVQcowState)); - qcow2_open(bs, flags); + qcow2_open(bs, NULL, flags); if (crypt_method) { s->crypt_method = crypt_method; @@ -1265,7 +1298,7 @@ static int qcow2_create2(const char *filename, int64_t total_size, */ BlockDriver* drv = bdrv_find_format("qcow2"); assert(drv != NULL); - ret = bdrv_open(bs, filename, + ret = bdrv_open(bs, filename, NULL, BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, drv); if (ret < 0) { goto out; diff --git a/block/qcow2.h b/block/qcow2.h index 718b52b..103abdb 100644 --- a/block/qcow2.h +++ b/block/qcow2.h @@ -173,6 +173,7 @@ typedef struct BDRVQcowState { int flags; int qcow_version; + bool use_lazy_refcounts; uint64_t incompatible_features; uint64_t compatible_features; diff --git a/block/qed.c b/block/qed.c index b8515e5..46e12b3 100644 --- a/block/qed.c +++ b/block/qed.c @@ -373,7 +373,7 @@ static void bdrv_qed_rebind(BlockDriverState *bs) s->bs = bs; } -static int bdrv_qed_open(BlockDriverState *bs, int flags) +static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags) { BDRVQEDState *s = bs->opaque; QEDHeader le_header; @@ -1526,7 +1526,7 @@ static void bdrv_qed_invalidate_cache(BlockDriverState *bs) bdrv_qed_close(bs); memset(s, 0, sizeof(BDRVQEDState)); - bdrv_qed_open(bs, bs->open_flags); + bdrv_qed_open(bs, NULL, bs->open_flags); } static int bdrv_qed_check(BlockDriverState *bs, BdrvCheckResult *result, diff --git a/block/raw-posix.c b/block/raw-posix.c index 4dfdf98..8a3cdbc 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, BlockDriverCompletionFunc *cb, void *opaque, int type) { RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); + ThreadPool *pool; acb->bs = bs; acb->aio_type = type; @@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, acb->aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); + pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, { BDRVRawState *s = bs->opaque; RawPosixAIOData *acb; + ThreadPool *pool; if (fd_open(bs) < 0) return NULL; @@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb->aio_offset = 0; acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); + pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index b89ac19..18e0068 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, BlockDriverCompletionFunc *cb, void *opaque, int type) { RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); + ThreadPool *pool; acb->bs = bs; acb->hfile = hfile; @@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, acb->aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); + pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } int qemu_ftruncate64(int fd, int64_t length) diff --git a/block/raw.c b/block/raw.c index 75812db..ce10422 100644 --- a/block/raw.c +++ b/block/raw.c @@ -3,7 +3,7 @@ #include "block/block_int.h" #include "qemu/module.h" -static int raw_open(BlockDriverState *bs, int flags) +static int raw_open(BlockDriverState *bs, QDict *options, int flags) { bs->sg = bs->file->sg; return 0; diff --git a/block/sheepdog.c b/block/sheepdog.c index c711c28..4245328 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -468,6 +468,8 @@ static int connect_to_sdog(BDRVSheepdogState *s) if (err != NULL) { qerror_report_err(err); error_free(err); + } else { + socket_set_nonblock(fd); } return fd; @@ -499,6 +501,13 @@ static void restart_co_req(void *opaque) qemu_coroutine_enter(co, NULL); } +static int have_co_req(void *opaque) +{ + /* this handler is set only when there is a pending request, so + * always returns 1. */ + return 1; +} + typedef struct SheepdogReqCo { int sockfd; SheepdogReq *hdr; @@ -521,15 +530,14 @@ static coroutine_fn void do_co_req(void *opaque) unsigned int *rlen = srco->rlen; co = qemu_coroutine_self(); - qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co); + qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, have_co_req, co); - socket_set_block(sockfd); ret = send_co_req(sockfd, hdr, data, wlen); if (ret < 0) { goto out; } - qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co); + qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, have_co_req, co); ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); if (ret < sizeof(*hdr)) { @@ -552,8 +560,9 @@ static coroutine_fn void do_co_req(void *opaque) } ret = 0; out: + /* there is at most one request for this sockfd, so it is safe to + * set each handler to NULL. */ qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL); - socket_set_nonblock(sockfd); srco->ret = ret; srco->finished = true; @@ -776,8 +785,6 @@ static int get_sheep_fd(BDRVSheepdogState *s) return fd; } - socket_set_nonblock(fd); - qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s); return fd; } diff --git a/block/vdi.c b/block/vdi.c index 87c691b..2662d89 100644 --- a/block/vdi.c +++ b/block/vdi.c @@ -364,7 +364,7 @@ static int vdi_probe(const uint8_t *buf, int buf_size, const char *filename) return result; } -static int vdi_open(BlockDriverState *bs, int flags) +static int vdi_open(BlockDriverState *bs, QDict *options, int flags) { BDRVVdiState *s = bs->opaque; VdiHeader header; diff --git a/block/vmdk.c b/block/vmdk.c index aef1abc..e92104a 100644 --- a/block/vmdk.c +++ b/block/vmdk.c @@ -723,7 +723,7 @@ static int vmdk_open_desc_file(BlockDriverState *bs, int flags, return vmdk_parse_extents(buf, bs, bs->file->filename); } -static int vmdk_open(BlockDriverState *bs, int flags) +static int vmdk_open(BlockDriverState *bs, QDict *options, int flags) { int ret; BDRVVmdkState *s = bs->opaque; @@ -1527,7 +1527,7 @@ static int vmdk_create(const char *filename, QEMUOptionParameter *options) if (backing_file) { char parent_filename[PATH_MAX]; BlockDriverState *bs = bdrv_new(""); - ret = bdrv_open(bs, backing_file, 0, NULL); + ret = bdrv_open(bs, backing_file, NULL, 0, NULL); if (ret != 0) { bdrv_delete(bs); return ret; diff --git a/block/vpc.c b/block/vpc.c index 82229ef..3cad52e 100644 --- a/block/vpc.c +++ b/block/vpc.c @@ -155,7 +155,7 @@ static int vpc_probe(const uint8_t *buf, int buf_size, const char *filename) return 0; } -static int vpc_open(BlockDriverState *bs, int flags) +static int vpc_open(BlockDriverState *bs, QDict *options, int flags) { BDRVVPCState *s = bs->opaque; int i; diff --git a/block/vvfat.c b/block/vvfat.c index 06e6654..b8eb38a 100644 --- a/block/vvfat.c +++ b/block/vvfat.c @@ -2830,7 +2830,7 @@ static int enable_write_target(BDRVVVFATState *s) return -1; } - ret = bdrv_open(s->qcow, s->qcow_filename, + ret = bdrv_open(s->qcow, s->qcow_filename, NULL, BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, bdrv_qcow); if (ret < 0) { return ret; @@ -5,6 +5,29 @@ * * This work is licensed under the terms of the GNU GPL, version 2 or * later. See the COPYING file in the top-level directory. + * + * This file incorporates work covered by the following copyright and + * permission notice: + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. */ #include "sysemu/blockdev.h" @@ -22,6 +45,7 @@ #include "sysemu/arch_init.h" static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives); +extern QemuOptsList qemu_common_drive_opts; static const char *const if_name[IF_COUNT] = { [IF_NONE] = "none", @@ -191,6 +215,7 @@ static void drive_uninit(DriveInfo *dinfo) bdrv_delete(dinfo->bdrv); g_free(dinfo->id); QTAILQ_REMOVE(&drives, dinfo, next); + g_free(dinfo->serial); g_free(dinfo); } @@ -287,7 +312,7 @@ static bool do_check_io_limits(BlockIOLimit *io_limits, Error **errp) return true; } -DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) +DriveInfo *drive_init(QemuOpts *all_opts, BlockInterfaceType block_default_type) { const char *buf; const char *file = NULL; @@ -310,10 +335,36 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) bool copy_on_read; int ret; Error *error = NULL; + QemuOpts *opts; + QDict *bs_opts; + const char *id; translation = BIOS_ATA_TRANSLATION_AUTO; media = MEDIA_DISK; + /* Check common options by copying from all_opts to opts, all other options + * are stored in bs_opts. */ + id = qemu_opts_id(all_opts); + opts = qemu_opts_create(&qemu_common_drive_opts, id, 1, &error); + if (error_is_set(&error)) { + qerror_report_err(error); + error_free(error); + return NULL; + } + + bs_opts = qdict_new(); + qemu_opts_to_qdict(all_opts, bs_opts); + qemu_opts_absorb_qdict(opts, bs_opts, &error); + if (error_is_set(&error)) { + qerror_report_err(error); + error_free(error); + return NULL; + } + + if (id) { + qdict_del(bs_opts, "id"); + } + /* extract parameters */ bus_id = qemu_opt_get_number(opts, "bus", 0); unit_id = qemu_opt_get_number(opts, "unit", -1); @@ -564,9 +615,11 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) dinfo->heads = heads; dinfo->secs = secs; dinfo->trans = translation; - dinfo->opts = opts; + dinfo->opts = all_opts; dinfo->refcount = 1; - dinfo->serial = serial; + if (serial != NULL) { + dinfo->serial = g_strdup(serial); + } QTAILQ_INSERT_TAIL(&drives, dinfo, next); bdrv_set_on_error(dinfo->bdrv, on_read_error, on_write_error); @@ -587,17 +640,20 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) case IF_MTD: break; case IF_VIRTIO: + { /* add virtio block device */ - opts = qemu_opts_create_nofail(qemu_find_opts("device")); + QemuOpts *devopts; + devopts = qemu_opts_create_nofail(qemu_find_opts("device")); if (arch_type == QEMU_ARCH_S390X) { - qemu_opt_set(opts, "driver", "virtio-blk-s390"); + qemu_opt_set(devopts, "driver", "virtio-blk-s390"); } else { - qemu_opt_set(opts, "driver", "virtio-blk-pci"); + qemu_opt_set(devopts, "driver", "virtio-blk-pci"); } - qemu_opt_set(opts, "drive", dinfo->id); + qemu_opt_set(devopts, "drive", dinfo->id); if (devaddr) - qemu_opt_set(opts, "addr", devaddr); + qemu_opt_set(devopts, "addr", devaddr); break; + } default: abort(); } @@ -635,7 +691,9 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) error_report("warning: disabling copy_on_read on readonly drive"); } - ret = bdrv_open(dinfo->bdrv, file, bdrv_flags, drv); + ret = bdrv_open(dinfo->bdrv, file, bs_opts, bdrv_flags, drv); + bs_opts = NULL; + if (ret < 0) { if (ret == -EMEDIUMTYPE) { error_report("could not open disk image %s: not in %s format", @@ -649,9 +707,14 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) if (bdrv_key_required(dinfo->bdrv)) autostart = 0; + + qemu_opts_del(opts); + return dinfo; err: + qemu_opts_del(opts); + QDECREF(bs_opts); bdrv_delete(dinfo->bdrv); g_free(dinfo->id); QTAILQ_REMOVE(&drives, dinfo, next); @@ -820,7 +883,9 @@ void qmp_transaction(BlockdevActionList *dev_list, Error **errp) /* We will manually add the backing_hd field to the bs later */ states->new_bs = bdrv_new(""); - ret = bdrv_open(states->new_bs, new_image_file, + /* TODO Inherit bs->options or only take explicit options with an + * extended QMP command? */ + ret = bdrv_open(states->new_bs, new_image_file, NULL, flags | BDRV_O_NO_BACKING, drv); if (ret != 0) { error_set(errp, QERR_OPEN_FILE_FAILED, new_image_file); @@ -921,7 +986,7 @@ static void qmp_bdrv_open_encrypted(BlockDriverState *bs, const char *filename, int bdrv_flags, BlockDriver *drv, const char *password, Error **errp) { - if (bdrv_open(bs, filename, bdrv_flags, drv) < 0) { + if (bdrv_open(bs, filename, NULL, bdrv_flags, drv) < 0) { error_set(errp, QERR_OPEN_FILE_FAILED, filename); return; } @@ -1330,7 +1395,7 @@ void qmp_drive_mirror(const char *device, const char *target, * file. */ target_bs = bdrv_new(""); - ret = bdrv_open(target_bs, target, flags | BDRV_O_NO_BACKING, drv); + ret = bdrv_open(target_bs, target, NULL, flags | BDRV_O_NO_BACKING, drv); if (ret < 0) { bdrv_delete(target_bs); @@ -1459,9 +1524,9 @@ BlockJobInfoList *qmp_query_block_jobs(Error **errp) return dummy.next; } -QemuOptsList qemu_drive_opts = { +QemuOptsList qemu_common_drive_opts = { .name = "drive", - .head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head), + .head = QTAILQ_HEAD_INITIALIZER(qemu_common_drive_opts.head), .desc = { { .name = "bus", @@ -1580,3 +1645,15 @@ QemuOptsList qemu_drive_opts = { { /* end of list */ } }, }; + +QemuOptsList qemu_drive_opts = { + .name = "drive", + .head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head), + .desc = { + /* + * no elements => accept any params + * validation will happen later + */ + { /* end of list */ } + }, +}; diff --git a/hw/dataplane/virtio-blk.c b/hw/dataplane/virtio-blk.c index dfe5f9b..1242d61 100644 --- a/hw/dataplane/virtio-blk.c +++ b/hw/dataplane/virtio-blk.c @@ -263,6 +263,11 @@ static int process_request(IOQueue *ioq, struct iovec iov[], } } +static int flush_true(EventNotifier *e) +{ + return true; +} + static void handle_notify(EventNotifier *e) { VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane, @@ -342,6 +347,14 @@ static void handle_notify(EventNotifier *e) } } +static int flush_io(EventNotifier *e) +{ + VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane, + io_notifier); + + return s->num_reqs > 0; +} + static void handle_io(EventNotifier *e) { VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane, @@ -472,7 +485,7 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s) exit(1); } s->host_notifier = *virtio_queue_get_host_notifier(vq); - aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, NULL); + aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, flush_true); /* Set up ioqueue */ ioq_init(&s->ioqueue, s->fd, REQ_MAX); @@ -480,7 +493,7 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s) ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb); } s->io_notifier = *ioq_get_notifier(&s->ioqueue); - aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, NULL); + aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, flush_io); s->started = true; trace_virtio_blk_data_plane_start(s); diff --git a/hw/xen_disk.c b/hw/xen_disk.c index cc09a2f..83329e2 100644 --- a/hw/xen_disk.c +++ b/hw/xen_disk.c @@ -763,7 +763,7 @@ static int blk_init(struct XenDevice *xendev) xen_be_printf(&blkdev->xendev, 2, "create new bdrv (xenbus setup)\n"); blkdev->bs = bdrv_new(blkdev->dev); if (blkdev->bs) { - if (bdrv_open(blkdev->bs, blkdev->filename, qflags, + if (bdrv_open(blkdev->bs, blkdev->filename, NULL, qflags, bdrv_find_whitelisted_format(blkdev->fileproto)) != 0) { bdrv_delete(blkdev->bs); blkdev->bs = NULL; diff --git a/include/block/aio.h b/include/block/aio.h index 5b54d38..1836793 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -66,6 +66,9 @@ typedef struct AioContext { /* GPollFDs for aio_poll() */ GArray *pollfds; + + /* Thread pool for performing work and receiving completion callbacks */ + struct ThreadPool *thread_pool; } AioContext; /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */ @@ -223,6 +226,9 @@ void aio_set_event_notifier(AioContext *ctx, */ GSource *aio_get_g_source(AioContext *ctx); +/* Return the ThreadPool bound to this AioContext */ +struct ThreadPool *aio_get_thread_pool(AioContext *ctx); + /* Functions to operate on the main QEMU AioContext. */ bool qemu_aio_wait(void); diff --git a/include/block/block.h b/include/block/block.h index 0f750d7..d4f34d6 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -137,8 +137,8 @@ int bdrv_parse_cache_flags(const char *mode, int *flags); int bdrv_parse_discard_flags(const char *mode, int *flags); int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags); int bdrv_open_backing_file(BlockDriverState *bs); -int bdrv_open(BlockDriverState *bs, const char *filename, int flags, - BlockDriver *drv); +int bdrv_open(BlockDriverState *bs, const char *filename, QDict *options, + int flags, BlockDriver *drv); BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue, BlockDriverState *bs, int flags); int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp); diff --git a/include/block/block_int.h b/include/block/block_int.h index eaad53e..ce0aa26 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -82,7 +82,7 @@ struct BlockDriver { void (*bdrv_reopen_commit)(BDRVReopenState *reopen_state); void (*bdrv_reopen_abort)(BDRVReopenState *reopen_state); - int (*bdrv_open)(BlockDriverState *bs, int flags); + int (*bdrv_open)(BlockDriverState *bs, QDict *options, int flags); int (*bdrv_file_open)(BlockDriverState *bs, const char *filename, int flags); int (*bdrv_read)(BlockDriverState *bs, int64_t sector_num, uint8_t *buf, int nb_sectors); @@ -286,6 +286,7 @@ struct BlockDriverState { /* long-running background operation */ BlockJob *job; + QDict *options; }; int get_tmp_filename(char *filename, int size); @@ -293,6 +294,13 @@ int get_tmp_filename(char *filename, int size); void bdrv_set_io_limits(BlockDriverState *bs, BlockIOLimit *io_limits); +/** + * bdrv_get_aio_context: + * + * Returns: the currently bound #AioContext + */ +AioContext *bdrv_get_aio_context(BlockDriverState *bs); + #ifdef _WIN32 int is_windows_drive(const char *filename); #endif diff --git a/include/block/coroutine.h b/include/block/coroutine.h index c31fae3..a978162 100644 --- a/include/block/coroutine.h +++ b/include/block/coroutine.h @@ -104,6 +104,7 @@ bool qemu_in_coroutine(void); */ typedef struct CoQueue { QTAILQ_HEAD(, Coroutine) entries; + AioContext *ctx; } CoQueue; /** diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 200703e..32afcdd 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -26,9 +26,16 @@ typedef int ThreadPoolFunc(void *opaque); -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, - BlockDriverCompletionFunc *cb, void *opaque); -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); -void thread_pool_submit(ThreadPoolFunc *func, void *arg); +typedef struct ThreadPool ThreadPool; + +ThreadPool *thread_pool_new(struct AioContext *ctx); +void thread_pool_free(ThreadPool *pool); + +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, + ThreadPoolFunc *func, void *arg, + BlockDriverCompletionFunc *cb, void *opaque); +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, + ThreadPoolFunc *func, void *arg); +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); #endif diff --git a/include/qapi/qmp/qdict.h b/include/qapi/qmp/qdict.h index 6d9a4be..685b2e3 100644 --- a/include/qapi/qmp/qdict.h +++ b/include/qapi/qmp/qdict.h @@ -64,4 +64,6 @@ int64_t qdict_get_try_int(const QDict *qdict, const char *key, int qdict_get_try_bool(const QDict *qdict, const char *key, int def_value); const char *qdict_get_try_str(const QDict *qdict, const char *key); +QDict *qdict_clone_shallow(const QDict *src); + #endif /* QDICT_H */ diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h index 0995288..6f0200a 100644 --- a/include/qemu/main-loop.h +++ b/include/qemu/main-loop.h @@ -82,6 +82,11 @@ int qemu_init_main_loop(void); int main_loop_wait(int nonblocking); /** + * qemu_get_aio_context: Return the main loop's AioContext + */ +AioContext *qemu_get_aio_context(void); + +/** * qemu_notify_event: Force processing of pending events. * * Similar to signaling a condition variable, qemu_notify_event forces diff --git a/include/qemu/option.h b/include/qemu/option.h index ba197cd..bdb6d21 100644 --- a/include/qemu/option.h +++ b/include/qemu/option.h @@ -149,6 +149,7 @@ void qemu_opts_set_defaults(QemuOptsList *list, const char *params, QemuOpts *qemu_opts_from_qdict(QemuOptsList *list, const QDict *qdict, Error **errp); QDict *qemu_opts_to_qdict(QemuOpts *opts, QDict *qdict); +void qemu_opts_absorb_qdict(QemuOpts *opts, QDict *qdict, Error **errp); typedef int (*qemu_opts_loopfunc)(QemuOpts *opts, void *opaque); int qemu_opts_print(QemuOpts *opts, void *dummy); diff --git a/include/sysemu/blockdev.h b/include/sysemu/blockdev.h index 1fe5332..804ec88 100644 --- a/include/sysemu/blockdev.h +++ b/include/sysemu/blockdev.h @@ -40,7 +40,7 @@ struct DriveInfo { int media_cd; int cyls, heads, secs, trans; QemuOpts *opts; - const char *serial; + char *serial; QTAILQ_ENTRY(DriveInfo) next; int refcount; }; diff --git a/main-loop.c b/main-loop.c index 8c9b58c..eb80ff3 100644 --- a/main-loop.c +++ b/main-loop.c @@ -109,6 +109,11 @@ static int qemu_signal_init(void) static AioContext *qemu_aio_context; +AioContext *qemu_get_aio_context(void) +{ + return qemu_aio_context; +} + void qemu_notify_event(void) { if (!qemu_aio_context) { diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c index 97ef01c..86efe1f 100644 --- a/qemu-coroutine-lock.c +++ b/qemu-coroutine-lock.c @@ -29,28 +29,36 @@ #include "block/aio.h" #include "trace.h" -static QTAILQ_HEAD(, Coroutine) unlock_bh_queue = - QTAILQ_HEAD_INITIALIZER(unlock_bh_queue); -static QEMUBH* unlock_bh; +/* Coroutines are awoken from a BH to allow the current coroutine to complete + * its flow of execution. The BH may run after the CoQueue has been destroyed, + * so keep BH data in a separate heap-allocated struct. + */ +typedef struct { + QEMUBH *bh; + QTAILQ_HEAD(, Coroutine) entries; +} CoQueueNextData; static void qemu_co_queue_next_bh(void *opaque) { + CoQueueNextData *data = opaque; Coroutine *next; trace_qemu_co_queue_next_bh(); - while ((next = QTAILQ_FIRST(&unlock_bh_queue))) { - QTAILQ_REMOVE(&unlock_bh_queue, next, co_queue_next); + while ((next = QTAILQ_FIRST(&data->entries))) { + QTAILQ_REMOVE(&data->entries, next, co_queue_next); qemu_coroutine_enter(next, NULL); } + + qemu_bh_delete(data->bh); + g_slice_free(CoQueueNextData, data); } void qemu_co_queue_init(CoQueue *queue) { QTAILQ_INIT(&queue->entries); - if (!unlock_bh) { - unlock_bh = qemu_bh_new(qemu_co_queue_next_bh, NULL); - } + /* This will be exposed to callers once there are multiple AioContexts */ + queue->ctx = qemu_get_aio_context(); } void coroutine_fn qemu_co_queue_wait(CoQueue *queue) @@ -69,26 +77,39 @@ void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue) assert(qemu_in_coroutine()); } -bool qemu_co_queue_next(CoQueue *queue) +static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) { Coroutine *next; + CoQueueNextData *data; + + if (QTAILQ_EMPTY(&queue->entries)) { + return false; + } - next = QTAILQ_FIRST(&queue->entries); - if (next) { + data = g_slice_new(CoQueueNextData); + data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data); + QTAILQ_INIT(&data->entries); + qemu_bh_schedule(data->bh); + + while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) { QTAILQ_REMOVE(&queue->entries, next, co_queue_next); - QTAILQ_INSERT_TAIL(&unlock_bh_queue, next, co_queue_next); + QTAILQ_INSERT_TAIL(&data->entries, next, co_queue_next); trace_qemu_co_queue_next(next); - qemu_bh_schedule(unlock_bh); + if (single) { + break; + } } + return true; +} - return (next != NULL); +bool qemu_co_queue_next(CoQueue *queue) +{ + return qemu_co_queue_do_restart(queue, true); } void qemu_co_queue_restart_all(CoQueue *queue) { - while (qemu_co_queue_next(queue)) { - /* Do nothing */ - } + qemu_co_queue_do_restart(queue, false); } bool qemu_co_queue_empty(CoQueue *queue) @@ -276,7 +276,7 @@ static BlockDriverState *bdrv_new_open(const char *filename, drv = NULL; } - ret = bdrv_open(bs, filename, flags, drv); + ret = bdrv_open(bs, filename, NULL, flags, drv); if (ret < 0) { error_report("Could not open '%s': %s", filename, strerror(-ret)); goto fail; @@ -2156,7 +2156,7 @@ static int img_rebase(int argc, char **argv) bs_old_backing = bdrv_new("old_backing"); bdrv_get_backing_filename(bs, backing_name, sizeof(backing_name)); - ret = bdrv_open(bs_old_backing, backing_name, BDRV_O_FLAGS, + ret = bdrv_open(bs_old_backing, backing_name, NULL, BDRV_O_FLAGS, old_backing_drv); if (ret) { error_report("Could not open old backing file '%s'", backing_name); @@ -2164,7 +2164,7 @@ static int img_rebase(int argc, char **argv) } if (out_baseimg[0]) { bs_new_backing = bdrv_new("new_backing"); - ret = bdrv_open(bs_new_backing, out_baseimg, BDRV_O_FLAGS, + ret = bdrv_open(bs_new_backing, out_baseimg, NULL, BDRV_O_FLAGS, new_backing_drv); if (ret) { error_report("Could not open new backing file '%s'", @@ -1773,7 +1773,7 @@ static int openfile(char *name, int flags, int growable) } else { bs = bdrv_new("hda"); - if (bdrv_open(bs, name, flags, NULL) < 0) { + if (bdrv_open(bs, name, NULL, flags, NULL) < 0) { fprintf(stderr, "%s: can't open device %s\n", progname, name); bdrv_delete(bs); bs = NULL; @@ -557,7 +557,7 @@ int main(int argc, char **argv) bs = bdrv_new("hda"); srcpath = argv[optind]; - if ((ret = bdrv_open(bs, srcpath, flags, NULL)) < 0) { + if ((ret = bdrv_open(bs, srcpath, NULL, flags, NULL)) < 0) { errno = -ret; err(EXIT_FAILURE, "Failed to bdrv_open '%s'", argv[optind]); } diff --git a/qobject/qdict.c b/qobject/qdict.c index 7543ccc..ed381f9 100644 --- a/qobject/qdict.c +++ b/qobject/qdict.c @@ -401,6 +401,28 @@ const QDictEntry *qdict_next(const QDict *qdict, const QDictEntry *entry) } /** + * qdict_clone_shallow(): Clones a given QDict. Its entries are not copied, but + * another reference is added. + */ +QDict *qdict_clone_shallow(const QDict *src) +{ + QDict *dest; + QDictEntry *entry; + int i; + + dest = qdict_new(); + + for (i = 0; i < QDICT_BUCKET_MAX; i++) { + QLIST_FOREACH(entry, &src->table[i], next) { + qobject_incref(entry->value); + qdict_put_obj(dest, entry->key, entry->value); + } + } + + return dest; +} + +/** * qentry_destroy(): Free all the memory allocated by a QDictEntry */ static void qentry_destroy(QDictEntry *e) diff --git a/tests/qemu-iotests/007 b/tests/qemu-iotests/007 index 0139264..c454f2c 100755 --- a/tests/qemu-iotests/007 +++ b/tests/qemu-iotests/007 @@ -50,10 +50,9 @@ _make_test_img 1M for i in `seq 1 10`; do echo "savevm $i" - # XXX(hch): adding -nographic would be good, but hangs the test - $QEMU -hda $TEST_IMG -monitor stdio >/dev/null 2>&1 <<EOF -savevm test-$i -quit + $QEMU -nographic -hda $TEST_IMG -serial none -monitor stdio >/dev/null 2>&1 <<EOF +savevm test-$i +quit EOF done diff --git a/tests/qemu-iotests/050 b/tests/qemu-iotests/050 new file mode 100755 index 0000000..05793e2 --- /dev/null +++ b/tests/qemu-iotests/050 @@ -0,0 +1,75 @@ +#!/bin/bash +# +# Test qemu-img rebase with zero clusters +# +# Copyright (C) 2013 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# creator +owner=pbonzini@redhat.com + +seq=`basename $0` +echo "QA output created by $seq" + +here=`pwd` +tmp=/tmp/$$ +status=1 # failure is the default! + +_cleanup() +{ + _cleanup_test_img + rm -f $TEST_IMG.old + rm -f $TEST_IMG.new +} +trap "_cleanup; exit \$status" 0 1 2 3 15 + +# get standard environment, filters and checks +. ./common.rc +. ./common.filter + +_supported_fmt qcow2 qed +_supported_proto file +_supported_os Linux + +if test "$IMGFMT" = qcow2 && test $IMGOPTS = ""; then + IMGOPTS=compat=1.1 +fi + +echo +echo "== Creating images ==" + +size=10M +_make_test_img $size +$QEMU_IO -c "write -P 0x40 0 1048576" $TEST_IMG | _filter_qemu_io +mv $TEST_IMG $TEST_IMG.old + +_make_test_img $size +$QEMU_IO -c "write -P 0x5a 0 1048576" $TEST_IMG | _filter_qemu_io +mv $TEST_IMG $TEST_IMG.new + +_make_test_img -b $TEST_IMG.old $size +$QEMU_IO -c "write -z 0 1048576" $TEST_IMG | _filter_qemu_io + +echo +echo "== Rebasing the image ==" + +$QEMU_IMG rebase -b $TEST_IMG.new $TEST_IMG +$QEMU_IO -c "read -P 0x00 0 1048576" $TEST_IMG | _filter_qemu_io + +# success, all done +echo "*** done" +rm -f $seq.full +status=0 diff --git a/tests/qemu-iotests/050.out b/tests/qemu-iotests/050.out new file mode 100644 index 0000000..3f5f7e1 --- /dev/null +++ b/tests/qemu-iotests/050.out @@ -0,0 +1,17 @@ +QA output created by 050 + +== Creating images == +Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 +wrote 1048576/1048576 bytes at offset 0 +1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) +Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 +wrote 1048576/1048576 bytes at offset 0 +1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) +Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 backing_file='TEST_DIR/t.IMGFMT.old' +wrote 1048576/1048576 bytes at offset 0 +1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) + +== Rebasing the image == +read 1048576/1048576 bytes at offset 0 +1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) +*** done diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group index fcf57e0..1d7e4f3 100644 --- a/tests/qemu-iotests/group +++ b/tests/qemu-iotests/group @@ -56,3 +56,4 @@ 047 rw auto 048 img auto quick 049 rw auto +050 rw auto backing quick diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index 9998e03..22915aa 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -4,6 +4,8 @@ #include "block/thread-pool.h" #include "block/block.h" +static AioContext *ctx; +static ThreadPool *pool; static int active; typedef struct { @@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret) active--; } -/* A non-blocking poll of the main AIO context (we cannot use aio_poll - * because we do not know the AioContext). - */ -static void qemu_aio_wait_nonblocking(void) -{ - qemu_notify_event(); - qemu_aio_wait(); -} - /* Wait until all aio and bh activity has finished */ static void qemu_aio_wait_all(void) { - while (qemu_aio_wait()) { + while (aio_poll(ctx, true)) { /* Do nothing */ } } @@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void) static void test_submit(void) { WorkerTestData data = { .n = 0 }; - thread_pool_submit(worker_cb, &data); + thread_pool_submit(pool, worker_cb, &data); qemu_aio_wait_all(); g_assert_cmpint(data.n, ==, 1); } @@ -66,7 +59,8 @@ static void test_submit(void) static void test_submit_aio(void) { WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; - data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); + data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, + done_cb, &data); /* The callbacks are not called until after the first wait. */ active = 1; @@ -84,7 +78,7 @@ static void co_test_cb(void *opaque) active = 1; data->n = 0; data->ret = -EINPROGRESS; - thread_pool_submit_co(worker_cb, data); + thread_pool_submit_co(pool, worker_cb, data); /* The test continues in test_submit_co, after qemu_coroutine_enter... */ @@ -126,12 +120,12 @@ static void test_submit_many(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); } active = 100; while (active > 0) { - qemu_aio_wait(); + aio_poll(ctx, true); } for (i = 0; i < 100; i++) { g_assert_cmpint(data[i].n, ==, 1); @@ -154,7 +148,7 @@ static void test_cancel(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], + data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], done_cb, &data[i]); } @@ -162,7 +156,8 @@ static void test_cancel(void) * run, but do not waste too much time... */ active = 100; - qemu_aio_wait_nonblocking(); + aio_notify(ctx); + aio_poll(ctx, false); /* Wait some time for the threads to start, with some sanity * testing on the behavior of the scheduler... @@ -208,11 +203,10 @@ static void test_cancel(void) int main(int argc, char **argv) { - /* These should be removed once each AioContext has its thread pool. - * The test should create its own AioContext. - */ - qemu_init_main_loop(); - bdrv_init(); + int ret; + + ctx = aio_context_new(); + pool = aio_get_thread_pool(ctx); g_test_init(&argc, &argv, NULL); g_test_add_func("/thread-pool/submit", test_submit); @@ -220,5 +214,9 @@ int main(int argc, char **argv) g_test_add_func("/thread-pool/submit-co", test_submit_co); g_test_add_func("/thread-pool/submit-many", test_submit_many); g_test_add_func("/thread-pool/cancel", test_cancel); - return g_test_run(); + + ret = g_test_run(); + + aio_context_unref(ctx); + return ret; } diff --git a/thread-pool.c b/thread-pool.c index e3ca64d..0ebd4c2 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -24,7 +24,7 @@ #include "qemu/event_notifier.h" #include "block/thread-pool.h" -static void do_spawn_thread(void); +static void do_spawn_thread(ThreadPool *pool); typedef struct ThreadPoolElement ThreadPoolElement; @@ -37,6 +37,7 @@ enum ThreadState { struct ThreadPoolElement { BlockDriverAIOCB common; + ThreadPool *pool; ThreadPoolFunc *func; void *arg; @@ -54,49 +55,56 @@ struct ThreadPoolElement { QLIST_ENTRY(ThreadPoolElement) all; }; -static EventNotifier notifier; -static QemuMutex lock; -static QemuCond check_cancel; -static QemuSemaphore sem; -static int max_threads = 64; -static QEMUBH *new_thread_bh; - -/* The following variables are protected by the global mutex. */ -static QLIST_HEAD(, ThreadPoolElement) head; - -/* The following variables are protected by lock. */ -static QTAILQ_HEAD(, ThreadPoolElement) request_list; -static int cur_threads; -static int idle_threads; -static int new_threads; /* backlog of threads we need to create */ -static int pending_threads; /* threads created but not running yet */ -static int pending_cancellations; /* whether we need a cond_broadcast */ - -static void *worker_thread(void *unused) +struct ThreadPool { + EventNotifier notifier; + AioContext *ctx; + QemuMutex lock; + QemuCond check_cancel; + QemuCond worker_stopped; + QemuSemaphore sem; + int max_threads; + QEMUBH *new_thread_bh; + + /* The following variables are only accessed from one AioContext. */ + QLIST_HEAD(, ThreadPoolElement) head; + + /* The following variables are protected by lock. */ + QTAILQ_HEAD(, ThreadPoolElement) request_list; + int cur_threads; + int idle_threads; + int new_threads; /* backlog of threads we need to create */ + int pending_threads; /* threads created but not running yet */ + int pending_cancellations; /* whether we need a cond_broadcast */ + bool stopping; +}; + +static void *worker_thread(void *opaque) { - qemu_mutex_lock(&lock); - pending_threads--; - do_spawn_thread(); + ThreadPool *pool = opaque; + + qemu_mutex_lock(&pool->lock); + pool->pending_threads--; + do_spawn_thread(pool); - while (1) { + while (!pool->stopping) { ThreadPoolElement *req; int ret; do { - idle_threads++; - qemu_mutex_unlock(&lock); - ret = qemu_sem_timedwait(&sem, 10000); - qemu_mutex_lock(&lock); - idle_threads--; - } while (ret == -1 && !QTAILQ_EMPTY(&request_list)); - if (ret == -1) { + pool->idle_threads++; + qemu_mutex_unlock(&pool->lock); + ret = qemu_sem_timedwait(&pool->sem, 10000); + qemu_mutex_lock(&pool->lock); + pool->idle_threads--; + } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); + if (ret == -1 || pool->stopping) { break; } - req = QTAILQ_FIRST(&request_list); - QTAILQ_REMOVE(&request_list, req, reqs); + req = QTAILQ_FIRST(&pool->request_list); + QTAILQ_REMOVE(&pool->request_list, req, reqs); req->state = THREAD_ACTIVE; - qemu_mutex_unlock(&lock); + qemu_mutex_unlock(&pool->lock); ret = req->func(req->arg); @@ -105,45 +113,48 @@ static void *worker_thread(void *unused) smp_wmb(); req->state = THREAD_DONE; - qemu_mutex_lock(&lock); - if (pending_cancellations) { - qemu_cond_broadcast(&check_cancel); + qemu_mutex_lock(&pool->lock); + if (pool->pending_cancellations) { + qemu_cond_broadcast(&pool->check_cancel); } - event_notifier_set(¬ifier); + event_notifier_set(&pool->notifier); } - cur_threads--; - qemu_mutex_unlock(&lock); + pool->cur_threads--; + qemu_cond_signal(&pool->worker_stopped); + qemu_mutex_unlock(&pool->lock); return NULL; } -static void do_spawn_thread(void) +static void do_spawn_thread(ThreadPool *pool) { QemuThread t; /* Runs with lock taken. */ - if (!new_threads) { + if (!pool->new_threads) { return; } - new_threads--; - pending_threads++; + pool->new_threads--; + pool->pending_threads++; - qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED); + qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED); } static void spawn_thread_bh_fn(void *opaque) { - qemu_mutex_lock(&lock); - do_spawn_thread(); - qemu_mutex_unlock(&lock); + ThreadPool *pool = opaque; + + qemu_mutex_lock(&pool->lock); + do_spawn_thread(pool); + qemu_mutex_unlock(&pool->lock); } -static void spawn_thread(void) +static void spawn_thread(ThreadPool *pool) { - cur_threads++; - new_threads++; + pool->cur_threads++; + pool->new_threads++; /* If there are threads being created, they will spawn new workers, so * we don't spend time creating many threads in a loop holding a mutex or * starving the current vcpu. @@ -151,23 +162,25 @@ static void spawn_thread(void) * If there are no idle threads, ask the main thread to create one, so we * inherit the correct affinity instead of the vcpu affinity. */ - if (!pending_threads) { - qemu_bh_schedule(new_thread_bh); + if (!pool->pending_threads) { + qemu_bh_schedule(pool->new_thread_bh); } } static void event_notifier_ready(EventNotifier *notifier) { + ThreadPool *pool = container_of(notifier, ThreadPool, notifier); ThreadPoolElement *elem, *next; event_notifier_test_and_clear(notifier); restart: - QLIST_FOREACH_SAFE(elem, &head, all, next) { + QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { continue; } if (elem->state == THREAD_DONE) { - trace_thread_pool_complete(elem, elem->common.opaque, elem->ret); + trace_thread_pool_complete(pool, elem, elem->common.opaque, + elem->ret); } if (elem->state == THREAD_DONE && elem->common.cb) { QLIST_REMOVE(elem, all); @@ -186,34 +199,36 @@ restart: static int thread_pool_active(EventNotifier *notifier) { - return !QLIST_EMPTY(&head); + ThreadPool *pool = container_of(notifier, ThreadPool, notifier); + return !QLIST_EMPTY(&pool->head); } static void thread_pool_cancel(BlockDriverAIOCB *acb) { ThreadPoolElement *elem = (ThreadPoolElement *)acb; + ThreadPool *pool = elem->pool; trace_thread_pool_cancel(elem, elem->common.opaque); - qemu_mutex_lock(&lock); + qemu_mutex_lock(&pool->lock); if (elem->state == THREAD_QUEUED && /* No thread has yet started working on elem. we can try to "steal" * the item from the worker if we can get a signal from the * semaphore. Because this is non-blocking, we can do it with * the lock taken and ensure that elem will remain THREAD_QUEUED. */ - qemu_sem_timedwait(&sem, 0) == 0) { - QTAILQ_REMOVE(&request_list, elem, reqs); + qemu_sem_timedwait(&pool->sem, 0) == 0) { + QTAILQ_REMOVE(&pool->request_list, elem, reqs); elem->state = THREAD_CANCELED; - event_notifier_set(¬ifier); + event_notifier_set(&pool->notifier); } else { - pending_cancellations++; + pool->pending_cancellations++; while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { - qemu_cond_wait(&check_cancel, &lock); + qemu_cond_wait(&pool->check_cancel, &pool->lock); } - pending_cancellations--; + pool->pending_cancellations--; } - qemu_mutex_unlock(&lock); + qemu_mutex_unlock(&pool->lock); } static const AIOCBInfo thread_pool_aiocb_info = { @@ -221,7 +236,8 @@ static const AIOCBInfo thread_pool_aiocb_info = { .cancel = thread_pool_cancel, }; -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, + ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb, void *opaque) { ThreadPoolElement *req; @@ -230,18 +246,19 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, req->func = func; req->arg = arg; req->state = THREAD_QUEUED; + req->pool = pool; - QLIST_INSERT_HEAD(&head, req, all); + QLIST_INSERT_HEAD(&pool->head, req, all); - trace_thread_pool_submit(req, arg); + trace_thread_pool_submit(pool, req, arg); - qemu_mutex_lock(&lock); - if (idle_threads == 0 && cur_threads < max_threads) { - spawn_thread(); + qemu_mutex_lock(&pool->lock); + if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { + spawn_thread(pool); } - QTAILQ_INSERT_TAIL(&request_list, req, reqs); - qemu_mutex_unlock(&lock); - qemu_sem_post(&sem); + QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); + qemu_mutex_unlock(&pool->lock); + qemu_sem_post(&pool->sem); return &req->common; } @@ -258,32 +275,80 @@ static void thread_pool_co_cb(void *opaque, int ret) qemu_coroutine_enter(co->co, NULL); } -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, + void *arg) { ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; assert(qemu_in_coroutine()); - thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); qemu_coroutine_yield(); return tpc.ret; } -void thread_pool_submit(ThreadPoolFunc *func, void *arg) +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) +{ + thread_pool_submit_aio(pool, func, arg, NULL, NULL); +} + +static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) { - thread_pool_submit_aio(func, arg, NULL, NULL); + if (!ctx) { + ctx = qemu_get_aio_context(); + } + + memset(pool, 0, sizeof(*pool)); + event_notifier_init(&pool->notifier, false); + pool->ctx = ctx; + qemu_mutex_init(&pool->lock); + qemu_cond_init(&pool->check_cancel); + qemu_cond_init(&pool->worker_stopped); + qemu_sem_init(&pool->sem, 0); + pool->max_threads = 64; + pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); + + QLIST_INIT(&pool->head); + QTAILQ_INIT(&pool->request_list); + + aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready, + thread_pool_active); } -static void thread_pool_init(void) +ThreadPool *thread_pool_new(AioContext *ctx) { - QLIST_INIT(&head); - event_notifier_init(¬ifier, false); - qemu_mutex_init(&lock); - qemu_cond_init(&check_cancel); - qemu_sem_init(&sem, 0); - qemu_aio_set_event_notifier(¬ifier, event_notifier_ready, - thread_pool_active); - - QTAILQ_INIT(&request_list); - new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL); + ThreadPool *pool = g_new(ThreadPool, 1); + thread_pool_init_one(pool, ctx); + return pool; } -block_init(thread_pool_init) +void thread_pool_free(ThreadPool *pool) +{ + if (!pool) { + return; + } + + assert(QLIST_EMPTY(&pool->head)); + + qemu_mutex_lock(&pool->lock); + + /* Stop new threads from spawning */ + qemu_bh_delete(pool->new_thread_bh); + pool->cur_threads -= pool->new_threads; + pool->new_threads = 0; + + /* Wait for worker threads to terminate */ + pool->stopping = true; + while (pool->cur_threads > 0) { + qemu_sem_post(&pool->sem); + qemu_cond_wait(&pool->worker_stopped, &pool->lock); + } + + qemu_mutex_unlock(&pool->lock); + + aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL); + qemu_sem_destroy(&pool->sem); + qemu_cond_destroy(&pool->check_cancel); + qemu_cond_destroy(&pool->worker_stopped); + qemu_mutex_destroy(&pool->lock); + event_notifier_cleanup(&pool->notifier); + g_free(pool); +} diff --git a/trace-events b/trace-events index d6a847d..cd73b7f 100644 --- a/trace-events +++ b/trace-events @@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p" # thread-pool.c -thread_pool_submit(void *req, void *opaque) "req %p opaque %p" -thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d" +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" # posix-aio-compat.c diff --git a/util/qemu-option.c b/util/qemu-option.c index 5a1d03c..8b74bf1 100644 --- a/util/qemu-option.c +++ b/util/qemu-option.c @@ -1067,6 +1067,40 @@ QemuOpts *qemu_opts_from_qdict(QemuOptsList *list, const QDict *qdict, } /* + * Adds all QDict entries to the QemuOpts that can be added and removes them + * from the QDict. When this function returns, the QDict contains only those + * entries that couldn't be added to the QemuOpts. + */ +void qemu_opts_absorb_qdict(QemuOpts *opts, QDict *qdict, Error **errp) +{ + const QDictEntry *entry, *next; + + entry = qdict_first(qdict); + + while (entry != NULL) { + Error *local_err = NULL; + OptsFromQDictState state = { + .errp = &local_err, + .opts = opts, + }; + + next = qdict_next(qdict, entry); + + if (find_desc_by_name(opts->list->desc, entry->key)) { + qemu_opts_from_qdict_1(entry->key, entry->value, &state); + if (error_is_set(&local_err)) { + error_propagate(errp, local_err); + return; + } else { + qdict_del(qdict, entry->key); + } + } + + entry = next; + } +} + +/* * Convert from QemuOpts to QDict. * The QDict values are of type QString. * TODO We'll want to use types appropriate for opt->desc->type, but |