Diffstat (limited to 'util')
-rw-r--r--  util/Makefile.objs            6
-rw-r--r--  util/aio-posix.c            728
-rw-r--r--  util/aio-win32.c            407
-rw-r--r--  util/aiocb.c                 55
-rw-r--r--  util/async.c                490
-rw-r--r--  util/iohandler.c            136
-rw-r--r--  util/main-loop.c            526
-rw-r--r--  util/qemu-coroutine-lock.c  252
-rw-r--r--  util/qemu-coroutine-sleep.c   2
-rw-r--r--  util/qemu-coroutine.c         8
-rw-r--r--  util/qemu-timer.c           669
-rw-r--r--  util/thread-pool.c          347
-rw-r--r--  util/trace-events            17
13 files changed, 3615 insertions(+), 28 deletions(-)
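
The patch below moves the AioContext, bottom-half, main-loop and timer code into util/ and, in aio-posix.c, includes the adaptive busy-polling logic controlled by poll_ns, poll_max_ns, poll_grow and poll_shrink. As a hedged illustration only (not part of the patch), the sketch that follows restates the poll-time adjustment that aio_poll() applies after each blocking wait; the helper name adjust_poll_ns and its parameter list are hypothetical stand-ins for the AioContext fields of the same names.

/*
 * Hedged sketch, not patch code: the adaptive poll-time adjustment from
 * aio_poll() in aio-posix.c, restated as a standalone helper.  block_ns is
 * the measured time spent blocked in the last iteration; the other
 * parameters mirror the AioContext fields poll_ns, poll_max_ns, poll_grow
 * and poll_shrink.
 */
#include <stdint.h>

static int64_t adjust_poll_ns(int64_t poll_ns, int64_t block_ns,
                              int64_t poll_max_ns,
                              int64_t grow, int64_t shrink)
{
    if (block_ns <= poll_ns) {
        /* Sweet spot: busy-polling already covered the blocking time. */
        return poll_ns;
    }
    if (block_ns > poll_max_ns) {
        /* We would have to poll for too long: shrink, or disable, polling. */
        return shrink ? poll_ns / shrink : 0;
    }
    if (poll_ns < poll_max_ns && block_ns < poll_max_ns) {
        /* Room to grow: poll longer next time, starting at 4 microseconds. */
        int64_t next = poll_ns ? poll_ns * (grow ? grow : 2) : 4000;
        return next > poll_max_ns ? poll_max_ns : next;
    }
    return poll_ns;
}
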
diff --git a/util/Makefile.objs b/util/Makefile.objs index 56c8c23..bc629e2 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -1,14 +1,18 @@ util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o util-obj-y += bufferiszero.o util-obj-y += lockcnt.o +util-obj-y += aiocb.o async.o thread-pool.o qemu-timer.o +util-obj-y += main-loop.o iohandler.o +util-obj-$(CONFIG_POSIX) += aio-posix.o util-obj-$(CONFIG_POSIX) += compatfd.o util-obj-$(CONFIG_POSIX) += event_notifier-posix.o util-obj-$(CONFIG_POSIX) += mmap-alloc.o util-obj-$(CONFIG_POSIX) += oslib-posix.o util-obj-$(CONFIG_POSIX) += qemu-openpty.o util-obj-$(CONFIG_POSIX) += qemu-thread-posix.o -util-obj-$(CONFIG_WIN32) += event_notifier-win32.o util-obj-$(CONFIG_POSIX) += memfd.o +util-obj-$(CONFIG_WIN32) += aio-win32.o +util-obj-$(CONFIG_WIN32) += event_notifier-win32.o util-obj-$(CONFIG_WIN32) += oslib-win32.o util-obj-$(CONFIG_WIN32) += qemu-thread-win32.o util-obj-y += envlist.o path.o module.o diff --git a/util/aio-posix.c b/util/aio-posix.c new file mode 100644 index 0000000..2d51239 --- /dev/null +++ b/util/aio-posix.c @@ -0,0 +1,728 @@ +/* + * QEMU aio implementation + * + * Copyright IBM, Corp. 2008 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ + +#include "qemu/osdep.h" +#include "qemu-common.h" +#include "block/block.h" +#include "qemu/rcu_queue.h" +#include "qemu/sockets.h" +#include "qemu/cutils.h" +#include "trace.h" +#ifdef CONFIG_EPOLL_CREATE1 +#include <sys/epoll.h> +#endif + +struct AioHandler +{ + GPollFD pfd; + IOHandler *io_read; + IOHandler *io_write; + AioPollFn *io_poll; + IOHandler *io_poll_begin; + IOHandler *io_poll_end; + int deleted; + void *opaque; + bool is_external; + QLIST_ENTRY(AioHandler) node; +}; + +#ifdef CONFIG_EPOLL_CREATE1 + +/* The fd number threashold to switch to epoll */ +#define EPOLL_ENABLE_THRESHOLD 64 + +static void aio_epoll_disable(AioContext *ctx) +{ + ctx->epoll_available = false; + if (!ctx->epoll_enabled) { + return; + } + ctx->epoll_enabled = false; + close(ctx->epollfd); +} + +static inline int epoll_events_from_pfd(int pfd_events) +{ + return (pfd_events & G_IO_IN ? EPOLLIN : 0) | + (pfd_events & G_IO_OUT ? EPOLLOUT : 0) | + (pfd_events & G_IO_HUP ? EPOLLHUP : 0) | + (pfd_events & G_IO_ERR ? EPOLLERR : 0); +} + +static bool aio_epoll_try_enable(AioContext *ctx) +{ + AioHandler *node; + struct epoll_event event; + + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + int r; + if (node->deleted || !node->pfd.events) { + continue; + } + event.events = epoll_events_from_pfd(node->pfd.events); + event.data.ptr = node; + r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event); + if (r) { + return false; + } + } + ctx->epoll_enabled = true; + return true; +} + +static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new) +{ + struct epoll_event event; + int r; + int ctl; + + if (!ctx->epoll_enabled) { + return; + } + if (!node->pfd.events) { + ctl = EPOLL_CTL_DEL; + } else { + event.data.ptr = node; + event.events = epoll_events_from_pfd(node->pfd.events); + ctl = is_new ? 
EPOLL_CTL_ADD : EPOLL_CTL_MOD; + } + + r = epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event); + if (r) { + aio_epoll_disable(ctx); + } +} + +static int aio_epoll(AioContext *ctx, GPollFD *pfds, + unsigned npfd, int64_t timeout) +{ + AioHandler *node; + int i, ret = 0; + struct epoll_event events[128]; + + assert(npfd == 1); + assert(pfds[0].fd == ctx->epollfd); + if (timeout > 0) { + ret = qemu_poll_ns(pfds, npfd, timeout); + } + if (timeout <= 0 || ret > 0) { + ret = epoll_wait(ctx->epollfd, events, + sizeof(events) / sizeof(events[0]), + timeout); + if (ret <= 0) { + goto out; + } + for (i = 0; i < ret; i++) { + int ev = events[i].events; + node = events[i].data.ptr; + node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) | + (ev & EPOLLOUT ? G_IO_OUT : 0) | + (ev & EPOLLHUP ? G_IO_HUP : 0) | + (ev & EPOLLERR ? G_IO_ERR : 0); + } + } +out: + return ret; +} + +static bool aio_epoll_enabled(AioContext *ctx) +{ + /* Fall back to ppoll when external clients are disabled. */ + return !aio_external_disabled(ctx) && ctx->epoll_enabled; +} + +static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds, + unsigned npfd, int64_t timeout) +{ + if (!ctx->epoll_available) { + return false; + } + if (aio_epoll_enabled(ctx)) { + return true; + } + if (npfd >= EPOLL_ENABLE_THRESHOLD) { + if (aio_epoll_try_enable(ctx)) { + return true; + } else { + aio_epoll_disable(ctx); + } + } + return false; +} + +#else + +static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new) +{ +} + +static int aio_epoll(AioContext *ctx, GPollFD *pfds, + unsigned npfd, int64_t timeout) +{ + assert(false); +} + +static bool aio_epoll_enabled(AioContext *ctx) +{ + return false; +} + +static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds, + unsigned npfd, int64_t timeout) +{ + return false; +} + +#endif + +static AioHandler *find_aio_handler(AioContext *ctx, int fd) +{ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->pfd.fd == fd) + if (!node->deleted) + return node; + } + + return NULL; +} + +void aio_set_fd_handler(AioContext *ctx, + int fd, + bool is_external, + IOHandler *io_read, + IOHandler *io_write, + AioPollFn *io_poll, + void *opaque) +{ + AioHandler *node; + bool is_new = false; + bool deleted = false; + + qemu_lockcnt_lock(&ctx->list_lock); + + node = find_aio_handler(ctx, fd); + + /* Are we deleting the fd handler? */ + if (!io_read && !io_write && !io_poll) { + if (node == NULL) { + qemu_lockcnt_unlock(&ctx->list_lock); + return; + } + + g_source_remove_poll(&ctx->source, &node->pfd); + + /* If the lock is held, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up while + * no one is walking the handlers list. 
+ */ + QLIST_REMOVE(node, node); + deleted = true; + } + + if (!node->io_poll) { + ctx->poll_disable_cnt--; + } + } else { + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_new0(AioHandler, 1); + node->pfd.fd = fd; + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); + + g_source_add_poll(&ctx->source, &node->pfd); + is_new = true; + + ctx->poll_disable_cnt += !io_poll; + } else { + ctx->poll_disable_cnt += !io_poll - !node->io_poll; + } + + /* Update handler with latest information */ + node->io_read = io_read; + node->io_write = io_write; + node->io_poll = io_poll; + node->opaque = opaque; + node->is_external = is_external; + + node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0); + node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0); + } + + aio_epoll_update(ctx, node, is_new); + qemu_lockcnt_unlock(&ctx->list_lock); + aio_notify(ctx); + + if (deleted) { + g_free(node); + } +} + +void aio_set_fd_poll(AioContext *ctx, int fd, + IOHandler *io_poll_begin, + IOHandler *io_poll_end) +{ + AioHandler *node = find_aio_handler(ctx, fd); + + if (!node) { + return; + } + + node->io_poll_begin = io_poll_begin; + node->io_poll_end = io_poll_end; +} + +void aio_set_event_notifier(AioContext *ctx, + EventNotifier *notifier, + bool is_external, + EventNotifierHandler *io_read, + AioPollFn *io_poll) +{ + aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external, + (IOHandler *)io_read, NULL, io_poll, notifier); +} + +void aio_set_event_notifier_poll(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_poll_begin, + EventNotifierHandler *io_poll_end) +{ + aio_set_fd_poll(ctx, event_notifier_get_fd(notifier), + (IOHandler *)io_poll_begin, + (IOHandler *)io_poll_end); +} + +static void poll_set_started(AioContext *ctx, bool started) +{ + AioHandler *node; + + if (started == ctx->poll_started) { + return; + } + + ctx->poll_started = started; + + qemu_lockcnt_inc(&ctx->list_lock); + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + IOHandler *fn; + + if (node->deleted) { + continue; + } + + if (started) { + fn = node->io_poll_begin; + } else { + fn = node->io_poll_end; + } + + if (fn) { + fn(node->opaque); + } + } + qemu_lockcnt_dec(&ctx->list_lock); +} + + +bool aio_prepare(AioContext *ctx) +{ + /* Poll mode cannot be used with glib's event loop, disable it. */ + poll_set_started(ctx, false); + + return false; +} + +bool aio_pending(AioContext *ctx) +{ + AioHandler *node; + bool result = false; + + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. 
+ */ + qemu_lockcnt_inc(&ctx->list_lock); + + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + int revents; + + revents = node->pfd.revents & node->pfd.events; + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read && + aio_node_check(ctx, node->is_external)) { + result = true; + break; + } + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write && + aio_node_check(ctx, node->is_external)) { + result = true; + break; + } + } + qemu_lockcnt_dec(&ctx->list_lock); + + return result; +} + +static bool aio_dispatch_handlers(AioContext *ctx) +{ + AioHandler *node, *tmp; + bool progress = false; + + QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { + int revents; + + revents = node->pfd.revents & node->pfd.events; + node->pfd.revents = 0; + + if (!node->deleted && + (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && + aio_node_check(ctx, node->is_external) && + node->io_read) { + node->io_read(node->opaque); + + /* aio_notify() does not count as progress */ + if (node->opaque != &ctx->notifier) { + progress = true; + } + } + if (!node->deleted && + (revents & (G_IO_OUT | G_IO_ERR)) && + aio_node_check(ctx, node->is_external) && + node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + if (node->deleted) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { + QLIST_REMOVE(node, node); + g_free(node); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); + } + } + } + + return progress; +} + +void aio_dispatch(AioContext *ctx) +{ + qemu_lockcnt_inc(&ctx->list_lock); + aio_bh_poll(ctx); + aio_dispatch_handlers(ctx); + qemu_lockcnt_dec(&ctx->list_lock); + + timerlistgroup_run_timers(&ctx->tlg); +} + +/* These thread-local variables are used only in a small part of aio_poll + * around the call to the poll() system call. In particular they are not + * used while aio_poll is performing callbacks, which makes it much easier + * to think about reentrancy! + * + * Stack-allocated arrays would be perfect but they have size limitations; + * heap allocation is expensive enough that we want to reuse arrays across + * calls to aio_poll(). And because poll() has to be called without holding + * any lock, the arrays cannot be stored in AioContext. Thread-local data + * has none of the disadvantages of these three options. + */ +static __thread GPollFD *pollfds; +static __thread AioHandler **nodes; +static __thread unsigned npfd, nalloc; +static __thread Notifier pollfds_cleanup_notifier; + +static void pollfds_cleanup(Notifier *n, void *unused) +{ + g_assert(npfd == 0); + g_free(pollfds); + g_free(nodes); + nalloc = 0; +} + +static void add_pollfd(AioHandler *node) +{ + if (npfd == nalloc) { + if (nalloc == 0) { + pollfds_cleanup_notifier.notify = pollfds_cleanup; + qemu_thread_atexit_add(&pollfds_cleanup_notifier); + nalloc = 8; + } else { + g_assert(nalloc <= INT_MAX); + nalloc *= 2; + } + pollfds = g_renew(GPollFD, pollfds, nalloc); + nodes = g_renew(AioHandler *, nodes, nalloc); + } + nodes[npfd] = node; + pollfds[npfd] = (GPollFD) { + .fd = node->pfd.fd, + .events = node->pfd.events, + }; + npfd++; +} + +static bool run_poll_handlers_once(AioContext *ctx) +{ + bool progress = false; + AioHandler *node; + + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->io_poll && + aio_node_check(ctx, node->is_external) && + node->io_poll(node->opaque)) { + progress = true; + } + + /* Caller handles freeing deleted nodes. Don't do it here. 
*/ + } + + return progress; +} + +/* run_poll_handlers: + * @ctx: the AioContext + * @max_ns: maximum time to poll for, in nanoseconds + * + * Polls for a given time. + * + * Note that ctx->notify_me must be non-zero so this function can detect + * aio_notify(). + * + * Note that the caller must have incremented ctx->list_lock. + * + * Returns: true if progress was made, false otherwise + */ +static bool run_poll_handlers(AioContext *ctx, int64_t max_ns) +{ + bool progress; + int64_t end_time; + + assert(ctx->notify_me); + assert(qemu_lockcnt_count(&ctx->list_lock) > 0); + assert(ctx->poll_disable_cnt == 0); + + trace_run_poll_handlers_begin(ctx, max_ns); + + end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns; + + do { + progress = run_poll_handlers_once(ctx); + } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time); + + trace_run_poll_handlers_end(ctx, progress); + + return progress; +} + +/* try_poll_mode: + * @ctx: the AioContext + * @blocking: busy polling is only attempted when blocking is true + * + * ctx->notify_me must be non-zero so this function can detect aio_notify(). + * + * Note that the caller must have incremented ctx->list_lock. + * + * Returns: true if progress was made, false otherwise + */ +static bool try_poll_mode(AioContext *ctx, bool blocking) +{ + if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) { + /* See qemu_soonest_timeout() uint64_t hack */ + int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx), + (uint64_t)ctx->poll_ns); + + if (max_ns) { + poll_set_started(ctx, true); + + if (run_poll_handlers(ctx, max_ns)) { + return true; + } + } + } + + poll_set_started(ctx, false); + + /* Even if we don't run busy polling, try polling once in case it can make + * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2). + */ + return run_poll_handlers_once(ctx); +} + +bool aio_poll(AioContext *ctx, bool blocking) +{ + AioHandler *node; + int i; + int ret = 0; + bool progress; + int64_t timeout; + int64_t start = 0; + + /* aio_notify can avoid the expensive event_notifier_set if + * everything (file descriptors, bottom halves, timers) will + * be re-evaluated before the next blocking poll(). This is + * already true when aio_poll is called with blocking == false; + * if blocking == true, it is only true after poll() returns, + * so disable the optimization now. + */ + if (blocking) { + atomic_add(&ctx->notify_me, 2); + } + + qemu_lockcnt_inc(&ctx->list_lock); + + if (ctx->poll_max_ns) { + start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); + } + + progress = try_poll_mode(ctx, blocking); + if (!progress) { + assert(npfd == 0); + + /* fill pollfds */ + + if (!aio_epoll_enabled(ctx)) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->pfd.events + && aio_node_check(ctx, node->is_external)) { + add_pollfd(node); + } + } + } + + timeout = blocking ? 
aio_compute_timeout(ctx) : 0; + + /* wait until next event */ + if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { + AioHandler epoll_handler; + + epoll_handler.pfd.fd = ctx->epollfd; + epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR; + npfd = 0; + add_pollfd(&epoll_handler); + ret = aio_epoll(ctx, pollfds, npfd, timeout); + } else { + ret = qemu_poll_ns(pollfds, npfd, timeout); + } + } + + if (blocking) { + atomic_sub(&ctx->notify_me, 2); + } + + /* Adjust polling time */ + if (ctx->poll_max_ns) { + int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start; + + if (block_ns <= ctx->poll_ns) { + /* This is the sweet spot, no adjustment needed */ + } else if (block_ns > ctx->poll_max_ns) { + /* We'd have to poll for too long, poll less */ + int64_t old = ctx->poll_ns; + + if (ctx->poll_shrink) { + ctx->poll_ns /= ctx->poll_shrink; + } else { + ctx->poll_ns = 0; + } + + trace_poll_shrink(ctx, old, ctx->poll_ns); + } else if (ctx->poll_ns < ctx->poll_max_ns && + block_ns < ctx->poll_max_ns) { + /* There is room to grow, poll longer */ + int64_t old = ctx->poll_ns; + int64_t grow = ctx->poll_grow; + + if (grow == 0) { + grow = 2; + } + + if (ctx->poll_ns) { + ctx->poll_ns *= grow; + } else { + ctx->poll_ns = 4000; /* start polling at 4 microseconds */ + } + + if (ctx->poll_ns > ctx->poll_max_ns) { + ctx->poll_ns = ctx->poll_max_ns; + } + + trace_poll_grow(ctx, old, ctx->poll_ns); + } + } + + aio_notify_accept(ctx); + + /* if we have any readable fds, dispatch event */ + if (ret > 0) { + for (i = 0; i < npfd; i++) { + nodes[i]->pfd.revents = pollfds[i].revents; + } + } + + npfd = 0; + + progress |= aio_bh_poll(ctx); + + if (ret > 0) { + progress |= aio_dispatch_handlers(ctx); + } + + qemu_lockcnt_dec(&ctx->list_lock); + + progress |= timerlistgroup_run_timers(&ctx->tlg); + + return progress; +} + +void aio_context_setup(AioContext *ctx) +{ + /* TODO remove this in final patch submission */ + if (getenv("QEMU_AIO_POLL_MAX_NS")) { + fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has " + "been replaced with -object iothread,poll-max-ns=NUM\n"); + exit(1); + } + +#ifdef CONFIG_EPOLL_CREATE1 + assert(!ctx->epollfd); + ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); + if (ctx->epollfd == -1) { + fprintf(stderr, "Failed to create epoll instance: %s", strerror(errno)); + ctx->epoll_available = false; + } else { + ctx->epoll_available = true; + } +#endif +} + +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, + int64_t grow, int64_t shrink, Error **errp) +{ + /* No thread synchronization here, it doesn't matter if an incorrect value + * is used once. + */ + ctx->poll_max_ns = max_ns; + ctx->poll_ns = 0; + ctx->poll_grow = grow; + ctx->poll_shrink = shrink; + + aio_notify(ctx); +} diff --git a/util/aio-win32.c b/util/aio-win32.c new file mode 100644 index 0000000..bca496a --- /dev/null +++ b/util/aio-win32.c @@ -0,0 +1,407 @@ +/* + * QEMU aio implementation + * + * Copyright IBM Corp., 2008 + * Copyright Red Hat Inc., 2012 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. 
+ */ + +#include "qemu/osdep.h" +#include "qemu-common.h" +#include "block/block.h" +#include "qemu/queue.h" +#include "qemu/sockets.h" +#include "qapi/error.h" +#include "qemu/rcu_queue.h" + +struct AioHandler { + EventNotifier *e; + IOHandler *io_read; + IOHandler *io_write; + EventNotifierHandler *io_notify; + GPollFD pfd; + int deleted; + void *opaque; + bool is_external; + QLIST_ENTRY(AioHandler) node; +}; + +void aio_set_fd_handler(AioContext *ctx, + int fd, + bool is_external, + IOHandler *io_read, + IOHandler *io_write, + AioPollFn *io_poll, + void *opaque) +{ + /* fd is a SOCKET in our case */ + AioHandler *node; + + qemu_lockcnt_lock(&ctx->list_lock); + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->pfd.fd == fd && !node->deleted) { + break; + } + } + + /* Are we deleting the fd handler? */ + if (!io_read && !io_write) { + if (node) { + /* If aio_poll is in progress, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up after + * releasing the list_lock. + */ + QLIST_REMOVE(node, node); + g_free(node); + } + } + } else { + HANDLE event; + + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_new0(AioHandler, 1); + node->pfd.fd = fd; + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); + } + + node->pfd.events = 0; + if (node->io_read) { + node->pfd.events |= G_IO_IN; + } + if (node->io_write) { + node->pfd.events |= G_IO_OUT; + } + + node->e = &ctx->notifier; + + /* Update handler with latest information */ + node->opaque = opaque; + node->io_read = io_read; + node->io_write = io_write; + node->is_external = is_external; + + event = event_notifier_get_handle(&ctx->notifier); + WSAEventSelect(node->pfd.fd, event, + FD_READ | FD_ACCEPT | FD_CLOSE | + FD_CONNECT | FD_WRITE | FD_OOB); + } + + qemu_lockcnt_unlock(&ctx->list_lock); + aio_notify(ctx); +} + +void aio_set_fd_poll(AioContext *ctx, int fd, + IOHandler *io_poll_begin, + IOHandler *io_poll_end) +{ + /* Not implemented */ +} + +void aio_set_event_notifier(AioContext *ctx, + EventNotifier *e, + bool is_external, + EventNotifierHandler *io_notify, + AioPollFn *io_poll) +{ + AioHandler *node; + + qemu_lockcnt_lock(&ctx->list_lock); + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->e == e && !node->deleted) { + break; + } + } + + /* Are we deleting the fd handler? */ + if (!io_notify) { + if (node) { + g_source_remove_poll(&ctx->source, &node->pfd); + + /* aio_poll is in progress, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up after + * releasing the list_lock. 
+ */ + QLIST_REMOVE(node, node); + g_free(node); + } + } + } else { + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_new0(AioHandler, 1); + node->e = e; + node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); + node->pfd.events = G_IO_IN; + node->is_external = is_external; + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); + + g_source_add_poll(&ctx->source, &node->pfd); + } + /* Update handler with latest information */ + node->io_notify = io_notify; + } + + qemu_lockcnt_unlock(&ctx->list_lock); + aio_notify(ctx); +} + +void aio_set_event_notifier_poll(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_poll_begin, + EventNotifierHandler *io_poll_end) +{ + /* Not implemented */ +} + +bool aio_prepare(AioContext *ctx) +{ + static struct timeval tv0; + AioHandler *node; + bool have_select_revents = false; + fd_set rfds, wfds; + + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + + /* fill fd sets */ + FD_ZERO(&rfds); + FD_ZERO(&wfds); + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + if (node->io_read) { + FD_SET ((SOCKET)node->pfd.fd, &rfds); + } + if (node->io_write) { + FD_SET ((SOCKET)node->pfd.fd, &wfds); + } + } + + if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + node->pfd.revents = 0; + if (FD_ISSET(node->pfd.fd, &rfds)) { + node->pfd.revents |= G_IO_IN; + have_select_revents = true; + } + + if (FD_ISSET(node->pfd.fd, &wfds)) { + node->pfd.revents |= G_IO_OUT; + have_select_revents = true; + } + } + } + + qemu_lockcnt_dec(&ctx->list_lock); + return have_select_revents; +} + +bool aio_pending(AioContext *ctx) +{ + AioHandler *node; + bool result = false; + + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + if (node->pfd.revents && node->io_notify) { + result = true; + break; + } + + if ((node->pfd.revents & G_IO_IN) && node->io_read) { + result = true; + break; + } + if ((node->pfd.revents & G_IO_OUT) && node->io_write) { + result = true; + break; + } + } + + qemu_lockcnt_dec(&ctx->list_lock); + return result; +} + +static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) +{ + AioHandler *node; + bool progress = false; + AioHandler *tmp; + + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. 
+ */ + QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { + int revents = node->pfd.revents; + + if (!node->deleted && + (revents || event_notifier_get_handle(node->e) == event) && + node->io_notify) { + node->pfd.revents = 0; + node->io_notify(node->e); + + /* aio_notify() does not count as progress */ + if (node->e != &ctx->notifier) { + progress = true; + } + } + + if (!node->deleted && + (node->io_read || node->io_write)) { + node->pfd.revents = 0; + if ((revents & G_IO_IN) && node->io_read) { + node->io_read(node->opaque); + progress = true; + } + if ((revents & G_IO_OUT) && node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + /* if the next select() will return an event, we have progressed */ + if (event == event_notifier_get_handle(&ctx->notifier)) { + WSANETWORKEVENTS ev; + WSAEnumNetworkEvents(node->pfd.fd, event, &ev); + if (ev.lNetworkEvents) { + progress = true; + } + } + } + + if (node->deleted) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { + QLIST_REMOVE(node, node); + g_free(node); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); + } + } + } + + return progress; +} + +void aio_dispatch(AioContext *ctx) +{ + qemu_lockcnt_inc(&ctx->list_lock); + aio_bh_poll(ctx); + aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); + qemu_lockcnt_dec(&ctx->list_lock); + timerlistgroup_run_timers(&ctx->tlg); +} + +bool aio_poll(AioContext *ctx, bool blocking) +{ + AioHandler *node; + HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; + bool progress, have_select_revents, first; + int count; + int timeout; + + progress = false; + + /* aio_notify can avoid the expensive event_notifier_set if + * everything (file descriptors, bottom halves, timers) will + * be re-evaluated before the next blocking poll(). This is + * already true when aio_poll is called with blocking == false; + * if blocking == true, it is only true after poll() returns, + * so disable the optimization now. + */ + if (blocking) { + atomic_add(&ctx->notify_me, 2); + } + + qemu_lockcnt_inc(&ctx->list_lock); + have_select_revents = aio_prepare(ctx); + + /* fill fd sets */ + count = 0; + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->io_notify + && aio_node_check(ctx, node->is_external)) { + events[count++] = event_notifier_get_handle(node->e); + } + } + + first = true; + + /* ctx->notifier is always registered. */ + assert(count > 0); + + /* Multiple iterations, all of them non-blocking except the first, + * may be necessary to process all pending events. After the first + * WaitForMultipleObjects call ctx->notify_me will be decremented. + */ + do { + HANDLE event; + int ret; + + timeout = blocking && !have_select_revents + ? 
qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; + ret = WaitForMultipleObjects(count, events, FALSE, timeout); + if (blocking) { + assert(first); + atomic_sub(&ctx->notify_me, 2); + } + + if (first) { + aio_notify_accept(ctx); + progress |= aio_bh_poll(ctx); + first = false; + } + + /* if we have any signaled events, dispatch event */ + event = NULL; + if ((DWORD) (ret - WAIT_OBJECT_0) < count) { + event = events[ret - WAIT_OBJECT_0]; + events[ret - WAIT_OBJECT_0] = events[--count]; + } else if (!have_select_revents) { + break; + } + + have_select_revents = false; + blocking = false; + + progress |= aio_dispatch_handlers(ctx, event); + } while (count > 0); + + qemu_lockcnt_dec(&ctx->list_lock); + + progress |= timerlistgroup_run_timers(&ctx->tlg); + return progress; +} + +void aio_context_setup(AioContext *ctx) +{ +} + +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, + int64_t grow, int64_t shrink, Error **errp) +{ + error_setg(errp, "AioContext polling is not implemented on Windows"); +} diff --git a/util/aiocb.c b/util/aiocb.c new file mode 100644 index 0000000..5aef3a0 --- /dev/null +++ b/util/aiocb.c @@ -0,0 +1,55 @@ +/* + * BlockAIOCB allocation + * + * Copyright (c) 2003-2017 Fabrice Bellard and other QEMU contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. 
+ */ + +#include "qemu/osdep.h" +#include "block/aio.h" + +void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs, + BlockCompletionFunc *cb, void *opaque) +{ + BlockAIOCB *acb; + + acb = g_malloc(aiocb_info->aiocb_size); + acb->aiocb_info = aiocb_info; + acb->bs = bs; + acb->cb = cb; + acb->opaque = opaque; + acb->refcnt = 1; + return acb; +} + +void qemu_aio_ref(void *p) +{ + BlockAIOCB *acb = p; + acb->refcnt++; +} + +void qemu_aio_unref(void *p) +{ + BlockAIOCB *acb = p; + assert(acb->refcnt > 0); + if (--acb->refcnt == 0) { + g_free(acb); + } +} diff --git a/util/async.c b/util/async.c new file mode 100644 index 0000000..7d469eb --- /dev/null +++ b/util/async.c @@ -0,0 +1,490 @@ +/* + * Data plane event loop + * + * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2009-2017 QEMU contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" +#include "qapi/error.h" +#include "qemu-common.h" +#include "block/aio.h" +#include "block/thread-pool.h" +#include "qemu/main-loop.h" +#include "qemu/atomic.h" +#include "block/raw-aio.h" +#include "qemu/coroutine_int.h" +#include "trace.h" + +/***********************************************************/ +/* bottom halves (can be seen as timers which expire ASAP) */ + +struct QEMUBH { + AioContext *ctx; + QEMUBHFunc *cb; + void *opaque; + QEMUBH *next; + bool scheduled; + bool idle; + bool deleted; +}; + +void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) +{ + QEMUBH *bh; + bh = g_new(QEMUBH, 1); + *bh = (QEMUBH){ + .ctx = ctx, + .cb = cb, + .opaque = opaque, + }; + qemu_lockcnt_lock(&ctx->list_lock); + bh->next = ctx->first_bh; + bh->scheduled = 1; + bh->deleted = 1; + /* Make sure that the members are ready before putting bh into list */ + smp_wmb(); + ctx->first_bh = bh; + qemu_lockcnt_unlock(&ctx->list_lock); + aio_notify(ctx); +} + +QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) +{ + QEMUBH *bh; + bh = g_new(QEMUBH, 1); + *bh = (QEMUBH){ + .ctx = ctx, + .cb = cb, + .opaque = opaque, + }; + qemu_lockcnt_lock(&ctx->list_lock); + bh->next = ctx->first_bh; + /* Make sure that the members are ready before putting bh into list */ + smp_wmb(); + ctx->first_bh = bh; + qemu_lockcnt_unlock(&ctx->list_lock); + return bh; +} + +void aio_bh_call(QEMUBH *bh) +{ + bh->cb(bh->opaque); +} + +/* Multiple occurrences of aio_bh_poll cannot be called concurrently. 
+ * The count in ctx->list_lock is incremented before the call, and is + * not affected by the call. + */ +int aio_bh_poll(AioContext *ctx) +{ + QEMUBH *bh, **bhp, *next; + int ret; + bool deleted = false; + + ret = 0; + for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) { + next = atomic_rcu_read(&bh->next); + /* The atomic_xchg is paired with the one in qemu_bh_schedule. The + * implicit memory barrier ensures that the callback sees all writes + * done by the scheduling thread. It also ensures that the scheduling + * thread sees the zero before bh->cb has run, and thus will call + * aio_notify again if necessary. + */ + if (atomic_xchg(&bh->scheduled, 0)) { + /* Idle BHs don't count as progress */ + if (!bh->idle) { + ret = 1; + } + bh->idle = 0; + aio_bh_call(bh); + } + if (bh->deleted) { + deleted = true; + } + } + + /* remove deleted bhs */ + if (!deleted) { + return ret; + } + + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { + bhp = &ctx->first_bh; + while (*bhp) { + bh = *bhp; + if (bh->deleted && !bh->scheduled) { + *bhp = bh->next; + g_free(bh); + } else { + bhp = &bh->next; + } + } + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); + } + return ret; +} + +void qemu_bh_schedule_idle(QEMUBH *bh) +{ + bh->idle = 1; + /* Make sure that idle & any writes needed by the callback are done + * before the locations are read in the aio_bh_poll. + */ + atomic_mb_set(&bh->scheduled, 1); +} + +void qemu_bh_schedule(QEMUBH *bh) +{ + AioContext *ctx; + + ctx = bh->ctx; + bh->idle = 0; + /* The memory barrier implicit in atomic_xchg makes sure that: + * 1. idle & any writes needed by the callback are done before the + * locations are read in the aio_bh_poll. + * 2. ctx is loaded before scheduled is set and the callback has a chance + * to execute. + */ + if (atomic_xchg(&bh->scheduled, 1) == 0) { + aio_notify(ctx); + } +} + + +/* This func is async. + */ +void qemu_bh_cancel(QEMUBH *bh) +{ + bh->scheduled = 0; +} + +/* This func is async.The bottom half will do the delete action at the finial + * end. 
+ */ +void qemu_bh_delete(QEMUBH *bh) +{ + bh->scheduled = 0; + bh->deleted = 1; +} + +int64_t +aio_compute_timeout(AioContext *ctx) +{ + int64_t deadline; + int timeout = -1; + QEMUBH *bh; + + for (bh = atomic_rcu_read(&ctx->first_bh); bh; + bh = atomic_rcu_read(&bh->next)) { + if (bh->scheduled) { + if (bh->idle) { + /* idle bottom halves will be polled at least + * every 10ms */ + timeout = 10000000; + } else { + /* non-idle bottom halves will be executed + * immediately */ + return 0; + } + } + } + + deadline = timerlistgroup_deadline_ns(&ctx->tlg); + if (deadline == 0) { + return 0; + } else { + return qemu_soonest_timeout(timeout, deadline); + } +} + +static gboolean +aio_ctx_prepare(GSource *source, gint *timeout) +{ + AioContext *ctx = (AioContext *) source; + + atomic_or(&ctx->notify_me, 1); + + /* We assume there is no timeout already supplied */ + *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)); + + if (aio_prepare(ctx)) { + *timeout = 0; + } + + return *timeout == 0; +} + +static gboolean +aio_ctx_check(GSource *source) +{ + AioContext *ctx = (AioContext *) source; + QEMUBH *bh; + + atomic_and(&ctx->notify_me, ~1); + aio_notify_accept(ctx); + + for (bh = ctx->first_bh; bh; bh = bh->next) { + if (bh->scheduled) { + return true; + } + } + return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0); +} + +static gboolean +aio_ctx_dispatch(GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + AioContext *ctx = (AioContext *) source; + + assert(callback == NULL); + aio_dispatch(ctx); + return true; +} + +static void +aio_ctx_finalize(GSource *source) +{ + AioContext *ctx = (AioContext *) source; + + thread_pool_free(ctx->thread_pool); + +#ifdef CONFIG_LINUX_AIO + if (ctx->linux_aio) { + laio_detach_aio_context(ctx->linux_aio, ctx); + laio_cleanup(ctx->linux_aio); + ctx->linux_aio = NULL; + } +#endif + + assert(QSLIST_EMPTY(&ctx->scheduled_coroutines)); + qemu_bh_delete(ctx->co_schedule_bh); + + qemu_lockcnt_lock(&ctx->list_lock); + assert(!qemu_lockcnt_count(&ctx->list_lock)); + while (ctx->first_bh) { + QEMUBH *next = ctx->first_bh->next; + + /* qemu_bh_delete() must have been called on BHs in this AioContext */ + assert(ctx->first_bh->deleted); + + g_free(ctx->first_bh); + ctx->first_bh = next; + } + qemu_lockcnt_unlock(&ctx->list_lock); + + aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL); + event_notifier_cleanup(&ctx->notifier); + qemu_rec_mutex_destroy(&ctx->lock); + qemu_lockcnt_destroy(&ctx->list_lock); + timerlistgroup_deinit(&ctx->tlg); +} + +static GSourceFuncs aio_source_funcs = { + aio_ctx_prepare, + aio_ctx_check, + aio_ctx_dispatch, + aio_ctx_finalize +}; + +GSource *aio_get_g_source(AioContext *ctx) +{ + g_source_ref(&ctx->source); + return &ctx->source; +} + +ThreadPool *aio_get_thread_pool(AioContext *ctx) +{ + if (!ctx->thread_pool) { + ctx->thread_pool = thread_pool_new(ctx); + } + return ctx->thread_pool; +} + +#ifdef CONFIG_LINUX_AIO +LinuxAioState *aio_get_linux_aio(AioContext *ctx) +{ + if (!ctx->linux_aio) { + ctx->linux_aio = laio_init(); + laio_attach_aio_context(ctx->linux_aio, ctx); + } + return ctx->linux_aio; +} +#endif + +void aio_notify(AioContext *ctx) +{ + /* Write e.g. bh->scheduled before reading ctx->notify_me. Pairs + * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll. 
+ */ + smp_mb(); + if (ctx->notify_me) { + event_notifier_set(&ctx->notifier); + atomic_mb_set(&ctx->notified, true); + } +} + +void aio_notify_accept(AioContext *ctx) +{ + if (atomic_xchg(&ctx->notified, false)) { + event_notifier_test_and_clear(&ctx->notifier); + } +} + +static void aio_timerlist_notify(void *opaque) +{ + aio_notify(opaque); +} + +static void event_notifier_dummy_cb(EventNotifier *e) +{ +} + +/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */ +static bool event_notifier_poll(void *opaque) +{ + EventNotifier *e = opaque; + AioContext *ctx = container_of(e, AioContext, notifier); + + return atomic_read(&ctx->notified); +} + +static void co_schedule_bh_cb(void *opaque) +{ + AioContext *ctx = opaque; + QSLIST_HEAD(, Coroutine) straight, reversed; + + QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines); + QSLIST_INIT(&straight); + + while (!QSLIST_EMPTY(&reversed)) { + Coroutine *co = QSLIST_FIRST(&reversed); + QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next); + QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next); + } + + while (!QSLIST_EMPTY(&straight)) { + Coroutine *co = QSLIST_FIRST(&straight); + QSLIST_REMOVE_HEAD(&straight, co_scheduled_next); + trace_aio_co_schedule_bh_cb(ctx, co); + aio_context_acquire(ctx); + qemu_coroutine_enter(co); + aio_context_release(ctx); + } +} + +AioContext *aio_context_new(Error **errp) +{ + int ret; + AioContext *ctx; + + ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); + aio_context_setup(ctx); + + ret = event_notifier_init(&ctx->notifier, false); + if (ret < 0) { + error_setg_errno(errp, -ret, "Failed to initialize event notifier"); + goto fail; + } + g_source_set_can_recurse(&ctx->source, true); + qemu_lockcnt_init(&ctx->list_lock); + + ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx); + QSLIST_INIT(&ctx->scheduled_coroutines); + + aio_set_event_notifier(ctx, &ctx->notifier, + false, + (EventNotifierHandler *) + event_notifier_dummy_cb, + event_notifier_poll); +#ifdef CONFIG_LINUX_AIO + ctx->linux_aio = NULL; +#endif + ctx->thread_pool = NULL; + qemu_rec_mutex_init(&ctx->lock); + timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); + + ctx->poll_ns = 0; + ctx->poll_max_ns = 0; + ctx->poll_grow = 0; + ctx->poll_shrink = 0; + + return ctx; +fail: + g_source_destroy(&ctx->source); + return NULL; +} + +void aio_co_schedule(AioContext *ctx, Coroutine *co) +{ + trace_aio_co_schedule(ctx, co); + QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines, + co, co_scheduled_next); + qemu_bh_schedule(ctx->co_schedule_bh); +} + +void aio_co_wake(struct Coroutine *co) +{ + AioContext *ctx; + + /* Read coroutine before co->ctx. Matches smp_wmb in + * qemu_coroutine_enter. 
+ */ + smp_read_barrier_depends(); + ctx = atomic_read(&co->ctx); + + if (ctx != qemu_get_current_aio_context()) { + aio_co_schedule(ctx, co); + return; + } + + if (qemu_in_coroutine()) { + Coroutine *self = qemu_coroutine_self(); + assert(self != co); + QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next); + } else { + aio_context_acquire(ctx); + qemu_coroutine_enter(co); + aio_context_release(ctx); + } +} + +void aio_context_ref(AioContext *ctx) +{ + g_source_ref(&ctx->source); +} + +void aio_context_unref(AioContext *ctx) +{ + g_source_unref(&ctx->source); +} + +void aio_context_acquire(AioContext *ctx) +{ + qemu_rec_mutex_lock(&ctx->lock); +} + +void aio_context_release(AioContext *ctx) +{ + qemu_rec_mutex_unlock(&ctx->lock); +} diff --git a/util/iohandler.c b/util/iohandler.c new file mode 100644 index 0000000..623b55b --- /dev/null +++ b/util/iohandler.c @@ -0,0 +1,136 @@ +/* + * QEMU System Emulator - managing I/O handler + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" +#include "qapi/error.h" +#include "qemu-common.h" +#include "qemu/queue.h" +#include "block/aio.h" +#include "qemu/main-loop.h" + +#ifndef _WIN32 +#include <sys/wait.h> +#endif + +/* This context runs on top of main loop. We can't reuse qemu_aio_context + * because iohandlers mustn't be polled by aio_poll(qemu_aio_context). */ +static AioContext *iohandler_ctx; + +static void iohandler_init(void) +{ + if (!iohandler_ctx) { + iohandler_ctx = aio_context_new(&error_abort); + } +} + +AioContext *iohandler_get_aio_context(void) +{ + iohandler_init(); + return iohandler_ctx; +} + +GSource *iohandler_get_g_source(void) +{ + iohandler_init(); + return aio_get_g_source(iohandler_ctx); +} + +void qemu_set_fd_handler(int fd, + IOHandler *fd_read, + IOHandler *fd_write, + void *opaque) +{ + iohandler_init(); + aio_set_fd_handler(iohandler_ctx, fd, false, + fd_read, fd_write, NULL, opaque); +} + +void event_notifier_set_handler(EventNotifier *e, + EventNotifierHandler *handler) +{ + iohandler_init(); + aio_set_event_notifier(iohandler_ctx, e, false, + handler, NULL); +} + +/* reaping of zombies. right now we're not passing the status to + anyone, but it would be possible to add a callback. 
*/ +#ifndef _WIN32 +typedef struct ChildProcessRecord { + int pid; + QLIST_ENTRY(ChildProcessRecord) next; +} ChildProcessRecord; + +static QLIST_HEAD(, ChildProcessRecord) child_watches = + QLIST_HEAD_INITIALIZER(child_watches); + +static QEMUBH *sigchld_bh; + +static void sigchld_handler(int signal) +{ + qemu_bh_schedule(sigchld_bh); +} + +static void sigchld_bh_handler(void *opaque) +{ + ChildProcessRecord *rec, *next; + + QLIST_FOREACH_SAFE(rec, &child_watches, next, next) { + if (waitpid(rec->pid, NULL, WNOHANG) == rec->pid) { + QLIST_REMOVE(rec, next); + g_free(rec); + } + } +} + +static void qemu_init_child_watch(void) +{ + struct sigaction act; + sigchld_bh = qemu_bh_new(sigchld_bh_handler, NULL); + + memset(&act, 0, sizeof(act)); + act.sa_handler = sigchld_handler; + act.sa_flags = SA_NOCLDSTOP; + sigaction(SIGCHLD, &act, NULL); +} + +int qemu_add_child_watch(pid_t pid) +{ + ChildProcessRecord *rec; + + if (!sigchld_bh) { + qemu_init_child_watch(); + } + + QLIST_FOREACH(rec, &child_watches, next) { + if (rec->pid == pid) { + return 1; + } + } + rec = g_malloc0(sizeof(ChildProcessRecord)); + rec->pid = pid; + QLIST_INSERT_HEAD(&child_watches, rec, next); + return 0; +} +#endif diff --git a/util/main-loop.c b/util/main-loop.c new file mode 100644 index 0000000..ad10bca --- /dev/null +++ b/util/main-loop.c @@ -0,0 +1,526 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" +#include "qapi/error.h" +#include "qemu/cutils.h" +#include "qemu/timer.h" +#include "qemu/sockets.h" // struct in_addr needed for libslirp.h +#include "sysemu/qtest.h" +#include "slirp/libslirp.h" +#include "qemu/main-loop.h" +#include "block/aio.h" + +#ifndef _WIN32 + +#include "qemu/compatfd.h" + +/* If we have signalfd, we mask out the signals we want to handle and then + * use signalfd to listen for them. We rely on whatever the current signal + * handler is to dispatch the signals when we receive them. 
+ */ +static void sigfd_handler(void *opaque) +{ + int fd = (intptr_t)opaque; + struct qemu_signalfd_siginfo info; + struct sigaction action; + ssize_t len; + + while (1) { + do { + len = read(fd, &info, sizeof(info)); + } while (len == -1 && errno == EINTR); + + if (len == -1 && errno == EAGAIN) { + break; + } + + if (len != sizeof(info)) { + printf("read from sigfd returned %zd: %m\n", len); + return; + } + + sigaction(info.ssi_signo, NULL, &action); + if ((action.sa_flags & SA_SIGINFO) && action.sa_sigaction) { + action.sa_sigaction(info.ssi_signo, + (siginfo_t *)&info, NULL); + } else if (action.sa_handler) { + action.sa_handler(info.ssi_signo); + } + } +} + +static int qemu_signal_init(void) +{ + int sigfd; + sigset_t set; + + /* + * SIG_IPI must be blocked in the main thread and must not be caught + * by sigwait() in the signal thread. Otherwise, the cpu thread will + * not catch it reliably. + */ + sigemptyset(&set); + sigaddset(&set, SIG_IPI); + sigaddset(&set, SIGIO); + sigaddset(&set, SIGALRM); + sigaddset(&set, SIGBUS); + /* SIGINT cannot be handled via signalfd, so that ^C can be used + * to interrupt QEMU when it is being run under gdb. SIGHUP and + * SIGTERM are also handled asynchronously, even though it is not + * strictly necessary, because they use the same handler as SIGINT. + */ + pthread_sigmask(SIG_BLOCK, &set, NULL); + + sigdelset(&set, SIG_IPI); + sigfd = qemu_signalfd(&set); + if (sigfd == -1) { + fprintf(stderr, "failed to create signalfd\n"); + return -errno; + } + + fcntl_setfl(sigfd, O_NONBLOCK); + + qemu_set_fd_handler(sigfd, sigfd_handler, NULL, (void *)(intptr_t)sigfd); + + return 0; +} + +#else /* _WIN32 */ + +static int qemu_signal_init(void) +{ + return 0; +} +#endif + +static AioContext *qemu_aio_context; +static QEMUBH *qemu_notify_bh; + +static void notify_event_cb(void *opaque) +{ + /* No need to do anything; this bottom half is only used to + * kick the kernel out of ppoll/poll/WaitForMultipleObjects. 
+ */ +} + +AioContext *qemu_get_aio_context(void) +{ + return qemu_aio_context; +} + +void qemu_notify_event(void) +{ + if (!qemu_aio_context) { + return; + } + qemu_bh_schedule(qemu_notify_bh); +} + +static GArray *gpollfds; + +int qemu_init_main_loop(Error **errp) +{ + int ret; + GSource *src; + Error *local_error = NULL; + + init_clocks(); + + ret = qemu_signal_init(); + if (ret) { + return ret; + } + + qemu_aio_context = aio_context_new(&local_error); + if (!qemu_aio_context) { + error_propagate(errp, local_error); + return -EMFILE; + } + qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL); + gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); + src = aio_get_g_source(qemu_aio_context); + g_source_set_name(src, "aio-context"); + g_source_attach(src, NULL); + g_source_unref(src); + src = iohandler_get_g_source(); + g_source_set_name(src, "io-handler"); + g_source_attach(src, NULL); + g_source_unref(src); + return 0; +} + +static int max_priority; + +#ifndef _WIN32 +static int glib_pollfds_idx; +static int glib_n_poll_fds; + +static void glib_pollfds_fill(int64_t *cur_timeout) +{ + GMainContext *context = g_main_context_default(); + int timeout = 0; + int64_t timeout_ns; + int n; + + g_main_context_prepare(context, &max_priority); + + glib_pollfds_idx = gpollfds->len; + n = glib_n_poll_fds; + do { + GPollFD *pfds; + glib_n_poll_fds = n; + g_array_set_size(gpollfds, glib_pollfds_idx + glib_n_poll_fds); + pfds = &g_array_index(gpollfds, GPollFD, glib_pollfds_idx); + n = g_main_context_query(context, max_priority, &timeout, pfds, + glib_n_poll_fds); + } while (n != glib_n_poll_fds); + + if (timeout < 0) { + timeout_ns = -1; + } else { + timeout_ns = (int64_t)timeout * (int64_t)SCALE_MS; + } + + *cur_timeout = qemu_soonest_timeout(timeout_ns, *cur_timeout); +} + +static void glib_pollfds_poll(void) +{ + GMainContext *context = g_main_context_default(); + GPollFD *pfds = &g_array_index(gpollfds, GPollFD, glib_pollfds_idx); + + if (g_main_context_check(context, max_priority, pfds, glib_n_poll_fds)) { + g_main_context_dispatch(context); + } +} + +#define MAX_MAIN_LOOP_SPIN (1000) + +static int os_host_main_loop_wait(int64_t timeout) +{ + int ret; + static int spin_counter; + + glib_pollfds_fill(&timeout); + + /* If the I/O thread is very busy or we are incorrectly busy waiting in + * the I/O thread, this can lead to starvation of the BQL such that the + * VCPU threads never run. To make sure we can detect the later case, + * print a message to the screen. If we run into this condition, create + * a fake timeout in order to give the VCPU threads a chance to run. 
+ */ + if (!timeout && (spin_counter > MAX_MAIN_LOOP_SPIN)) { + static bool notified; + + if (!notified && !qtest_enabled() && !qtest_driver()) { + fprintf(stderr, + "main-loop: WARNING: I/O thread spun for %d iterations\n", + MAX_MAIN_LOOP_SPIN); + notified = true; + } + + timeout = SCALE_MS; + } + + if (timeout) { + spin_counter = 0; + qemu_mutex_unlock_iothread(); + } else { + spin_counter++; + } + + ret = qemu_poll_ns((GPollFD *)gpollfds->data, gpollfds->len, timeout); + + if (timeout) { + qemu_mutex_lock_iothread(); + } + + glib_pollfds_poll(); + return ret; +} +#else +/***********************************************************/ +/* Polling handling */ + +typedef struct PollingEntry { + PollingFunc *func; + void *opaque; + struct PollingEntry *next; +} PollingEntry; + +static PollingEntry *first_polling_entry; + +int qemu_add_polling_cb(PollingFunc *func, void *opaque) +{ + PollingEntry **ppe, *pe; + pe = g_malloc0(sizeof(PollingEntry)); + pe->func = func; + pe->opaque = opaque; + for(ppe = &first_polling_entry; *ppe != NULL; ppe = &(*ppe)->next); + *ppe = pe; + return 0; +} + +void qemu_del_polling_cb(PollingFunc *func, void *opaque) +{ + PollingEntry **ppe, *pe; + for(ppe = &first_polling_entry; *ppe != NULL; ppe = &(*ppe)->next) { + pe = *ppe; + if (pe->func == func && pe->opaque == opaque) { + *ppe = pe->next; + g_free(pe); + break; + } + } +} + +/***********************************************************/ +/* Wait objects support */ +typedef struct WaitObjects { + int num; + int revents[MAXIMUM_WAIT_OBJECTS + 1]; + HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; + WaitObjectFunc *func[MAXIMUM_WAIT_OBJECTS + 1]; + void *opaque[MAXIMUM_WAIT_OBJECTS + 1]; +} WaitObjects; + +static WaitObjects wait_objects = {0}; + +int qemu_add_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque) +{ + WaitObjects *w = &wait_objects; + if (w->num >= MAXIMUM_WAIT_OBJECTS) { + return -1; + } + w->events[w->num] = handle; + w->func[w->num] = func; + w->opaque[w->num] = opaque; + w->revents[w->num] = 0; + w->num++; + return 0; +} + +void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque) +{ + int i, found; + WaitObjects *w = &wait_objects; + + found = 0; + for (i = 0; i < w->num; i++) { + if (w->events[i] == handle) { + found = 1; + } + if (found) { + w->events[i] = w->events[i + 1]; + w->func[i] = w->func[i + 1]; + w->opaque[i] = w->opaque[i + 1]; + w->revents[i] = w->revents[i + 1]; + } + } + if (found) { + w->num--; + } +} + +void qemu_fd_register(int fd) +{ + WSAEventSelect(fd, event_notifier_get_handle(&qemu_aio_context->notifier), + FD_READ | FD_ACCEPT | FD_CLOSE | + FD_CONNECT | FD_WRITE | FD_OOB); +} + +static int pollfds_fill(GArray *pollfds, fd_set *rfds, fd_set *wfds, + fd_set *xfds) +{ + int nfds = -1; + int i; + + for (i = 0; i < pollfds->len; i++) { + GPollFD *pfd = &g_array_index(pollfds, GPollFD, i); + int fd = pfd->fd; + int events = pfd->events; + if (events & G_IO_IN) { + FD_SET(fd, rfds); + nfds = MAX(nfds, fd); + } + if (events & G_IO_OUT) { + FD_SET(fd, wfds); + nfds = MAX(nfds, fd); + } + if (events & G_IO_PRI) { + FD_SET(fd, xfds); + nfds = MAX(nfds, fd); + } + } + return nfds; +} + +static void pollfds_poll(GArray *pollfds, int nfds, fd_set *rfds, + fd_set *wfds, fd_set *xfds) +{ + int i; + + for (i = 0; i < pollfds->len; i++) { + GPollFD *pfd = &g_array_index(pollfds, GPollFD, i); + int fd = pfd->fd; + int revents = 0; + + if (FD_ISSET(fd, rfds)) { + revents |= G_IO_IN; + } + if (FD_ISSET(fd, wfds)) { + revents |= G_IO_OUT; + } + if (FD_ISSET(fd, xfds)) 
{ + revents |= G_IO_PRI; + } + pfd->revents = revents & pfd->events; + } +} + +static int os_host_main_loop_wait(int64_t timeout) +{ + GMainContext *context = g_main_context_default(); + GPollFD poll_fds[1024 * 2]; /* this is probably overkill */ + int select_ret = 0; + int g_poll_ret, ret, i, n_poll_fds; + PollingEntry *pe; + WaitObjects *w = &wait_objects; + gint poll_timeout; + int64_t poll_timeout_ns; + static struct timeval tv0; + fd_set rfds, wfds, xfds; + int nfds; + + /* XXX: need to suppress polling by better using win32 events */ + ret = 0; + for (pe = first_polling_entry; pe != NULL; pe = pe->next) { + ret |= pe->func(pe->opaque); + } + if (ret != 0) { + return ret; + } + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&xfds); + nfds = pollfds_fill(gpollfds, &rfds, &wfds, &xfds); + if (nfds >= 0) { + select_ret = select(nfds + 1, &rfds, &wfds, &xfds, &tv0); + if (select_ret != 0) { + timeout = 0; + } + if (select_ret > 0) { + pollfds_poll(gpollfds, nfds, &rfds, &wfds, &xfds); + } + } + + g_main_context_prepare(context, &max_priority); + n_poll_fds = g_main_context_query(context, max_priority, &poll_timeout, + poll_fds, ARRAY_SIZE(poll_fds)); + g_assert(n_poll_fds <= ARRAY_SIZE(poll_fds)); + + for (i = 0; i < w->num; i++) { + poll_fds[n_poll_fds + i].fd = (DWORD_PTR)w->events[i]; + poll_fds[n_poll_fds + i].events = G_IO_IN; + } + + if (poll_timeout < 0) { + poll_timeout_ns = -1; + } else { + poll_timeout_ns = (int64_t)poll_timeout * (int64_t)SCALE_MS; + } + + poll_timeout_ns = qemu_soonest_timeout(poll_timeout_ns, timeout); + + qemu_mutex_unlock_iothread(); + g_poll_ret = qemu_poll_ns(poll_fds, n_poll_fds + w->num, poll_timeout_ns); + + qemu_mutex_lock_iothread(); + if (g_poll_ret > 0) { + for (i = 0; i < w->num; i++) { + w->revents[i] = poll_fds[n_poll_fds + i].revents; + } + for (i = 0; i < w->num; i++) { + if (w->revents[i] && w->func[i]) { + w->func[i](w->opaque[i]); + } + } + } + + if (g_main_context_check(context, max_priority, poll_fds, n_poll_fds)) { + g_main_context_dispatch(context); + } + + return select_ret || g_poll_ret; +} +#endif + +int main_loop_wait(int nonblocking) +{ + int ret; + uint32_t timeout = UINT32_MAX; + int64_t timeout_ns; + + if (nonblocking) { + timeout = 0; + } + + /* poll any events */ + g_array_set_size(gpollfds, 0); /* reset for new iteration */ + /* XXX: separate device handlers from system ones */ +#ifdef CONFIG_SLIRP + slirp_pollfds_fill(gpollfds, &timeout); +#endif + + if (timeout == UINT32_MAX) { + timeout_ns = -1; + } else { + timeout_ns = (uint64_t)timeout * (int64_t)(SCALE_MS); + } + + timeout_ns = qemu_soonest_timeout(timeout_ns, + timerlistgroup_deadline_ns( + &main_loop_tlg)); + + ret = os_host_main_loop_wait(timeout_ns); +#ifdef CONFIG_SLIRP + slirp_pollfds_poll(gpollfds, (ret < 0)); +#endif + + /* CPU thread can infinitely wait for event after + missing the warp */ + qemu_start_warp_timer(); + qemu_clock_run_all_timers(); + + return ret; +} + +/* Functions to operate on the main QEMU AioContext. */ + +QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque) +{ + return aio_bh_new(qemu_aio_context, cb, opaque); +} diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c index 14cf9ce..6328eed 100644 --- a/util/qemu-coroutine-lock.c +++ b/util/qemu-coroutine-lock.c @@ -20,13 +20,19 @@ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. 
+ * + * The lock-free mutex implementation is based on OSv + * (core/lfmutex.cc, include/lockfree/mutex.hh). + * Copyright (C) 2013 Cloudius Systems, Ltd. */ #include "qemu/osdep.h" #include "qemu-common.h" #include "qemu/coroutine.h" #include "qemu/coroutine_int.h" +#include "qemu/processor.h" #include "qemu/queue.h" +#include "block/aio.h" #include "trace.h" void qemu_co_queue_init(CoQueue *queue) @@ -34,12 +40,30 @@ void qemu_co_queue_init(CoQueue *queue) QSIMPLEQ_INIT(&queue->entries); } -void coroutine_fn qemu_co_queue_wait(CoQueue *queue) +void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex) { Coroutine *self = qemu_coroutine_self(); QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next); + + if (mutex) { + qemu_co_mutex_unlock(mutex); + } + + /* There is no race condition here. Other threads will call + * aio_co_schedule on our AioContext, which can reenter this + * coroutine but only after this yield and after the main loop + * has gone through the next iteration. + */ qemu_coroutine_yield(); assert(qemu_in_coroutine()); + + /* TODO: OSv implements wait morphing here, where the wakeup + * primitive automatically places the woken coroutine on the + * mutex's queue. This avoids the thundering herd effect. + */ + if (mutex) { + qemu_co_mutex_lock(mutex); + } } /** @@ -63,7 +87,6 @@ void qemu_co_queue_run_restart(Coroutine *co) static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) { - Coroutine *self = qemu_coroutine_self(); Coroutine *next; if (QSIMPLEQ_EMPTY(&queue->entries)) { @@ -72,8 +95,7 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single) while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) { QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next); - QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next); - trace_qemu_co_queue_next(next); + aio_co_wake(next); if (single) { break; } @@ -112,27 +134,157 @@ bool qemu_co_queue_empty(CoQueue *queue) return QSIMPLEQ_FIRST(&queue->entries) == NULL; } +/* The wait records are handled with a multiple-producer, single-consumer + * lock-free queue. There cannot be two concurrent pop_waiter() calls + * because pop_waiter() can only be called while mutex->handoff is zero. + * This can happen in three cases: + * - in qemu_co_mutex_unlock, before the hand-off protocol has started. + * In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and + * not take part in the handoff. + * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from + * qemu_co_mutex_unlock. In this case, qemu_co_mutex_unlock will fail + * the cmpxchg (it will see either 0 or the next sequence value) and + * exit. The next hand-off cannot begin until qemu_co_mutex_lock has + * woken up someone. + * - in qemu_co_mutex_unlock, if it takes the hand-off token itself. + * In this case another iteration starts with mutex->handoff == 0; + * a concurrent qemu_co_mutex_lock will fail the cmpxchg, and + * qemu_co_mutex_unlock will go back to case (1). + * + * The following functions manage this queue. 
+ */ +typedef struct CoWaitRecord { + Coroutine *co; + QSLIST_ENTRY(CoWaitRecord) next; +} CoWaitRecord; + +static void push_waiter(CoMutex *mutex, CoWaitRecord *w) +{ + w->co = qemu_coroutine_self(); + QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next); +} + +static void move_waiters(CoMutex *mutex) +{ + QSLIST_HEAD(, CoWaitRecord) reversed; + QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push); + while (!QSLIST_EMPTY(&reversed)) { + CoWaitRecord *w = QSLIST_FIRST(&reversed); + QSLIST_REMOVE_HEAD(&reversed, next); + QSLIST_INSERT_HEAD(&mutex->to_pop, w, next); + } +} + +static CoWaitRecord *pop_waiter(CoMutex *mutex) +{ + CoWaitRecord *w; + + if (QSLIST_EMPTY(&mutex->to_pop)) { + move_waiters(mutex); + if (QSLIST_EMPTY(&mutex->to_pop)) { + return NULL; + } + } + w = QSLIST_FIRST(&mutex->to_pop); + QSLIST_REMOVE_HEAD(&mutex->to_pop, next); + return w; +} + +static bool has_waiters(CoMutex *mutex) +{ + return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push); +} + void qemu_co_mutex_init(CoMutex *mutex) { memset(mutex, 0, sizeof(*mutex)); - qemu_co_queue_init(&mutex->queue); } -void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) +static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co) +{ + /* Read co before co->ctx; pairs with smp_wmb() in + * qemu_coroutine_enter(). + */ + smp_read_barrier_depends(); + mutex->ctx = co->ctx; + aio_co_wake(co); +} + +static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx, + CoMutex *mutex) { Coroutine *self = qemu_coroutine_self(); + CoWaitRecord w; + unsigned old_handoff; trace_qemu_co_mutex_lock_entry(mutex, self); + w.co = self; + push_waiter(mutex, &w); + + /* This is the "Responsibility Hand-Off" protocol; a lock() picks from + * a concurrent unlock() the responsibility of waking somebody up. + */ + old_handoff = atomic_mb_read(&mutex->handoff); + if (old_handoff && + has_waiters(mutex) && + atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) { + /* There can be no concurrent pops, because there can be only + * one active handoff at a time. + */ + CoWaitRecord *to_wake = pop_waiter(mutex); + Coroutine *co = to_wake->co; + if (co == self) { + /* We got the lock ourselves! */ + assert(to_wake == &w); + mutex->ctx = ctx; + return; + } + + qemu_co_mutex_wake(mutex, co); + } - while (mutex->locked) { - qemu_co_queue_wait(&mutex->queue); + qemu_coroutine_yield(); + trace_qemu_co_mutex_lock_return(mutex, self); +} + +void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex) +{ + AioContext *ctx = qemu_get_current_aio_context(); + Coroutine *self = qemu_coroutine_self(); + int waiters, i; + + /* Running a very small critical section on pthread_mutex_t and CoMutex + * shows that pthread_mutex_t is much faster because it doesn't actually + * go to sleep. What happens is that the critical section is shorter + * than the latency of entering the kernel and thus FUTEX_WAIT always + * fails. With CoMutex there is no such latency but you still want to + * avoid wait and wakeup. So introduce it artificially. + */ + i = 0; +retry_fast_path: + waiters = atomic_cmpxchg(&mutex->locked, 0, 1); + if (waiters != 0) { + while (waiters == 1 && ++i < 1000) { + if (atomic_read(&mutex->ctx) == ctx) { + break; + } + if (atomic_read(&mutex->locked) == 0) { + goto retry_fast_path; + } + cpu_relax(); + } + waiters = atomic_fetch_inc(&mutex->locked); } - mutex->locked = true; + if (waiters == 0) { + /* Uncontended. 
*/ + trace_qemu_co_mutex_lock_uncontended(mutex, self); + mutex->ctx = ctx; + } else { + qemu_co_mutex_lock_slowpath(ctx, mutex); + } mutex->holder = self; self->locks_held++; - - trace_qemu_co_mutex_lock_return(mutex, self); } void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) @@ -141,14 +293,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) trace_qemu_co_mutex_unlock_entry(mutex, self); - assert(mutex->locked == true); + assert(mutex->locked); assert(mutex->holder == self); assert(qemu_in_coroutine()); - mutex->locked = false; + mutex->ctx = NULL; mutex->holder = NULL; self->locks_held--; - qemu_co_queue_next(&mutex->queue); + if (atomic_fetch_dec(&mutex->locked) == 1) { + /* No waiting qemu_co_mutex_lock(). Pfew, that was easy! */ + return; + } + + for (;;) { + CoWaitRecord *to_wake = pop_waiter(mutex); + unsigned our_handoff; + + if (to_wake) { + qemu_co_mutex_wake(mutex, to_wake->co); + break; + } + + /* Some concurrent lock() is in progress (we know this because + * mutex->locked was >1) but it hasn't yet put itself on the wait + * queue. Pick a sequence number for the handoff protocol (not 0). + */ + if (++mutex->sequence == 0) { + mutex->sequence = 1; + } + + our_handoff = mutex->sequence; + atomic_mb_set(&mutex->handoff, our_handoff); + if (!has_waiters(mutex)) { + /* The concurrent lock has not added itself yet, so it + * will be able to pick our handoff. + */ + break; + } + + /* Try to do the handoff protocol ourselves; if somebody else has + * already taken it, however, we're done and they're responsible. + */ + if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) { + break; + } + } trace_qemu_co_mutex_unlock_return(mutex, self); } @@ -157,16 +346,22 @@ void qemu_co_rwlock_init(CoRwlock *lock) { memset(lock, 0, sizeof(*lock)); qemu_co_queue_init(&lock->queue); + qemu_co_mutex_init(&lock->mutex); } void qemu_co_rwlock_rdlock(CoRwlock *lock) { Coroutine *self = qemu_coroutine_self(); - while (lock->writer) { - qemu_co_queue_wait(&lock->queue); + qemu_co_mutex_lock(&lock->mutex); + /* For fairness, wait if a writer is in line. */ + while (lock->pending_writer) { + qemu_co_queue_wait(&lock->queue, &lock->mutex); } lock->reader++; + qemu_co_mutex_unlock(&lock->mutex); + + /* The rest of the read-side critical section is run without the mutex. */ self->locks_held++; } @@ -175,10 +370,13 @@ void qemu_co_rwlock_unlock(CoRwlock *lock) Coroutine *self = qemu_coroutine_self(); assert(qemu_in_coroutine()); - if (lock->writer) { - lock->writer = false; + if (!lock->reader) { + /* The critical section started in qemu_co_rwlock_wrlock. */ qemu_co_queue_restart_all(&lock->queue); } else { + self->locks_held--; + + qemu_co_mutex_lock(&lock->mutex); lock->reader--; assert(lock->reader >= 0); /* Wakeup only one waiting writer */ @@ -186,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock) qemu_co_queue_next(&lock->queue); } } - self->locks_held--; + qemu_co_mutex_unlock(&lock->mutex); } void qemu_co_rwlock_wrlock(CoRwlock *lock) { - Coroutine *self = qemu_coroutine_self(); - - while (lock->writer || lock->reader) { - qemu_co_queue_wait(&lock->queue); + qemu_co_mutex_lock(&lock->mutex); + lock->pending_writer++; + while (lock->reader) { + qemu_co_queue_wait(&lock->queue, &lock->mutex); } - lock->writer = true; - self->locks_held++; + lock->pending_writer--; + + /* The rest of the write-side critical section is run with + * the mutex taken, so that lock->reader remains zero. + * There is no need to update self->locks_held. 
+ */ } diff --git a/util/qemu-coroutine-sleep.c b/util/qemu-coroutine-sleep.c index 25de3ed..9c56550 100644 --- a/util/qemu-coroutine-sleep.c +++ b/util/qemu-coroutine-sleep.c @@ -25,7 +25,7 @@ static void co_sleep_cb(void *opaque) { CoSleepCB *sleep_cb = opaque; - qemu_coroutine_enter(sleep_cb->co); + aio_co_wake(sleep_cb->co); } void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type, diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c index a5d2f6c..72412e5 100644 --- a/util/qemu-coroutine.c +++ b/util/qemu-coroutine.c @@ -19,6 +19,7 @@ #include "qemu/atomic.h" #include "qemu/coroutine.h" #include "qemu/coroutine_int.h" +#include "block/aio.h" enum { POOL_BATCH_SIZE = 64, @@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co) } co->caller = self; + co->ctx = qemu_get_current_aio_context(); + + /* Store co->ctx before anything that stores co. Matches + * barrier in aio_co_wake and qemu_co_mutex_wake. + */ + smp_wmb(); + ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER); qemu_co_queue_run_restart(co); diff --git a/util/qemu-timer.c b/util/qemu-timer.c new file mode 100644 index 0000000..ff620ec --- /dev/null +++ b/util/qemu-timer.c @@ -0,0 +1,669 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" +#include "qemu/main-loop.h" +#include "qemu/timer.h" +#include "sysemu/replay.h" +#include "sysemu/sysemu.h" + +#ifdef CONFIG_POSIX +#include <pthread.h> +#endif + +#ifdef CONFIG_PPOLL +#include <poll.h> +#endif + +#ifdef CONFIG_PRCTL_PR_SET_TIMERSLACK +#include <sys/prctl.h> +#endif + +/***********************************************************/ +/* timers */ + +typedef struct QEMUClock { + /* We rely on BQL to protect the timerlists */ + QLIST_HEAD(, QEMUTimerList) timerlists; + + NotifierList reset_notifiers; + int64_t last; + + QEMUClockType type; + bool enabled; +} QEMUClock; + +QEMUTimerListGroup main_loop_tlg; +static QEMUClock qemu_clocks[QEMU_CLOCK_MAX]; + +/* A QEMUTimerList is a list of timers attached to a clock. More + * than one QEMUTimerList can be attached to each clock, for instance + * used by different AioContexts / threads. Each clock also has + * a list of the QEMUTimerLists associated with it, in order that + * reenabling the clock can call all the notifiers. 
+ */ + +struct QEMUTimerList { + QEMUClock *clock; + QemuMutex active_timers_lock; + QEMUTimer *active_timers; + QLIST_ENTRY(QEMUTimerList) list; + QEMUTimerListNotifyCB *notify_cb; + void *notify_opaque; + + /* lightweight method to mark the end of timerlist's running */ + QemuEvent timers_done_ev; +}; + +/** + * qemu_clock_ptr: + * @type: type of clock + * + * Translate a clock type into a pointer to QEMUClock object. + * + * Returns: a pointer to the QEMUClock object + */ +static inline QEMUClock *qemu_clock_ptr(QEMUClockType type) +{ + return &qemu_clocks[type]; +} + +static bool timer_expired_ns(QEMUTimer *timer_head, int64_t current_time) +{ + return timer_head && (timer_head->expire_time <= current_time); +} + +QEMUTimerList *timerlist_new(QEMUClockType type, + QEMUTimerListNotifyCB *cb, + void *opaque) +{ + QEMUTimerList *timer_list; + QEMUClock *clock = qemu_clock_ptr(type); + + timer_list = g_malloc0(sizeof(QEMUTimerList)); + qemu_event_init(&timer_list->timers_done_ev, true); + timer_list->clock = clock; + timer_list->notify_cb = cb; + timer_list->notify_opaque = opaque; + qemu_mutex_init(&timer_list->active_timers_lock); + QLIST_INSERT_HEAD(&clock->timerlists, timer_list, list); + return timer_list; +} + +void timerlist_free(QEMUTimerList *timer_list) +{ + assert(!timerlist_has_timers(timer_list)); + if (timer_list->clock) { + QLIST_REMOVE(timer_list, list); + } + qemu_mutex_destroy(&timer_list->active_timers_lock); + g_free(timer_list); +} + +static void qemu_clock_init(QEMUClockType type) +{ + QEMUClock *clock = qemu_clock_ptr(type); + + /* Assert that the clock of type TYPE has not been initialized yet. */ + assert(main_loop_tlg.tl[type] == NULL); + + clock->type = type; + clock->enabled = (type == QEMU_CLOCK_VIRTUAL ? false : true); + clock->last = INT64_MIN; + QLIST_INIT(&clock->timerlists); + notifier_list_init(&clock->reset_notifiers); + main_loop_tlg.tl[type] = timerlist_new(type, NULL, NULL); +} + +bool qemu_clock_use_for_deadline(QEMUClockType type) +{ + return !(use_icount && (type == QEMU_CLOCK_VIRTUAL)); +} + +void qemu_clock_notify(QEMUClockType type) +{ + QEMUTimerList *timer_list; + QEMUClock *clock = qemu_clock_ptr(type); + QLIST_FOREACH(timer_list, &clock->timerlists, list) { + timerlist_notify(timer_list); + } +} + +/* Disabling the clock will wait for related timerlists to stop + * executing qemu_run_timers. Thus, this functions should not + * be used from the callback of a timer that is based on @clock. + * Doing so would cause a deadlock. + * + * Caller should hold BQL. 
+ */ +void qemu_clock_enable(QEMUClockType type, bool enabled) +{ + QEMUClock *clock = qemu_clock_ptr(type); + QEMUTimerList *tl; + bool old = clock->enabled; + clock->enabled = enabled; + if (enabled && !old) { + qemu_clock_notify(type); + } else if (!enabled && old) { + QLIST_FOREACH(tl, &clock->timerlists, list) { + qemu_event_wait(&tl->timers_done_ev); + } + } +} + +bool timerlist_has_timers(QEMUTimerList *timer_list) +{ + return !!atomic_read(&timer_list->active_timers); +} + +bool qemu_clock_has_timers(QEMUClockType type) +{ + return timerlist_has_timers( + main_loop_tlg.tl[type]); +} + +bool timerlist_expired(QEMUTimerList *timer_list) +{ + int64_t expire_time; + + if (!atomic_read(&timer_list->active_timers)) { + return false; + } + + qemu_mutex_lock(&timer_list->active_timers_lock); + if (!timer_list->active_timers) { + qemu_mutex_unlock(&timer_list->active_timers_lock); + return false; + } + expire_time = timer_list->active_timers->expire_time; + qemu_mutex_unlock(&timer_list->active_timers_lock); + + return expire_time < qemu_clock_get_ns(timer_list->clock->type); +} + +bool qemu_clock_expired(QEMUClockType type) +{ + return timerlist_expired( + main_loop_tlg.tl[type]); +} + +/* + * As above, but return -1 for no deadline, and do not cap to 2^32 + * as we know the result is always positive. + */ + +int64_t timerlist_deadline_ns(QEMUTimerList *timer_list) +{ + int64_t delta; + int64_t expire_time; + + if (!atomic_read(&timer_list->active_timers)) { + return -1; + } + + if (!timer_list->clock->enabled) { + return -1; + } + + /* The active timers list may be modified before the caller uses our return + * value but ->notify_cb() is called when the deadline changes. Therefore + * the caller should notice the change and there is no race condition. + */ + qemu_mutex_lock(&timer_list->active_timers_lock); + if (!timer_list->active_timers) { + qemu_mutex_unlock(&timer_list->active_timers_lock); + return -1; + } + expire_time = timer_list->active_timers->expire_time; + qemu_mutex_unlock(&timer_list->active_timers_lock); + + delta = expire_time - qemu_clock_get_ns(timer_list->clock->type); + + if (delta <= 0) { + return 0; + } + + return delta; +} + +/* Calculate the soonest deadline across all timerlists attached + * to the clock. This is used for the icount timeout so we + * ignore whether or not the clock should be used in deadline + * calculations. 
+ */ +int64_t qemu_clock_deadline_ns_all(QEMUClockType type) +{ + int64_t deadline = -1; + QEMUTimerList *timer_list; + QEMUClock *clock = qemu_clock_ptr(type); + QLIST_FOREACH(timer_list, &clock->timerlists, list) { + deadline = qemu_soonest_timeout(deadline, + timerlist_deadline_ns(timer_list)); + } + return deadline; +} + +QEMUClockType timerlist_get_clock(QEMUTimerList *timer_list) +{ + return timer_list->clock->type; +} + +QEMUTimerList *qemu_clock_get_main_loop_timerlist(QEMUClockType type) +{ + return main_loop_tlg.tl[type]; +} + +void timerlist_notify(QEMUTimerList *timer_list) +{ + if (timer_list->notify_cb) { + timer_list->notify_cb(timer_list->notify_opaque); + } else { + qemu_notify_event(); + } +} + +/* Transition function to convert a nanosecond timeout to ms + * This is used where a system does not support ppoll + */ +int qemu_timeout_ns_to_ms(int64_t ns) +{ + int64_t ms; + if (ns < 0) { + return -1; + } + + if (!ns) { + return 0; + } + + /* Always round up, because it's better to wait too long than to wait too + * little and effectively busy-wait + */ + ms = DIV_ROUND_UP(ns, SCALE_MS); + + /* To avoid overflow problems, limit this to 2^31, i.e. approx 25 days */ + if (ms > (int64_t) INT32_MAX) { + ms = INT32_MAX; + } + + return (int) ms; +} + + +/* qemu implementation of g_poll which uses a nanosecond timeout but is + * otherwise identical to g_poll + */ +int qemu_poll_ns(GPollFD *fds, guint nfds, int64_t timeout) +{ +#ifdef CONFIG_PPOLL + if (timeout < 0) { + return ppoll((struct pollfd *)fds, nfds, NULL, NULL); + } else { + struct timespec ts; + int64_t tvsec = timeout / 1000000000LL; + /* Avoid possibly overflowing and specifying a negative number of + * seconds, which would turn a very long timeout into a busy-wait. + */ + if (tvsec > (int64_t)INT32_MAX) { + tvsec = INT32_MAX; + } + ts.tv_sec = tvsec; + ts.tv_nsec = timeout % 1000000000LL; + return ppoll((struct pollfd *)fds, nfds, &ts, NULL); + } +#else + return g_poll(fds, nfds, qemu_timeout_ns_to_ms(timeout)); +#endif +} + + +void timer_init_tl(QEMUTimer *ts, + QEMUTimerList *timer_list, int scale, + QEMUTimerCB *cb, void *opaque) +{ + ts->timer_list = timer_list; + ts->cb = cb; + ts->opaque = opaque; + ts->scale = scale; + ts->expire_time = -1; +} + +void timer_deinit(QEMUTimer *ts) +{ + assert(ts->expire_time == -1); + ts->timer_list = NULL; +} + +void timer_free(QEMUTimer *ts) +{ + g_free(ts); +} + +static void timer_del_locked(QEMUTimerList *timer_list, QEMUTimer *ts) +{ + QEMUTimer **pt, *t; + + ts->expire_time = -1; + pt = &timer_list->active_timers; + for(;;) { + t = *pt; + if (!t) + break; + if (t == ts) { + atomic_set(pt, t->next); + break; + } + pt = &t->next; + } +} + +static bool timer_mod_ns_locked(QEMUTimerList *timer_list, + QEMUTimer *ts, int64_t expire_time) +{ + QEMUTimer **pt, *t; + + /* add the timer in the sorted list */ + pt = &timer_list->active_timers; + for (;;) { + t = *pt; + if (!timer_expired_ns(t, expire_time)) { + break; + } + pt = &t->next; + } + ts->expire_time = MAX(expire_time, 0); + ts->next = *pt; + atomic_set(pt, ts); + + return pt == &timer_list->active_timers; +} + +static void timerlist_rearm(QEMUTimerList *timer_list) +{ + /* Interrupt execution to force deadline recalculation. 
*/ + if (timer_list->clock->type == QEMU_CLOCK_VIRTUAL) { + qemu_start_warp_timer(); + } + timerlist_notify(timer_list); +} + +/* stop a timer, but do not dealloc it */ +void timer_del(QEMUTimer *ts) +{ + QEMUTimerList *timer_list = ts->timer_list; + + if (timer_list) { + qemu_mutex_lock(&timer_list->active_timers_lock); + timer_del_locked(timer_list, ts); + qemu_mutex_unlock(&timer_list->active_timers_lock); + } +} + +/* modify the current timer so that it will be fired when current_time + >= expire_time. The corresponding callback will be called. */ +void timer_mod_ns(QEMUTimer *ts, int64_t expire_time) +{ + QEMUTimerList *timer_list = ts->timer_list; + bool rearm; + + qemu_mutex_lock(&timer_list->active_timers_lock); + timer_del_locked(timer_list, ts); + rearm = timer_mod_ns_locked(timer_list, ts, expire_time); + qemu_mutex_unlock(&timer_list->active_timers_lock); + + if (rearm) { + timerlist_rearm(timer_list); + } +} + +/* modify the current timer so that it will be fired when current_time + >= expire_time or the current deadline, whichever comes earlier. + The corresponding callback will be called. */ +void timer_mod_anticipate_ns(QEMUTimer *ts, int64_t expire_time) +{ + QEMUTimerList *timer_list = ts->timer_list; + bool rearm; + + qemu_mutex_lock(&timer_list->active_timers_lock); + if (ts->expire_time == -1 || ts->expire_time > expire_time) { + if (ts->expire_time != -1) { + timer_del_locked(timer_list, ts); + } + rearm = timer_mod_ns_locked(timer_list, ts, expire_time); + } else { + rearm = false; + } + qemu_mutex_unlock(&timer_list->active_timers_lock); + + if (rearm) { + timerlist_rearm(timer_list); + } +} + +void timer_mod(QEMUTimer *ts, int64_t expire_time) +{ + timer_mod_ns(ts, expire_time * ts->scale); +} + +void timer_mod_anticipate(QEMUTimer *ts, int64_t expire_time) +{ + timer_mod_anticipate_ns(ts, expire_time * ts->scale); +} + +bool timer_pending(QEMUTimer *ts) +{ + return ts->expire_time >= 0; +} + +bool timer_expired(QEMUTimer *timer_head, int64_t current_time) +{ + return timer_expired_ns(timer_head, current_time * timer_head->scale); +} + +bool timerlist_run_timers(QEMUTimerList *timer_list) +{ + QEMUTimer *ts; + int64_t current_time; + bool progress = false; + QEMUTimerCB *cb; + void *opaque; + + if (!atomic_read(&timer_list->active_timers)) { + return false; + } + + qemu_event_reset(&timer_list->timers_done_ev); + if (!timer_list->clock->enabled) { + goto out; + } + + switch (timer_list->clock->type) { + case QEMU_CLOCK_REALTIME: + break; + default: + case QEMU_CLOCK_VIRTUAL: + if (!replay_checkpoint(CHECKPOINT_CLOCK_VIRTUAL)) { + goto out; + } + break; + case QEMU_CLOCK_HOST: + if (!replay_checkpoint(CHECKPOINT_CLOCK_HOST)) { + goto out; + } + break; + case QEMU_CLOCK_VIRTUAL_RT: + if (!replay_checkpoint(CHECKPOINT_CLOCK_VIRTUAL_RT)) { + goto out; + } + break; + } + + current_time = qemu_clock_get_ns(timer_list->clock->type); + for(;;) { + qemu_mutex_lock(&timer_list->active_timers_lock); + ts = timer_list->active_timers; + if (!timer_expired_ns(ts, current_time)) { + qemu_mutex_unlock(&timer_list->active_timers_lock); + break; + } + + /* remove timer from the list before calling the callback */ + timer_list->active_timers = ts->next; + ts->next = NULL; + ts->expire_time = -1; + cb = ts->cb; + opaque = ts->opaque; + qemu_mutex_unlock(&timer_list->active_timers_lock); + + /* run the callback (the timer list can be modified) */ + cb(opaque); + progress = true; + } + +out: + qemu_event_set(&timer_list->timers_done_ev); + return progress; +} + +bool 
qemu_clock_run_timers(QEMUClockType type) +{ + return timerlist_run_timers(main_loop_tlg.tl[type]); +} + +void timerlistgroup_init(QEMUTimerListGroup *tlg, + QEMUTimerListNotifyCB *cb, void *opaque) +{ + QEMUClockType type; + for (type = 0; type < QEMU_CLOCK_MAX; type++) { + tlg->tl[type] = timerlist_new(type, cb, opaque); + } +} + +void timerlistgroup_deinit(QEMUTimerListGroup *tlg) +{ + QEMUClockType type; + for (type = 0; type < QEMU_CLOCK_MAX; type++) { + timerlist_free(tlg->tl[type]); + } +} + +bool timerlistgroup_run_timers(QEMUTimerListGroup *tlg) +{ + QEMUClockType type; + bool progress = false; + for (type = 0; type < QEMU_CLOCK_MAX; type++) { + progress |= timerlist_run_timers(tlg->tl[type]); + } + return progress; +} + +int64_t timerlistgroup_deadline_ns(QEMUTimerListGroup *tlg) +{ + int64_t deadline = -1; + QEMUClockType type; + bool play = replay_mode == REPLAY_MODE_PLAY; + for (type = 0; type < QEMU_CLOCK_MAX; type++) { + if (qemu_clock_use_for_deadline(type)) { + if (!play || type == QEMU_CLOCK_REALTIME) { + deadline = qemu_soonest_timeout(deadline, + timerlist_deadline_ns(tlg->tl[type])); + } else { + /* Read clock from the replay file and + do not calculate the deadline, based on virtual clock. */ + qemu_clock_get_ns(type); + } + } + } + return deadline; +} + +int64_t qemu_clock_get_ns(QEMUClockType type) +{ + int64_t now, last; + QEMUClock *clock = qemu_clock_ptr(type); + + switch (type) { + case QEMU_CLOCK_REALTIME: + return get_clock(); + default: + case QEMU_CLOCK_VIRTUAL: + if (use_icount) { + return cpu_get_icount(); + } else { + return cpu_get_clock(); + } + case QEMU_CLOCK_HOST: + now = REPLAY_CLOCK(REPLAY_CLOCK_HOST, get_clock_realtime()); + last = clock->last; + clock->last = now; + if (now < last || now > (last + get_max_clock_jump())) { + notifier_list_notify(&clock->reset_notifiers, &now); + } + return now; + case QEMU_CLOCK_VIRTUAL_RT: + return REPLAY_CLOCK(REPLAY_CLOCK_VIRTUAL_RT, cpu_get_clock()); + } +} + +void qemu_clock_register_reset_notifier(QEMUClockType type, + Notifier *notifier) +{ + QEMUClock *clock = qemu_clock_ptr(type); + notifier_list_add(&clock->reset_notifiers, notifier); +} + +void qemu_clock_unregister_reset_notifier(QEMUClockType type, + Notifier *notifier) +{ + notifier_remove(notifier); +} + +void init_clocks(void) +{ + QEMUClockType type; + for (type = 0; type < QEMU_CLOCK_MAX; type++) { + qemu_clock_init(type); + } + +#ifdef CONFIG_PRCTL_PR_SET_TIMERSLACK + prctl(PR_SET_TIMERSLACK, 1, 0, 0, 0); +#endif +} + +uint64_t timer_expire_time_ns(QEMUTimer *ts) +{ + return timer_pending(ts) ? ts->expire_time : -1; +} + +bool qemu_clock_run_all_timers(void) +{ + bool progress = false; + QEMUClockType type; + + for (type = 0; type < QEMU_CLOCK_MAX; type++) { + progress |= qemu_clock_run_timers(type); + } + + return progress; +} diff --git a/util/thread-pool.c b/util/thread-pool.c new file mode 100644 index 0000000..ce6cd30 --- /dev/null +++ b/util/thread-pool.c @@ -0,0 +1,347 @@ +/* + * QEMU block layer thread pool + * + * Copyright IBM, Corp. 2008 + * Copyright Red Hat, Inc. 2012 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. 
+ */ +#include "qemu/osdep.h" +#include "qemu-common.h" +#include "qemu/queue.h" +#include "qemu/thread.h" +#include "qemu/coroutine.h" +#include "trace.h" +#include "block/thread-pool.h" +#include "qemu/main-loop.h" + +static void do_spawn_thread(ThreadPool *pool); + +typedef struct ThreadPoolElement ThreadPoolElement; + +enum ThreadState { + THREAD_QUEUED, + THREAD_ACTIVE, + THREAD_DONE, +}; + +struct ThreadPoolElement { + BlockAIOCB common; + ThreadPool *pool; + ThreadPoolFunc *func; + void *arg; + + /* Moving state out of THREAD_QUEUED is protected by lock. After + * that, only the worker thread can write to it. Reads and writes + * of state and ret are ordered with memory barriers. + */ + enum ThreadState state; + int ret; + + /* Access to this list is protected by lock. */ + QTAILQ_ENTRY(ThreadPoolElement) reqs; + + /* Access to this list is protected by the global mutex. */ + QLIST_ENTRY(ThreadPoolElement) all; +}; + +struct ThreadPool { + AioContext *ctx; + QEMUBH *completion_bh; + QemuMutex lock; + QemuCond worker_stopped; + QemuSemaphore sem; + int max_threads; + QEMUBH *new_thread_bh; + + /* The following variables are only accessed from one AioContext. */ + QLIST_HEAD(, ThreadPoolElement) head; + + /* The following variables are protected by lock. */ + QTAILQ_HEAD(, ThreadPoolElement) request_list; + int cur_threads; + int idle_threads; + int new_threads; /* backlog of threads we need to create */ + int pending_threads; /* threads created but not running yet */ + bool stopping; +}; + +static void *worker_thread(void *opaque) +{ + ThreadPool *pool = opaque; + + qemu_mutex_lock(&pool->lock); + pool->pending_threads--; + do_spawn_thread(pool); + + while (!pool->stopping) { + ThreadPoolElement *req; + int ret; + + do { + pool->idle_threads++; + qemu_mutex_unlock(&pool->lock); + ret = qemu_sem_timedwait(&pool->sem, 10000); + qemu_mutex_lock(&pool->lock); + pool->idle_threads--; + } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); + if (ret == -1 || pool->stopping) { + break; + } + + req = QTAILQ_FIRST(&pool->request_list); + QTAILQ_REMOVE(&pool->request_list, req, reqs); + req->state = THREAD_ACTIVE; + qemu_mutex_unlock(&pool->lock); + + ret = req->func(req->arg); + + req->ret = ret; + /* Write ret before state. */ + smp_wmb(); + req->state = THREAD_DONE; + + qemu_mutex_lock(&pool->lock); + + qemu_bh_schedule(pool->completion_bh); + } + + pool->cur_threads--; + qemu_cond_signal(&pool->worker_stopped); + qemu_mutex_unlock(&pool->lock); + return NULL; +} + +static void do_spawn_thread(ThreadPool *pool) +{ + QemuThread t; + + /* Runs with lock taken. */ + if (!pool->new_threads) { + return; + } + + pool->new_threads--; + pool->pending_threads++; + + qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); +} + +static void spawn_thread_bh_fn(void *opaque) +{ + ThreadPool *pool = opaque; + + qemu_mutex_lock(&pool->lock); + do_spawn_thread(pool); + qemu_mutex_unlock(&pool->lock); +} + +static void spawn_thread(ThreadPool *pool) +{ + pool->cur_threads++; + pool->new_threads++; + /* If there are threads being created, they will spawn new workers, so + * we don't spend time creating many threads in a loop holding a mutex or + * starving the current vcpu. + * + * If there are no idle threads, ask the main thread to create one, so we + * inherit the correct affinity instead of the vcpu affinity. 
+ */ + if (!pool->pending_threads) { + qemu_bh_schedule(pool->new_thread_bh); + } +} + +static void thread_pool_completion_bh(void *opaque) +{ + ThreadPool *pool = opaque; + ThreadPoolElement *elem, *next; + + aio_context_acquire(pool->ctx); +restart: + QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { + if (elem->state != THREAD_DONE) { + continue; + } + + trace_thread_pool_complete(pool, elem, elem->common.opaque, + elem->ret); + QLIST_REMOVE(elem, all); + + if (elem->common.cb) { + /* Read state before ret. */ + smp_rmb(); + + /* Schedule ourselves in case elem->common.cb() calls aio_poll() to + * wait for another request that completed at the same time. + */ + qemu_bh_schedule(pool->completion_bh); + + aio_context_release(pool->ctx); + elem->common.cb(elem->common.opaque, elem->ret); + aio_context_acquire(pool->ctx); + qemu_aio_unref(elem); + goto restart; + } else { + qemu_aio_unref(elem); + } + } + aio_context_release(pool->ctx); +} + +static void thread_pool_cancel(BlockAIOCB *acb) +{ + ThreadPoolElement *elem = (ThreadPoolElement *)acb; + ThreadPool *pool = elem->pool; + + trace_thread_pool_cancel(elem, elem->common.opaque); + + qemu_mutex_lock(&pool->lock); + if (elem->state == THREAD_QUEUED && + /* No thread has yet started working on elem. we can try to "steal" + * the item from the worker if we can get a signal from the + * semaphore. Because this is non-blocking, we can do it with + * the lock taken and ensure that elem will remain THREAD_QUEUED. + */ + qemu_sem_timedwait(&pool->sem, 0) == 0) { + QTAILQ_REMOVE(&pool->request_list, elem, reqs); + qemu_bh_schedule(pool->completion_bh); + + elem->state = THREAD_DONE; + elem->ret = -ECANCELED; + } + + qemu_mutex_unlock(&pool->lock); +} + +static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb) +{ + ThreadPoolElement *elem = (ThreadPoolElement *)acb; + ThreadPool *pool = elem->pool; + return pool->ctx; +} + +static const AIOCBInfo thread_pool_aiocb_info = { + .aiocb_size = sizeof(ThreadPoolElement), + .cancel_async = thread_pool_cancel, + .get_aio_context = thread_pool_get_aio_context, +}; + +BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, + ThreadPoolFunc *func, void *arg, + BlockCompletionFunc *cb, void *opaque) +{ + ThreadPoolElement *req; + + req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); + req->func = func; + req->arg = arg; + req->state = THREAD_QUEUED; + req->pool = pool; + + QLIST_INSERT_HEAD(&pool->head, req, all); + + trace_thread_pool_submit(pool, req, arg); + + qemu_mutex_lock(&pool->lock); + if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { + spawn_thread(pool); + } + QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); + qemu_mutex_unlock(&pool->lock); + qemu_sem_post(&pool->sem); + return &req->common; +} + +typedef struct ThreadPoolCo { + Coroutine *co; + int ret; +} ThreadPoolCo; + +static void thread_pool_co_cb(void *opaque, int ret) +{ + ThreadPoolCo *co = opaque; + + co->ret = ret; + aio_co_wake(co->co); +} + +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, + void *arg) +{ + ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; + assert(qemu_in_coroutine()); + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); + qemu_coroutine_yield(); + return tpc.ret; +} + +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) +{ + thread_pool_submit_aio(pool, func, arg, NULL, NULL); +} + +static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) +{ + if (!ctx) { + ctx = 
qemu_get_aio_context(); + } + + memset(pool, 0, sizeof(*pool)); + pool->ctx = ctx; + pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); + qemu_mutex_init(&pool->lock); + qemu_cond_init(&pool->worker_stopped); + qemu_sem_init(&pool->sem, 0); + pool->max_threads = 64; + pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); + + QLIST_INIT(&pool->head); + QTAILQ_INIT(&pool->request_list); +} + +ThreadPool *thread_pool_new(AioContext *ctx) +{ + ThreadPool *pool = g_new(ThreadPool, 1); + thread_pool_init_one(pool, ctx); + return pool; +} + +void thread_pool_free(ThreadPool *pool) +{ + if (!pool) { + return; + } + + assert(QLIST_EMPTY(&pool->head)); + + qemu_mutex_lock(&pool->lock); + + /* Stop new threads from spawning */ + qemu_bh_delete(pool->new_thread_bh); + pool->cur_threads -= pool->new_threads; + pool->new_threads = 0; + + /* Wait for worker threads to terminate */ + pool->stopping = true; + while (pool->cur_threads > 0) { + qemu_sem_post(&pool->sem); + qemu_cond_wait(&pool->worker_stopped, &pool->lock); + } + + qemu_mutex_unlock(&pool->lock); + + qemu_bh_delete(pool->completion_bh); + qemu_sem_destroy(&pool->sem); + qemu_cond_destroy(&pool->worker_stopped); + qemu_mutex_destroy(&pool->lock); + g_free(pool); +} diff --git a/util/trace-events b/util/trace-events index 2b8aa30..ac27d94 100644 --- a/util/trace-events +++ b/util/trace-events @@ -1,5 +1,20 @@ # See docs/tracing.txt for syntax documentation. +# util/aio-posix.c +run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64 +run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d" +poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 +poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 + +# util/async.c +aio_co_schedule(void *ctx, void *co) "ctx %p co %p" +aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p" + +# util/thread-pool.c +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" +thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" + # util/buffer.c buffer_resize(const char *buf, size_t olen, size_t len) "%s: old %zd, new %zd" buffer_move_empty(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s" @@ -13,7 +28,7 @@ qemu_coroutine_terminate(void *co) "self %p" # util/qemu-coroutine-lock.c qemu_co_queue_run_restart(void *co) "co %p" -qemu_co_queue_next(void *nxt) "next %p" +qemu_co_mutex_lock_uncontended(void *mutex, void *self) "mutex %p self %p" qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p" qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p" qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p" |
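
Usage sketches (not part of the patch; every example_* name below is invented for illustration). First, the two-argument qemu_co_queue_wait() added above pairs a CoQueue with the CoMutex that protects it, giving a condition-variable pattern: the wait drops the mutex across the yield and re-takes it on wakeup, much like pthread_cond_wait(). A minimal single-slot mailbox, assuming only the APIs visible in this diff:

#include "qemu/osdep.h"
#include "qemu/coroutine.h"

/* Invented example type: a single-slot mailbox shared by coroutines. */
typedef struct ExampleMailbox {
    CoMutex lock;      /* protects value/has_value and serializes the queue */
    CoQueue readers;   /* coroutines blocked in example_mailbox_get() */
    int value;
    bool has_value;
} ExampleMailbox;

static void example_mailbox_init(ExampleMailbox *mb)
{
    qemu_co_mutex_init(&mb->lock);
    qemu_co_queue_init(&mb->readers);
    mb->has_value = false;
}

static int coroutine_fn example_mailbox_get(ExampleMailbox *mb)
{
    int v;

    qemu_co_mutex_lock(&mb->lock);
    while (!mb->has_value) {
        /* Drops mb->lock, yields, and re-takes the lock after wakeup. */
        qemu_co_queue_wait(&mb->readers, &mb->lock);
    }
    v = mb->value;
    mb->has_value = false;
    qemu_co_mutex_unlock(&mb->lock);
    return v;
}

static void coroutine_fn example_mailbox_put(ExampleMailbox *mb, int value)
{
    qemu_co_mutex_lock(&mb->lock);
    mb->value = value;
    mb->has_value = true;
    qemu_co_queue_next(&mb->readers);   /* wake at most one waiting reader */
    qemu_co_mutex_unlock(&mb->lock);
}

This is the same shape the patch gives CoRwlock: the reader/writer bookkeeping lives under lock->mutex and blocked coroutines park with qemu_co_queue_wait(&lock->queue, &lock->mutex).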
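
Staying with the locking changes, the CoRwlock interface itself is unchanged for callers; a brief sketch of the read and write sides (example_* names invented, qemu_co_rwlock_init() assumed to have run once at setup):

#include "qemu/osdep.h"
#include "qemu/coroutine.h"

static CoRwlock example_table_lock;     /* qemu_co_rwlock_init() at startup */

static int coroutine_fn example_table_read(const int *table, int idx)
{
    int v;

    qemu_co_rwlock_rdlock(&example_table_lock);
    v = table[idx];                      /* may run alongside other readers */
    qemu_co_rwlock_unlock(&example_table_lock);
    return v;
}

static void coroutine_fn example_table_write(int *table, int idx, int v)
{
    qemu_co_rwlock_wrlock(&example_table_lock);
    table[idx] = v;                      /* exclusive: no readers or writers */
    qemu_co_rwlock_unlock(&example_table_lock);
}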
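
For the timer code, a sketch of arming a one-shot 100 ms timer on the main-loop timer list using only functions defined in qemu-timer.c above; the callback and wrapper names are invented:

#include "qemu/osdep.h"
#include "qemu/timer.h"

static void example_timeout_cb(void *opaque)
{
    bool *fired = opaque;

    *fired = true;
}

static void example_arm_100ms(QEMUTimer *t, bool *fired)
{
    QEMUTimerList *tl =
        qemu_clock_get_main_loop_timerlist(QEMU_CLOCK_REALTIME);

    /* SCALE_MS makes timer_mod() interpret expire times in milliseconds. */
    timer_init_tl(t, tl, SCALE_MS, example_timeout_cb, fired);
    timer_mod(t, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) / SCALE_MS + 100);
}

Tearing down is the usual timer_del()/timer_free() pair once the timer is no longer needed.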
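
Finally, the thread pool: a hedged sketch of offloading a blocking syscall from coroutine context. thread_pool_submit_co() yields here and the completion callback wakes the coroutine through aio_co_wake(); aio_get_thread_pool() is assumed to exist outside this diff, and the fsync wrapper is invented:

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "block/aio.h"
#include "block/thread-pool.h"

/* Invented worker function: runs in a pool thread, so blocking is fine. */
static int example_fsync_fn(void *opaque)
{
    int fd = *(int *)opaque;

    return fsync(fd) == 0 ? 0 : -errno;
}

/* Coroutine context: yields until the worker finishes, then returns its
 * result.  The local fd stays valid across the yield because it lives on
 * the coroutine stack. */
static int coroutine_fn example_co_fsync(AioContext *ctx, int fd)
{
    ThreadPool *pool = aio_get_thread_pool(ctx);   /* assumed existing helper */

    return thread_pool_submit_co(pool, example_fsync_fn, &fd);
}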