diff options
Diffstat (limited to 'util/thread-pool.c')
-rw-r--r-- | util/thread-pool.c | 184 |
1 files changed, 149 insertions, 35 deletions
diff --git a/util/thread-pool.c b/util/thread-pool.c index 27eb777..d2ead6b 100644 --- a/util/thread-pool.c +++ b/util/thread-pool.c @@ -23,9 +23,9 @@ #include "block/thread-pool.h" #include "qemu/main-loop.h" -static void do_spawn_thread(ThreadPool *pool); +static void do_spawn_thread(ThreadPoolAio *pool); -typedef struct ThreadPoolElement ThreadPoolElement; +typedef struct ThreadPoolElementAio ThreadPoolElementAio; enum ThreadState { THREAD_QUEUED, @@ -33,9 +33,9 @@ enum ThreadState { THREAD_DONE, }; -struct ThreadPoolElement { +struct ThreadPoolElementAio { BlockAIOCB common; - ThreadPool *pool; + ThreadPoolAio *pool; ThreadPoolFunc *func; void *arg; @@ -47,13 +47,13 @@ struct ThreadPoolElement { int ret; /* Access to this list is protected by lock. */ - QTAILQ_ENTRY(ThreadPoolElement) reqs; + QTAILQ_ENTRY(ThreadPoolElementAio) reqs; /* This list is only written by the thread pool's mother thread. */ - QLIST_ENTRY(ThreadPoolElement) all; + QLIST_ENTRY(ThreadPoolElementAio) all; }; -struct ThreadPool { +struct ThreadPoolAio { AioContext *ctx; QEMUBH *completion_bh; QemuMutex lock; @@ -62,10 +62,10 @@ struct ThreadPool { QEMUBH *new_thread_bh; /* The following variables are only accessed from one AioContext. */ - QLIST_HEAD(, ThreadPoolElement) head; + QLIST_HEAD(, ThreadPoolElementAio) head; /* The following variables are protected by lock. */ - QTAILQ_HEAD(, ThreadPoolElement) request_list; + QTAILQ_HEAD(, ThreadPoolElementAio) request_list; int cur_threads; int idle_threads; int new_threads; /* backlog of threads we need to create */ @@ -76,14 +76,14 @@ struct ThreadPool { static void *worker_thread(void *opaque) { - ThreadPool *pool = opaque; + ThreadPoolAio *pool = opaque; qemu_mutex_lock(&pool->lock); pool->pending_threads--; do_spawn_thread(pool); while (pool->cur_threads <= pool->max_threads) { - ThreadPoolElement *req; + ThreadPoolElementAio *req; int ret; if (QTAILQ_EMPTY(&pool->request_list)) { @@ -131,7 +131,7 @@ static void *worker_thread(void *opaque) return NULL; } -static void do_spawn_thread(ThreadPool *pool) +static void do_spawn_thread(ThreadPoolAio *pool) { QemuThread t; @@ -148,14 +148,14 @@ static void do_spawn_thread(ThreadPool *pool) static void spawn_thread_bh_fn(void *opaque) { - ThreadPool *pool = opaque; + ThreadPoolAio *pool = opaque; qemu_mutex_lock(&pool->lock); do_spawn_thread(pool); qemu_mutex_unlock(&pool->lock); } -static void spawn_thread(ThreadPool *pool) +static void spawn_thread(ThreadPoolAio *pool) { pool->cur_threads++; pool->new_threads++; @@ -173,8 +173,8 @@ static void spawn_thread(ThreadPool *pool) static void thread_pool_completion_bh(void *opaque) { - ThreadPool *pool = opaque; - ThreadPoolElement *elem, *next; + ThreadPoolAio *pool = opaque; + ThreadPoolElementAio *elem, *next; defer_call_begin(); /* cb() may use defer_call() to coalesce work */ @@ -184,8 +184,8 @@ restart: continue; } - trace_thread_pool_complete(pool, elem, elem->common.opaque, - elem->ret); + trace_thread_pool_complete_aio(pool, elem, elem->common.opaque, + elem->ret); QLIST_REMOVE(elem, all); if (elem->common.cb) { @@ -217,10 +217,10 @@ restart: static void thread_pool_cancel(BlockAIOCB *acb) { - ThreadPoolElement *elem = (ThreadPoolElement *)acb; - ThreadPool *pool = elem->pool; + ThreadPoolElementAio *elem = (ThreadPoolElementAio *)acb; + ThreadPoolAio *pool = elem->pool; - trace_thread_pool_cancel(elem, elem->common.opaque); + trace_thread_pool_cancel_aio(elem, elem->common.opaque); QEMU_LOCK_GUARD(&pool->lock); if (elem->state == THREAD_QUEUED) { @@ -234,16 +234,16 @@ static void thread_pool_cancel(BlockAIOCB *acb) } static const AIOCBInfo thread_pool_aiocb_info = { - .aiocb_size = sizeof(ThreadPoolElement), + .aiocb_size = sizeof(ThreadPoolElementAio), .cancel_async = thread_pool_cancel, }; BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, BlockCompletionFunc *cb, void *opaque) { - ThreadPoolElement *req; + ThreadPoolElementAio *req; AioContext *ctx = qemu_get_current_aio_context(); - ThreadPool *pool = aio_get_thread_pool(ctx); + ThreadPoolAio *pool = aio_get_thread_pool(ctx); /* Assert that the thread submitting work is the same running the pool */ assert(pool->ctx == qemu_get_current_aio_context()); @@ -256,7 +256,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, QLIST_INSERT_HEAD(&pool->head, req, all); - trace_thread_pool_submit(pool, req, arg); + trace_thread_pool_submit_aio(pool, req, arg); qemu_mutex_lock(&pool->lock); if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { @@ -290,12 +290,7 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) return tpc.ret; } -void thread_pool_submit(ThreadPoolFunc *func, void *arg) -{ - thread_pool_submit_aio(func, arg, NULL, NULL); -} - -void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) +void thread_pool_update_params(ThreadPoolAio *pool, AioContext *ctx) { qemu_mutex_lock(&pool->lock); @@ -322,7 +317,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx) qemu_mutex_unlock(&pool->lock); } -static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) +static void thread_pool_init_one(ThreadPoolAio *pool, AioContext *ctx) { if (!ctx) { ctx = qemu_get_aio_context(); @@ -342,14 +337,14 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) thread_pool_update_params(pool, ctx); } -ThreadPool *thread_pool_new(AioContext *ctx) +ThreadPoolAio *thread_pool_new_aio(AioContext *ctx) { - ThreadPool *pool = g_new(ThreadPool, 1); + ThreadPoolAio *pool = g_new(ThreadPoolAio, 1); thread_pool_init_one(pool, ctx); return pool; } -void thread_pool_free(ThreadPool *pool) +void thread_pool_free_aio(ThreadPoolAio *pool) { if (!pool) { return; @@ -379,3 +374,122 @@ void thread_pool_free(ThreadPool *pool) qemu_mutex_destroy(&pool->lock); g_free(pool); } + +struct ThreadPool { + GThreadPool *t; + size_t cur_work; + QemuMutex cur_work_lock; + QemuCond all_finished_cond; +}; + +typedef struct { + ThreadPoolFunc *func; + void *opaque; + GDestroyNotify opaque_destroy; +} ThreadPoolElement; + +static void thread_pool_func(gpointer data, gpointer user_data) +{ + ThreadPool *pool = user_data; + g_autofree ThreadPoolElement *el = data; + + el->func(el->opaque); + + if (el->opaque_destroy) { + el->opaque_destroy(el->opaque); + } + + QEMU_LOCK_GUARD(&pool->cur_work_lock); + + assert(pool->cur_work > 0); + pool->cur_work--; + + if (pool->cur_work == 0) { + qemu_cond_signal(&pool->all_finished_cond); + } +} + +ThreadPool *thread_pool_new(void) +{ + ThreadPool *pool = g_new(ThreadPool, 1); + + pool->cur_work = 0; + qemu_mutex_init(&pool->cur_work_lock); + qemu_cond_init(&pool->all_finished_cond); + + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL); + /* + * g_thread_pool_new() can only return errors if initial thread(s) + * creation fails but we ask for 0 initial threads above. + */ + assert(pool->t); + + return pool; +} + +void thread_pool_free(ThreadPool *pool) +{ + /* + * With _wait = TRUE this effectively waits for all + * previously submitted work to complete first. + */ + g_thread_pool_free(pool->t, FALSE, TRUE); + + qemu_cond_destroy(&pool->all_finished_cond); + qemu_mutex_destroy(&pool->cur_work_lock); + + g_free(pool); +} + +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, + void *opaque, GDestroyNotify opaque_destroy) +{ + ThreadPoolElement *el = g_new(ThreadPoolElement, 1); + + el->func = func; + el->opaque = opaque; + el->opaque_destroy = opaque_destroy; + + WITH_QEMU_LOCK_GUARD(&pool->cur_work_lock) { + pool->cur_work++; + } + + /* + * Ignore the return value since this function can only return errors + * if creation of an additional thread fails but even in this case the + * provided work is still getting queued (just for the existing threads). + */ + g_thread_pool_push(pool->t, el, NULL); +} + +void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func, + void *opaque, GDestroyNotify opaque_destroy) +{ + thread_pool_submit(pool, func, opaque, opaque_destroy); + thread_pool_adjust_max_threads_to_work(pool); +} + +void thread_pool_wait(ThreadPool *pool) +{ + QEMU_LOCK_GUARD(&pool->cur_work_lock); + + while (pool->cur_work > 0) { + qemu_cond_wait(&pool->all_finished_cond, + &pool->cur_work_lock); + } +} + +bool thread_pool_set_max_threads(ThreadPool *pool, + int max_threads) +{ + assert(max_threads > 0); + + return g_thread_pool_set_max_threads(pool->t, max_threads, NULL); +} + +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool) +{ + QEMU_LOCK_GUARD(&pool->cur_work_lock); + + return thread_pool_set_max_threads(pool, pool->cur_work); +} |