diff options
Diffstat (limited to 'thread-pool.c')
-rw-r--r-- | thread-pool.c | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/thread-pool.c b/thread-pool.c index dfb699d..23888dc 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -21,7 +21,6 @@ #include "block/coroutine.h" #include "trace.h" #include "block/block_int.h" -#include "qemu/event_notifier.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" @@ -57,8 +56,8 @@ struct ThreadPoolElement { }; struct ThreadPool { - EventNotifier notifier; AioContext *ctx; + QEMUBH *completion_bh; QemuMutex lock; QemuCond check_cancel; QemuCond worker_stopped; @@ -119,7 +118,7 @@ static void *worker_thread(void *opaque) qemu_cond_broadcast(&pool->check_cancel); } - event_notifier_set(&pool->notifier); + qemu_bh_schedule(pool->completion_bh); } pool->cur_threads--; @@ -168,12 +167,11 @@ static void spawn_thread(ThreadPool *pool) } } -static void event_notifier_ready(EventNotifier *notifier) +static void thread_pool_completion_bh(void *opaque) { - ThreadPool *pool = container_of(notifier, ThreadPool, notifier); + ThreadPool *pool = opaque; ThreadPoolElement *elem, *next; - event_notifier_test_and_clear(notifier); restart: QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { @@ -187,6 +185,12 @@ restart: QLIST_REMOVE(elem, all); /* Read state before ret. */ smp_rmb(); + + /* Schedule ourselves in case elem->common.cb() calls aio_poll() to + * wait for another request that completed at the same time. + */ + qemu_bh_schedule(pool->completion_bh); + elem->common.cb(elem->common.opaque, elem->ret); qemu_aio_release(elem); goto restart; @@ -215,7 +219,7 @@ static void thread_pool_cancel(BlockDriverAIOCB *acb) qemu_sem_timedwait(&pool->sem, 0) == 0) { QTAILQ_REMOVE(&pool->request_list, elem, reqs); elem->state = THREAD_CANCELED; - event_notifier_set(&pool->notifier); + qemu_bh_schedule(pool->completion_bh); } else { pool->pending_cancellations++; while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { @@ -224,7 +228,7 @@ static void thread_pool_cancel(BlockDriverAIOCB *acb) pool->pending_cancellations--; } qemu_mutex_unlock(&pool->lock); - event_notifier_ready(&pool->notifier); + thread_pool_completion_bh(pool); } static const AIOCBInfo thread_pool_aiocb_info = { @@ -293,8 +297,8 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) } memset(pool, 0, sizeof(*pool)); - event_notifier_init(&pool->notifier, false); pool->ctx = ctx; + pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); qemu_mutex_init(&pool->lock); qemu_cond_init(&pool->check_cancel); qemu_cond_init(&pool->worker_stopped); @@ -304,8 +308,6 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) QLIST_INIT(&pool->head); QTAILQ_INIT(&pool->request_list); - - aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready); } ThreadPool *thread_pool_new(AioContext *ctx) @@ -339,11 +341,10 @@ void thread_pool_free(ThreadPool *pool) qemu_mutex_unlock(&pool->lock); - aio_set_event_notifier(pool->ctx, &pool->notifier, NULL); + qemu_bh_delete(pool->completion_bh); qemu_sem_destroy(&pool->sem); qemu_cond_destroy(&pool->check_cancel); qemu_cond_destroy(&pool->worker_stopped); qemu_mutex_destroy(&pool->lock); - event_notifier_cleanup(&pool->notifier); g_free(pool); } |