diff options
-rw-r--r-- | Makefile | 7 | ||||
-rw-r--r-- | Makefile.objs | 22 | ||||
-rw-r--r-- | aio-posix.c | 268 | ||||
-rw-r--r-- | aio-win32.c | 215 | ||||
-rw-r--r-- | aio.c | 194 | ||||
-rw-r--r-- | arch_init.h | 2 | ||||
-rw-r--r-- | async.c | 118 | ||||
-rw-r--r-- | block/Makefile.objs | 9 | ||||
-rw-r--r-- | block/linux-aio.c (renamed from linux-aio.c) | 52 | ||||
-rw-r--r-- | block/raw-aio.h (renamed from block/raw-posix-aio.h) | 29 | ||||
-rw-r--r-- | block/raw-posix.c | 308 | ||||
-rw-r--r-- | block/raw-win32.c | 221 | ||||
-rw-r--r-- | block/win32-aio.c | 226 | ||||
-rw-r--r-- | compiler.h | 11 | ||||
-rw-r--r-- | cutils.c | 108 | ||||
-rw-r--r-- | event_notifier-posix.c | 120 | ||||
-rw-r--r-- | event_notifier-win32.c | 59 | ||||
-rw-r--r-- | event_notifier.c | 67 | ||||
-rw-r--r-- | event_notifier.h | 20 | ||||
-rw-r--r-- | hw/hw.h | 1 | ||||
-rw-r--r-- | iohandler.c | 1 | ||||
-rw-r--r-- | iov.c | 103 | ||||
-rw-r--r-- | main-loop.c | 157 | ||||
-rw-r--r-- | main-loop.h | 66 | ||||
-rw-r--r-- | osdep.c | 30 | ||||
-rw-r--r-- | oslib-posix.c | 31 | ||||
-rw-r--r-- | oslib-win32.c | 5 | ||||
-rw-r--r-- | posix-aio-compat.c | 679 | ||||
-rw-r--r-- | qapi/Makefile.objs | 4 | ||||
-rw-r--r-- | qemu-aio.h | 206 | ||||
-rw-r--r-- | qemu-char.h | 1 | ||||
-rw-r--r-- | qemu-common.h | 3 | ||||
-rw-r--r-- | qemu-coroutine-lock.c | 2 | ||||
-rw-r--r-- | qemu-os-win32.h | 1 | ||||
-rw-r--r-- | qemu-sockets.c | 18 | ||||
-rw-r--r-- | qemu-thread-posix.c | 80 | ||||
-rw-r--r-- | qemu-thread-posix.h | 5 | ||||
-rw-r--r-- | qemu-thread-win32.c | 35 | ||||
-rw-r--r-- | qemu-thread-win32.h | 4 | ||||
-rw-r--r-- | qemu-thread.h | 7 | ||||
-rw-r--r-- | qemu-timer.c | 12 | ||||
-rw-r--r-- | qemu-tool.c | 35 | ||||
-rw-r--r-- | qemu-user.c | 20 | ||||
-rw-r--r-- | qmp.c | 3 | ||||
-rw-r--r-- | tests/Makefile | 8 | ||||
-rw-r--r-- | thread-pool.c | 289 | ||||
-rw-r--r-- | thread-pool.h | 34 | ||||
-rw-r--r-- | trace-events | 5 | ||||
-rw-r--r-- | vl.c | 17 |
49 files changed, 2429 insertions, 1489 deletions
@@ -171,8 +171,7 @@ endif qemu-img.o: qemu-img-cmds.h tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \ - qemu-timer-common.o main-loop.o notify.o \ - iohandler.o cutils.o iov.o async.o error.o + main-loop.o iohandler.o error.o tools-obj-$(CONFIG_POSIX) += compatfd.o qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) @@ -181,7 +180,7 @@ qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) qemu-bridge-helper$(EXESUF): qemu-bridge-helper.o -vscclient$(EXESUF): $(libcacard-y) $(oslib-obj-y) $(trace-obj-y) $(tools-obj-y) qemu-timer-common.o libcacard/vscclient.o +vscclient$(EXESUF): $(libcacard-y) $(oslib-obj-y) $(trace-obj-y) libcacard/vscclient.o $(call quiet-command,$(CC) $(LDFLAGS) -o $@ $^ $(libcacard_libs) $(LIBS)," LINK $@") fsdev/virtfs-proxy-helper$(EXESUF): fsdev/virtfs-proxy-helper.o fsdev/virtio-9p-marshal.o oslib-posix.o $(trace-obj-y) @@ -224,7 +223,7 @@ $(SRC_PATH)/qapi-schema.json $(SRC_PATH)/scripts/qapi-commands.py $(qapi-py) QGALIB_GEN=$(addprefix qga/qapi-generated/, qga-qapi-types.h qga-qapi-visit.h qga-qmp-commands.h) $(qga-obj-y) qemu-ga.o: $(QGALIB_GEN) -qemu-ga$(EXESUF): qemu-ga.o $(qga-obj-y) $(tools-obj-y) $(qapi-obj-y) $(qobject-obj-y) $(version-obj-y) +qemu-ga$(EXESUF): qemu-ga.o $(qga-obj-y) $(oslib-obj-y) $(trace-obj-y) $(qapi-obj-y) $(qobject-obj-y) $(version-obj-y) QEMULIBS=libuser libdis libdis-user diff --git a/Makefile.objs b/Makefile.objs index 9eca179..2b5427e 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -19,7 +19,7 @@ universal-obj-y += $(qom-obj-y) ####################################################################### # oslib-obj-y is code depending on the OS (win32 vs posix) -oslib-obj-y = osdep.o +oslib-obj-y = osdep.o cutils.o qemu-timer-common.o oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o @@ -41,12 +41,12 @@ coroutine-obj-$(CONFIG_WIN32) += coroutine-win32.o ####################################################################### # block-obj-y is code used by both qemu system emulation and qemu-img -block-obj-y = cutils.o iov.o cache-utils.o qemu-option.o module.o async.o -block-obj-y += nbd.o block.o blockjob.o aio.o aes.o qemu-config.o -block-obj-y += qemu-progress.o qemu-sockets.o uri.o notify.o +block-obj-y = iov.o cache-utils.o qemu-option.o module.o async.o +block-obj-y += nbd.o block.o blockjob.o aes.o qemu-config.o +block-obj-y += thread-pool.o qemu-progress.o qemu-sockets.o uri.o notify.o block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y) -block-obj-$(CONFIG_POSIX) += posix-aio-compat.o -block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o +block-obj-$(CONFIG_POSIX) += event_notifier-posix.o aio-posix.o +block-obj-$(CONFIG_WIN32) += event_notifier-win32.o aio-win32.o block-obj-y += block/ block-obj-y += $(qapi-obj-y) qapi-types.o qapi-visit.o @@ -92,9 +92,8 @@ common-obj-y += ui/ common-obj-y += bt-host.o bt-vhci.o common-obj-y += dma-helpers.o -common-obj-y += iov.o acl.o +common-obj-y += acl.o common-obj-$(CONFIG_POSIX) += compatfd.o -common-obj-y += event_notifier.o common-obj-y += qemu-timer.o qemu-timer-common.o common-obj-y += qtest.o common-obj-y += vl.o @@ -113,7 +112,7 @@ endif user-obj-y = user-obj-y += envlist.o path.o user-obj-y += tcg-runtime.o host-utils.o -user-obj-y += cutils.o iov.o cache-utils.o +user-obj-y += cache-utils.o user-obj-y += module.o user-obj-y += qemu-user.o user-obj-y += $(trace-obj-y) @@ -228,9 +227,8 @@ universal-obj-y += $(qapi-obj-y) ###################################################################### # guest agent -qga-obj-y = qga/ qemu-ga.o module.o -qga-obj-$(CONFIG_WIN32) += oslib-win32.o -qga-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-sockets.o qemu-option.o +qga-obj-y = qga/ qemu-ga.o module.o qemu-tool.o +qga-obj-$(CONFIG_POSIX) += qemu-sockets.o qemu-option.o vl.o: QEMU_CFLAGS+=$(GPROF_CFLAGS) diff --git a/aio-posix.c b/aio-posix.c new file mode 100644 index 0000000..05cc84e --- /dev/null +++ b/aio-posix.c @@ -0,0 +1,268 @@ +/* + * QEMU aio implementation + * + * Copyright IBM, Corp. 2008 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ + +#include "qemu-common.h" +#include "block.h" +#include "qemu-queue.h" +#include "qemu_socket.h" + +struct AioHandler +{ + GPollFD pfd; + IOHandler *io_read; + IOHandler *io_write; + AioFlushHandler *io_flush; + int deleted; + void *opaque; + QLIST_ENTRY(AioHandler) node; +}; + +static AioHandler *find_aio_handler(AioContext *ctx, int fd) +{ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->pfd.fd == fd) + if (!node->deleted) + return node; + } + + return NULL; +} + +void aio_set_fd_handler(AioContext *ctx, + int fd, + IOHandler *io_read, + IOHandler *io_write, + AioFlushHandler *io_flush, + void *opaque) +{ + AioHandler *node; + + node = find_aio_handler(ctx, fd); + + /* Are we deleting the fd handler? */ + if (!io_read && !io_write) { + if (node) { + g_source_remove_poll(&ctx->source, &node->pfd); + + /* If the lock is held, just mark the node as deleted */ + if (ctx->walking_handlers) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up after + * releasing the walking_handlers lock. + */ + QLIST_REMOVE(node, node); + g_free(node); + } + } + } else { + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_malloc0(sizeof(AioHandler)); + node->pfd.fd = fd; + QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + + g_source_add_poll(&ctx->source, &node->pfd); + } + /* Update handler with latest information */ + node->io_read = io_read; + node->io_write = io_write; + node->io_flush = io_flush; + node->opaque = opaque; + + node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP : 0); + node->pfd.events |= (io_write ? G_IO_OUT : 0); + } + + aio_notify(ctx); +} + +void aio_set_event_notifier(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_read, + AioFlushEventNotifierHandler *io_flush) +{ + aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), + (IOHandler *)io_read, NULL, + (AioFlushHandler *)io_flush, notifier); +} + +bool aio_pending(AioContext *ctx) +{ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + int revents; + + /* + * FIXME: right now we cannot get G_IO_HUP and G_IO_ERR because + * main-loop.c is still select based (due to the slirp legacy). + * If main-loop.c ever switches to poll, G_IO_ERR should be + * tested too. Dispatching G_IO_ERR to both handlers should be + * okay, since handlers need to be ready for spurious wakeups. + */ + revents = node->pfd.revents & node->pfd.events; + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { + return true; + } + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { + return true; + } + } + + return false; +} + +bool aio_poll(AioContext *ctx, bool blocking) +{ + static struct timeval tv0; + AioHandler *node; + fd_set rdfds, wrfds; + int max_fd = -1; + int ret; + bool busy, progress; + + progress = false; + + /* + * If there are callbacks left that have been queued, we need to call then. + * Do not call select in this case, because it is possible that the caller + * does not need a complete flush (as is the case for qemu_aio_wait loops). + */ + if (aio_bh_poll(ctx)) { + blocking = false; + progress = true; + } + + /* + * Then dispatch any pending callbacks from the GSource. + * + * We have to walk very carefully in case qemu_aio_set_fd_handler is + * called while we're walking. + */ + node = QLIST_FIRST(&ctx->aio_handlers); + while (node) { + AioHandler *tmp; + int revents; + + ctx->walking_handlers++; + + revents = node->pfd.revents & node->pfd.events; + node->pfd.revents = 0; + + /* See comment in aio_pending. */ + if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { + node->io_read(node->opaque); + progress = true; + } + if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + tmp = node; + node = QLIST_NEXT(node, node); + + ctx->walking_handlers--; + + if (!ctx->walking_handlers && tmp->deleted) { + QLIST_REMOVE(tmp, node); + g_free(tmp); + } + } + + if (progress && !blocking) { + return true; + } + + ctx->walking_handlers++; + + FD_ZERO(&rdfds); + FD_ZERO(&wrfds); + + /* fill fd sets */ + busy = false; + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + /* If there aren't pending AIO operations, don't invoke callbacks. + * Otherwise, if there are no AIO requests, qemu_aio_wait() would + * wait indefinitely. + */ + if (!node->deleted && node->io_flush) { + if (node->io_flush(node->opaque) == 0) { + continue; + } + busy = true; + } + if (!node->deleted && node->io_read) { + FD_SET(node->pfd.fd, &rdfds); + max_fd = MAX(max_fd, node->pfd.fd + 1); + } + if (!node->deleted && node->io_write) { + FD_SET(node->pfd.fd, &wrfds); + max_fd = MAX(max_fd, node->pfd.fd + 1); + } + } + + ctx->walking_handlers--; + + /* No AIO operations? Get us out of here */ + if (!busy) { + return progress; + } + + /* wait until next event */ + ret = select(max_fd, &rdfds, &wrfds, NULL, blocking ? NULL : &tv0); + + /* if we have any readable fds, dispatch event */ + if (ret > 0) { + /* we have to walk very carefully in case + * qemu_aio_set_fd_handler is called while we're walking */ + node = QLIST_FIRST(&ctx->aio_handlers); + while (node) { + AioHandler *tmp; + + ctx->walking_handlers++; + + if (!node->deleted && + FD_ISSET(node->pfd.fd, &rdfds) && + node->io_read) { + node->io_read(node->opaque); + progress = true; + } + if (!node->deleted && + FD_ISSET(node->pfd.fd, &wrfds) && + node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + tmp = node; + node = QLIST_NEXT(node, node); + + ctx->walking_handlers--; + + if (!ctx->walking_handlers && tmp->deleted) { + QLIST_REMOVE(tmp, node); + g_free(tmp); + } + } + } + + return progress; +} diff --git a/aio-win32.c b/aio-win32.c new file mode 100644 index 0000000..a84eb71 --- /dev/null +++ b/aio-win32.c @@ -0,0 +1,215 @@ +/* + * QEMU aio implementation + * + * Copyright IBM Corp., 2008 + * Copyright Red Hat Inc., 2012 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ + +#include "qemu-common.h" +#include "block.h" +#include "qemu-queue.h" +#include "qemu_socket.h" + +struct AioHandler { + EventNotifier *e; + EventNotifierHandler *io_notify; + AioFlushEventNotifierHandler *io_flush; + GPollFD pfd; + int deleted; + QLIST_ENTRY(AioHandler) node; +}; + +void aio_set_event_notifier(AioContext *ctx, + EventNotifier *e, + EventNotifierHandler *io_notify, + AioFlushEventNotifierHandler *io_flush) +{ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->e == e && !node->deleted) { + break; + } + } + + /* Are we deleting the fd handler? */ + if (!io_notify) { + if (node) { + g_source_remove_poll(&ctx->source, &node->pfd); + + /* If the lock is held, just mark the node as deleted */ + if (ctx->walking_handlers) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up after + * releasing the walking_handlers lock. + */ + QLIST_REMOVE(node, node); + g_free(node); + } + } + } else { + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_malloc0(sizeof(AioHandler)); + node->e = e; + node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); + node->pfd.events = G_IO_IN; + QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + + g_source_add_poll(&ctx->source, &node->pfd); + } + /* Update handler with latest information */ + node->io_notify = io_notify; + node->io_flush = io_flush; + } + + aio_notify(ctx); +} + +bool aio_pending(AioContext *ctx) +{ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->pfd.revents && node->io_notify) { + return true; + } + } + + return false; +} + +bool aio_poll(AioContext *ctx, bool blocking) +{ + AioHandler *node; + HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; + bool busy, progress; + int count; + + progress = false; + + /* + * If there are callbacks left that have been queued, we need to call then. + * Do not call select in this case, because it is possible that the caller + * does not need a complete flush (as is the case for qemu_aio_wait loops). + */ + if (aio_bh_poll(ctx)) { + blocking = false; + progress = true; + } + + /* + * Then dispatch any pending callbacks from the GSource. + * + * We have to walk very carefully in case qemu_aio_set_fd_handler is + * called while we're walking. + */ + node = QLIST_FIRST(&ctx->aio_handlers); + while (node) { + AioHandler *tmp; + + ctx->walking_handlers++; + + if (node->pfd.revents && node->io_notify) { + node->pfd.revents = 0; + node->io_notify(node->e); + progress = true; + } + + tmp = node; + node = QLIST_NEXT(node, node); + + ctx->walking_handlers--; + + if (!ctx->walking_handlers && tmp->deleted) { + QLIST_REMOVE(tmp, node); + g_free(tmp); + } + } + + if (progress && !blocking) { + return true; + } + + ctx->walking_handlers++; + + /* fill fd sets */ + busy = false; + count = 0; + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + /* If there aren't pending AIO operations, don't invoke callbacks. + * Otherwise, if there are no AIO requests, qemu_aio_wait() would + * wait indefinitely. + */ + if (!node->deleted && node->io_flush) { + if (node->io_flush(node->e) == 0) { + continue; + } + busy = true; + } + if (!node->deleted && node->io_notify) { + events[count++] = event_notifier_get_handle(node->e); + } + } + + ctx->walking_handlers--; + + /* No AIO operations? Get us out of here */ + if (!busy) { + return progress; + } + + /* wait until next event */ + for (;;) { + int timeout = blocking ? INFINITE : 0; + int ret = WaitForMultipleObjects(count, events, FALSE, timeout); + + /* if we have any signaled events, dispatch event */ + if ((DWORD) (ret - WAIT_OBJECT_0) >= count) { + break; + } + + blocking = false; + + /* we have to walk very carefully in case + * qemu_aio_set_fd_handler is called while we're walking */ + node = QLIST_FIRST(&ctx->aio_handlers); + while (node) { + AioHandler *tmp; + + ctx->walking_handlers++; + + if (!node->deleted && + event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] && + node->io_notify) { + node->io_notify(node->e); + progress = true; + } + + tmp = node; + node = QLIST_NEXT(node, node); + + ctx->walking_handlers--; + + if (!ctx->walking_handlers && tmp->deleted) { + QLIST_REMOVE(tmp, node); + g_free(tmp); + } + } + } + + return progress; +} @@ -1,194 +0,0 @@ -/* - * QEMU aio implementation - * - * Copyright IBM, Corp. 2008 - * - * Authors: - * Anthony Liguori <aliguori@us.ibm.com> - * - * This work is licensed under the terms of the GNU GPL, version 2. See - * the COPYING file in the top-level directory. - * - * Contributions after 2012-01-13 are licensed under the terms of the - * GNU GPL, version 2 or (at your option) any later version. - */ - -#include "qemu-common.h" -#include "block.h" -#include "qemu-queue.h" -#include "qemu_socket.h" - -typedef struct AioHandler AioHandler; - -/* The list of registered AIO handlers */ -static QLIST_HEAD(, AioHandler) aio_handlers; - -/* This is a simple lock used to protect the aio_handlers list. Specifically, - * it's used to ensure that no callbacks are removed while we're walking and - * dispatching callbacks. - */ -static int walking_handlers; - -struct AioHandler -{ - int fd; - IOHandler *io_read; - IOHandler *io_write; - AioFlushHandler *io_flush; - int deleted; - void *opaque; - QLIST_ENTRY(AioHandler) node; -}; - -static AioHandler *find_aio_handler(int fd) -{ - AioHandler *node; - - QLIST_FOREACH(node, &aio_handlers, node) { - if (node->fd == fd) - if (!node->deleted) - return node; - } - - return NULL; -} - -int qemu_aio_set_fd_handler(int fd, - IOHandler *io_read, - IOHandler *io_write, - AioFlushHandler *io_flush, - void *opaque) -{ - AioHandler *node; - - node = find_aio_handler(fd); - - /* Are we deleting the fd handler? */ - if (!io_read && !io_write) { - if (node) { - /* If the lock is held, just mark the node as deleted */ - if (walking_handlers) - node->deleted = 1; - else { - /* Otherwise, delete it for real. We can't just mark it as - * deleted because deleted nodes are only cleaned up after - * releasing the walking_handlers lock. - */ - QLIST_REMOVE(node, node); - g_free(node); - } - } - } else { - if (node == NULL) { - /* Alloc and insert if it's not already there */ - node = g_malloc0(sizeof(AioHandler)); - node->fd = fd; - QLIST_INSERT_HEAD(&aio_handlers, node, node); - } - /* Update handler with latest information */ - node->io_read = io_read; - node->io_write = io_write; - node->io_flush = io_flush; - node->opaque = opaque; - } - - qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque); - - return 0; -} - -void qemu_aio_flush(void) -{ - while (qemu_aio_wait()); -} - -bool qemu_aio_wait(void) -{ - AioHandler *node; - fd_set rdfds, wrfds; - int max_fd = -1; - int ret; - bool busy; - - /* - * If there are callbacks left that have been queued, we need to call then. - * Do not call select in this case, because it is possible that the caller - * does not need a complete flush (as is the case for qemu_aio_wait loops). - */ - if (qemu_bh_poll()) { - return true; - } - - walking_handlers++; - - FD_ZERO(&rdfds); - FD_ZERO(&wrfds); - - /* fill fd sets */ - busy = false; - QLIST_FOREACH(node, &aio_handlers, node) { - /* If there aren't pending AIO operations, don't invoke callbacks. - * Otherwise, if there are no AIO requests, qemu_aio_wait() would - * wait indefinitely. - */ - if (node->io_flush) { - if (node->io_flush(node->opaque) == 0) { - continue; - } - busy = true; - } - if (!node->deleted && node->io_read) { - FD_SET(node->fd, &rdfds); - max_fd = MAX(max_fd, node->fd + 1); - } - if (!node->deleted && node->io_write) { - FD_SET(node->fd, &wrfds); - max_fd = MAX(max_fd, node->fd + 1); - } - } - - walking_handlers--; - - /* No AIO operations? Get us out of here */ - if (!busy) { - return false; - } - - /* wait until next event */ - ret = select(max_fd, &rdfds, &wrfds, NULL, NULL); - - /* if we have any readable fds, dispatch event */ - if (ret > 0) { - /* we have to walk very carefully in case - * qemu_aio_set_fd_handler is called while we're walking */ - node = QLIST_FIRST(&aio_handlers); - while (node) { - AioHandler *tmp; - - walking_handlers++; - - if (!node->deleted && - FD_ISSET(node->fd, &rdfds) && - node->io_read) { - node->io_read(node->opaque); - } - if (!node->deleted && - FD_ISSET(node->fd, &wrfds) && - node->io_write) { - node->io_write(node->opaque); - } - - tmp = node; - node = QLIST_NEXT(node, node); - - walking_handlers--; - - if (!walking_handlers && tmp->deleted) { - QLIST_REMOVE(tmp, node); - g_free(tmp); - } - } - } - - return true; -} diff --git a/arch_init.h b/arch_init.h index d9c572a..5fc780c 100644 --- a/arch_init.h +++ b/arch_init.h @@ -34,6 +34,6 @@ int tcg_available(void); int kvm_available(void); int xen_available(void); -CpuDefinitionInfoList GCC_WEAK_DECL *arch_query_cpu_definitions(Error **errp); +CpuDefinitionInfoList *arch_query_cpu_definitions(Error **errp); #endif @@ -26,13 +26,11 @@ #include "qemu-aio.h" #include "main-loop.h" -/* Anchor of the list of Bottom Halves belonging to the context */ -static struct QEMUBH *first_bh; - /***********************************************************/ /* bottom halves (can be seen as timers which expire ASAP) */ struct QEMUBH { + AioContext *ctx; QEMUBHFunc *cb; void *opaque; QEMUBH *next; @@ -41,27 +39,27 @@ struct QEMUBH { bool deleted; }; -QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque) +QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) { QEMUBH *bh; bh = g_malloc0(sizeof(QEMUBH)); + bh->ctx = ctx; bh->cb = cb; bh->opaque = opaque; - bh->next = first_bh; - first_bh = bh; + bh->next = ctx->first_bh; + ctx->first_bh = bh; return bh; } -int qemu_bh_poll(void) +int aio_bh_poll(AioContext *ctx) { QEMUBH *bh, **bhp, *next; int ret; - static int nesting = 0; - nesting++; + ctx->walking_bh++; ret = 0; - for (bh = first_bh; bh; bh = next) { + for (bh = ctx->first_bh; bh; bh = next) { next = bh->next; if (!bh->deleted && bh->scheduled) { bh->scheduled = 0; @@ -72,11 +70,11 @@ int qemu_bh_poll(void) } } - nesting--; + ctx->walking_bh--; /* remove deleted bhs */ - if (!nesting) { - bhp = &first_bh; + if (!ctx->walking_bh) { + bhp = &ctx->first_bh; while (*bhp) { bh = *bhp; if (bh->deleted) { @@ -105,8 +103,7 @@ void qemu_bh_schedule(QEMUBH *bh) return; bh->scheduled = 1; bh->idle = 0; - /* stop the currently executing CPU to execute the BH ASAP */ - qemu_notify_event(); + aio_notify(bh->ctx); } void qemu_bh_cancel(QEMUBH *bh) @@ -120,16 +117,20 @@ void qemu_bh_delete(QEMUBH *bh) bh->deleted = 1; } -void qemu_bh_update_timeout(uint32_t *timeout) +static gboolean +aio_ctx_prepare(GSource *source, gint *timeout) { + AioContext *ctx = (AioContext *) source; QEMUBH *bh; + bool scheduled = false; - for (bh = first_bh; bh; bh = bh->next) { + for (bh = ctx->first_bh; bh; bh = bh->next) { if (!bh->deleted && bh->scheduled) { + scheduled = true; if (bh->idle) { /* idle bottom halves will be polled at least * every 10ms */ - *timeout = MIN(10, *timeout); + *timeout = 10; } else { /* non-idle bottom halves will be executed * immediately */ @@ -138,5 +139,86 @@ void qemu_bh_update_timeout(uint32_t *timeout) } } } + + return scheduled; +} + +static gboolean +aio_ctx_check(GSource *source) +{ + AioContext *ctx = (AioContext *) source; + QEMUBH *bh; + + for (bh = ctx->first_bh; bh; bh = bh->next) { + if (!bh->deleted && bh->scheduled) { + return true; + } + } + return aio_pending(ctx); +} + +static gboolean +aio_ctx_dispatch(GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + AioContext *ctx = (AioContext *) source; + + assert(callback == NULL); + aio_poll(ctx, false); + return true; +} + +static void +aio_ctx_finalize(GSource *source) +{ + AioContext *ctx = (AioContext *) source; + + aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL); + event_notifier_cleanup(&ctx->notifier); +} + +static GSourceFuncs aio_source_funcs = { + aio_ctx_prepare, + aio_ctx_check, + aio_ctx_dispatch, + aio_ctx_finalize +}; + +GSource *aio_get_g_source(AioContext *ctx) +{ + g_source_ref(&ctx->source); + return &ctx->source; +} + +void aio_notify(AioContext *ctx) +{ + event_notifier_set(&ctx->notifier); +} + +AioContext *aio_context_new(void) +{ + AioContext *ctx; + ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); + event_notifier_init(&ctx->notifier, false); + aio_set_event_notifier(ctx, &ctx->notifier, + (EventNotifierHandler *) + event_notifier_test_and_clear, NULL); + + return ctx; } +void aio_context_ref(AioContext *ctx) +{ + g_source_ref(&ctx->source); +} + +void aio_context_unref(AioContext *ctx) +{ + g_source_unref(&ctx->source); +} + +void aio_flush(AioContext *ctx) +{ + while (aio_poll(ctx, true)); +} diff --git a/block/Makefile.objs b/block/Makefile.objs index 806e526..7f01510 100644 --- a/block/Makefile.objs +++ b/block/Makefile.objs @@ -2,13 +2,18 @@ block-obj-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o block-obj-y += qed-check.o -block-obj-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o -block-obj-$(CONFIG_WIN32) += raw-win32.o +block-obj-y += parallels.o blkdebug.o blkverify.o +block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o block-obj-$(CONFIG_POSIX) += raw-posix.o +block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o + +ifeq ($(CONFIG_POSIX),y) +block-obj-y += nbd.o sheepdog.o block-obj-$(CONFIG_LIBISCSI) += iscsi.o block-obj-$(CONFIG_CURL) += curl.o block-obj-$(CONFIG_RBD) += rbd.o block-obj-$(CONFIG_GLUSTERFS) += gluster.o +endif common-obj-y += stream.o common-obj-y += commit.o diff --git a/linux-aio.c b/block/linux-aio.c index ce9b5d4..6ca984d 100644 --- a/linux-aio.c +++ b/block/linux-aio.c @@ -9,9 +9,10 @@ */ #include "qemu-common.h" #include "qemu-aio.h" -#include "block/raw-posix-aio.h" +#include "qemu-queue.h" +#include "block/raw-aio.h" +#include "event_notifier.h" -#include <sys/eventfd.h> #include <libaio.h> /* @@ -37,7 +38,7 @@ struct qemu_laiocb { struct qemu_laio_state { io_context_t ctx; - int efd; + EventNotifier e; int count; }; @@ -76,29 +77,17 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s, qemu_aio_release(laiocb); } -static void qemu_laio_completion_cb(void *opaque) +static void qemu_laio_completion_cb(EventNotifier *e) { - struct qemu_laio_state *s = opaque; + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); - while (1) { + while (event_notifier_test_and_clear(&s->e)) { struct io_event events[MAX_EVENTS]; - uint64_t val; - ssize_t ret; struct timespec ts = { 0 }; int nevents, i; do { - ret = read(s->efd, &val, sizeof(val)); - } while (ret == -1 && errno == EINTR); - - if (ret == -1 && errno == EAGAIN) - break; - - if (ret != 8) - break; - - do { - nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts); + nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts); } while (nevents == -EINTR); for (i = 0; i < nevents; i++) { @@ -112,9 +101,9 @@ static void qemu_laio_completion_cb(void *opaque) } } -static int qemu_laio_flush_cb(void *opaque) +static int qemu_laio_flush_cb(EventNotifier *e) { - struct qemu_laio_state *s = opaque; + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); return (s->count > 0) ? 1 : 0; } @@ -146,8 +135,9 @@ static void laio_cancel(BlockDriverAIOCB *blockacb) * We might be able to do this slightly more optimal by removing the * O_NONBLOCK flag. */ - while (laiocb->ret == -EINPROGRESS) - qemu_laio_completion_cb(laiocb->ctx); + while (laiocb->ret == -EINPROGRESS) { + qemu_laio_completion_cb(&laiocb->ctx->e); + } } static AIOPool laio_pool = { @@ -186,7 +176,7 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, __func__, type); goto out_free_aiocb; } - io_set_eventfd(&laiocb->iocb, s->efd); + io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e)); s->count++; if (io_submit(s->ctx, 1, &iocbs) < 0) @@ -205,21 +195,21 @@ void *laio_init(void) struct qemu_laio_state *s; s = g_malloc0(sizeof(*s)); - s->efd = eventfd(0, 0); - if (s->efd == -1) + if (event_notifier_init(&s->e, false) < 0) { goto out_free_state; - fcntl(s->efd, F_SETFL, O_NONBLOCK); + } - if (io_setup(MAX_EVENTS, &s->ctx) != 0) + if (io_setup(MAX_EVENTS, &s->ctx) != 0) { goto out_close_efd; + } - qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb, NULL, - qemu_laio_flush_cb, s); + qemu_aio_set_event_notifier(&s->e, qemu_laio_completion_cb, + qemu_laio_flush_cb); return s; out_close_efd: - close(s->efd); + event_notifier_cleanup(&s->e); out_free_state: g_free(s); return NULL; diff --git a/block/raw-posix-aio.h b/block/raw-aio.h index ba118f6..e77f361 100644 --- a/block/raw-posix-aio.h +++ b/block/raw-aio.h @@ -1,5 +1,5 @@ /* - * QEMU Posix block I/O backend AIO support + * Declarations for AIO in the raw protocol * * Copyright IBM, Corp. 2008 * @@ -12,8 +12,8 @@ * Contributions after 2012-01-13 are licensed under the terms of the * GNU GPL, version 2 or (at your option) any later version. */ -#ifndef QEMU_RAW_POSIX_AIO_H -#define QEMU_RAW_POSIX_AIO_H +#ifndef QEMU_RAW_AIO_H +#define QEMU_RAW_AIO_H /* AIO request types */ #define QEMU_AIO_READ 0x0001 @@ -27,19 +27,22 @@ #define QEMU_AIO_MISALIGNED 0x1000 -/* posix-aio-compat.c - thread pool based implementation */ -int paio_init(void); -BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, - int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, - BlockDriverCompletionFunc *cb, void *opaque, int type); -BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd, - unsigned long int req, void *buf, - BlockDriverCompletionFunc *cb, void *opaque); - /* linux-aio.c - Linux native implementation */ +#ifdef CONFIG_LINUX_AIO void *laio_init(void); BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd, int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque, int type); +#endif + +#ifdef _WIN32 +typedef struct QEMUWin32AIOState QEMUWin32AIOState; +QEMUWin32AIOState *win32_aio_init(void); +int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile); +BlockDriverAIOCB *win32_aio_submit(BlockDriverState *bs, + QEMUWin32AIOState *aio, HANDLE hfile, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque, int type); +#endif -#endif /* QEMU_RAW_POSIX_AIO_H */ +#endif /* QEMU_RAW_AIO_H */ diff --git a/block/raw-posix.c b/block/raw-posix.c index 28d439f..f2f0404 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -27,7 +27,10 @@ #include "qemu-log.h" #include "block_int.h" #include "module.h" -#include "block/raw-posix-aio.h" +#include "trace.h" +#include "thread-pool.h" +#include "iov.h" +#include "raw-aio.h" #if defined(__APPLE__) && (__MACH__) #include <paths.h> @@ -149,6 +152,20 @@ typedef struct BDRVRawReopenState { static int fd_open(BlockDriverState *bs); static int64_t raw_getlength(BlockDriverState *bs); +typedef struct RawPosixAIOData { + BlockDriverState *bs; + int aio_fildes; + union { + struct iovec *aio_iov; + void *aio_ioctl_buf; + }; + int aio_niov; + size_t aio_nbytes; +#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */ + off_t aio_offset; + int aio_type; +} RawPosixAIOData; + #if defined(__FreeBSD__) || defined(__FreeBSD_kernel__) static int cdrom_reopen(BlockDriverState *bs); #endif @@ -266,14 +283,10 @@ static int raw_open_common(BlockDriverState *bs, const char *filename, } s->fd = fd; - /* We're falling back to POSIX AIO in some cases so init always */ - if (paio_init() < 0) { - goto out_close; - } - #ifdef CONFIG_LINUX_AIO if (raw_set_aio(&s->aio_ctx, &s->use_aio, bdrv_flags)) { - goto out_close; + qemu_close(fd); + return -errno; } #endif @@ -284,10 +297,6 @@ static int raw_open_common(BlockDriverState *bs, const char *filename, #endif return 0; - -out_close: - qemu_close(fd); - return -errno; } static int raw_open(BlockDriverState *bs, const char *filename, int flags) @@ -434,6 +443,283 @@ static int qiov_is_aligned(BlockDriverState *bs, QEMUIOVector *qiov) return 1; } +static ssize_t handle_aiocb_ioctl(RawPosixAIOData *aiocb) +{ + int ret; + + ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf); + if (ret == -1) { + return -errno; + } + + /* + * This looks weird, but the aio code only considers a request + * successful if it has written the full number of bytes. + * + * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command, + * so in fact we return the ioctl command here to make posix_aio_read() + * happy.. + */ + return aiocb->aio_nbytes; +} + +static ssize_t handle_aiocb_flush(RawPosixAIOData *aiocb) +{ + int ret; + + ret = qemu_fdatasync(aiocb->aio_fildes); + if (ret == -1) { + return -errno; + } + return 0; +} + +#ifdef CONFIG_PREADV + +static bool preadv_present = true; + +static ssize_t +qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset) +{ + return preadv(fd, iov, nr_iov, offset); +} + +static ssize_t +qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset) +{ + return pwritev(fd, iov, nr_iov, offset); +} + +#else + +static bool preadv_present = false; + +static ssize_t +qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset) +{ + return -ENOSYS; +} + +static ssize_t +qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset) +{ + return -ENOSYS; +} + +#endif + +static ssize_t handle_aiocb_rw_vector(RawPosixAIOData *aiocb) +{ + ssize_t len; + + do { + if (aiocb->aio_type & QEMU_AIO_WRITE) + len = qemu_pwritev(aiocb->aio_fildes, + aiocb->aio_iov, + aiocb->aio_niov, + aiocb->aio_offset); + else + len = qemu_preadv(aiocb->aio_fildes, + aiocb->aio_iov, + aiocb->aio_niov, + aiocb->aio_offset); + } while (len == -1 && errno == EINTR); + + if (len == -1) { + return -errno; + } + return len; +} + +/* + * Read/writes the data to/from a given linear buffer. + * + * Returns the number of bytes handles or -errno in case of an error. Short + * reads are only returned if the end of the file is reached. + */ +static ssize_t handle_aiocb_rw_linear(RawPosixAIOData *aiocb, char *buf) +{ + ssize_t offset = 0; + ssize_t len; + + while (offset < aiocb->aio_nbytes) { + if (aiocb->aio_type & QEMU_AIO_WRITE) { + len = pwrite(aiocb->aio_fildes, + (const char *)buf + offset, + aiocb->aio_nbytes - offset, + aiocb->aio_offset + offset); + } else { + len = pread(aiocb->aio_fildes, + buf + offset, + aiocb->aio_nbytes - offset, + aiocb->aio_offset + offset); + } + if (len == -1 && errno == EINTR) { + continue; + } else if (len == -1) { + offset = -errno; + break; + } else if (len == 0) { + break; + } + offset += len; + } + + return offset; +} + +static ssize_t handle_aiocb_rw(RawPosixAIOData *aiocb) +{ + ssize_t nbytes; + char *buf; + + if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) { + /* + * If there is just a single buffer, and it is properly aligned + * we can just use plain pread/pwrite without any problems. + */ + if (aiocb->aio_niov == 1) { + return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base); + } + /* + * We have more than one iovec, and all are properly aligned. + * + * Try preadv/pwritev first and fall back to linearizing the + * buffer if it's not supported. + */ + if (preadv_present) { + nbytes = handle_aiocb_rw_vector(aiocb); + if (nbytes == aiocb->aio_nbytes || + (nbytes < 0 && nbytes != -ENOSYS)) { + return nbytes; + } + preadv_present = false; + } + + /* + * XXX(hch): short read/write. no easy way to handle the reminder + * using these interfaces. For now retry using plain + * pread/pwrite? + */ + } + + /* + * Ok, we have to do it the hard way, copy all segments into + * a single aligned buffer. + */ + buf = qemu_blockalign(aiocb->bs, aiocb->aio_nbytes); + if (aiocb->aio_type & QEMU_AIO_WRITE) { + char *p = buf; + int i; + + for (i = 0; i < aiocb->aio_niov; ++i) { + memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len); + p += aiocb->aio_iov[i].iov_len; + } + } + + nbytes = handle_aiocb_rw_linear(aiocb, buf); + if (!(aiocb->aio_type & QEMU_AIO_WRITE)) { + char *p = buf; + size_t count = aiocb->aio_nbytes, copy; + int i; + + for (i = 0; i < aiocb->aio_niov && count; ++i) { + copy = count; + if (copy > aiocb->aio_iov[i].iov_len) { + copy = aiocb->aio_iov[i].iov_len; + } + memcpy(aiocb->aio_iov[i].iov_base, p, copy); + p += copy; + count -= copy; + } + } + qemu_vfree(buf); + + return nbytes; +} + +static int aio_worker(void *arg) +{ + RawPosixAIOData *aiocb = arg; + ssize_t ret = 0; + + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { + case QEMU_AIO_READ: + ret = handle_aiocb_rw(aiocb); + if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->bs->growable) { + iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret, + 0, aiocb->aio_nbytes - ret); + + ret = aiocb->aio_nbytes; + } + if (ret == aiocb->aio_nbytes) { + ret = 0; + } else if (ret >= 0 && ret < aiocb->aio_nbytes) { + ret = -EINVAL; + } + break; + case QEMU_AIO_WRITE: + ret = handle_aiocb_rw(aiocb); + if (ret == aiocb->aio_nbytes) { + ret = 0; + } else if (ret >= 0 && ret < aiocb->aio_nbytes) { + ret = -EINVAL; + } + break; + case QEMU_AIO_FLUSH: + ret = handle_aiocb_flush(aiocb); + break; + case QEMU_AIO_IOCTL: + ret = handle_aiocb_ioctl(aiocb); + break; + default: + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); + ret = -EINVAL; + break; + } + + g_slice_free(RawPosixAIOData, aiocb); + return ret; +} + +static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque, int type) +{ + RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); + + acb->bs = bs; + acb->aio_type = type; + acb->aio_fildes = fd; + + if (qiov) { + acb->aio_iov = qiov->iov; + acb->aio_niov = qiov->niov; + } + acb->aio_nbytes = nb_sectors * 512; + acb->aio_offset = sector_num * 512; + + trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); + return thread_pool_submit_aio(aio_worker, acb, cb, opaque); +} + +static BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd, + unsigned long int req, void *buf, + BlockDriverCompletionFunc *cb, void *opaque) +{ + RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); + + acb->bs = bs; + acb->aio_type = QEMU_AIO_IOCTL; + acb->aio_fildes = fd; + acb->aio_offset = 0; + acb->aio_ioctl_buf = buf; + acb->aio_ioctl_cmd = req; + + return thread_pool_submit_aio(aio_worker, acb, cb, opaque); +} + static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque, int type) diff --git a/block/raw-win32.c b/block/raw-win32.c index 78c8306..0c05c58 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -25,6 +25,10 @@ #include "qemu-timer.h" #include "block_int.h" #include "module.h" +#include "raw-aio.h" +#include "trace.h" +#include "thread-pool.h" +#include "iov.h" #include <windows.h> #include <winioctl.h> @@ -32,12 +36,130 @@ #define FTYPE_CD 1 #define FTYPE_HARDDISK 2 +static QEMUWin32AIOState *aio; + +typedef struct RawWin32AIOData { + BlockDriverState *bs; + HANDLE hfile; + struct iovec *aio_iov; + int aio_niov; + size_t aio_nbytes; + off64_t aio_offset; + int aio_type; +} RawWin32AIOData; + typedef struct BDRVRawState { HANDLE hfile; int type; char drive_path[16]; /* format: "d:\" */ + QEMUWin32AIOState *aio; } BDRVRawState; +/* + * Read/writes the data to/from a given linear buffer. + * + * Returns the number of bytes handles or -errno in case of an error. Short + * reads are only returned if the end of the file is reached. + */ +static size_t handle_aiocb_rw(RawWin32AIOData *aiocb) +{ + size_t offset = 0; + int i; + + for (i = 0; i < aiocb->aio_niov; i++) { + OVERLAPPED ov; + DWORD ret, ret_count, len; + + memset(&ov, 0, sizeof(ov)); + ov.Offset = (aiocb->aio_offset + offset); + ov.OffsetHigh = (aiocb->aio_offset + offset) >> 32; + len = aiocb->aio_iov[i].iov_len; + if (aiocb->aio_type & QEMU_AIO_WRITE) { + ret = WriteFile(aiocb->hfile, aiocb->aio_iov[i].iov_base, + len, &ret_count, &ov); + } else { + ret = ReadFile(aiocb->hfile, aiocb->aio_iov[i].iov_base, + len, &ret_count, &ov); + } + if (!ret) { + ret_count = 0; + } + if (ret_count != len) { + break; + } + offset += len; + } + + return offset; +} + +static int aio_worker(void *arg) +{ + RawWin32AIOData *aiocb = arg; + ssize_t ret = 0; + size_t count; + + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { + case QEMU_AIO_READ: + count = handle_aiocb_rw(aiocb); + if (count < aiocb->aio_nbytes && aiocb->bs->growable) { + /* A short read means that we have reached EOF. Pad the buffer + * with zeros for bytes after EOF. */ + iov_memset(aiocb->aio_iov, aiocb->aio_niov, count, + 0, aiocb->aio_nbytes - count); + + count = aiocb->aio_nbytes; + } + if (count == aiocb->aio_nbytes) { + ret = 0; + } else { + ret = -EINVAL; + } + break; + case QEMU_AIO_WRITE: + count = handle_aiocb_rw(aiocb); + if (count == aiocb->aio_nbytes) { + count = 0; + } else { + count = -EINVAL; + } + break; + case QEMU_AIO_FLUSH: + if (!FlushFileBuffers(aiocb->hfile)) { + return -EIO; + } + break; + default: + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); + ret = -EINVAL; + break; + } + + g_slice_free(RawWin32AIOData, aiocb); + return ret; +} + +static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque, int type) +{ + RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); + + acb->bs = bs; + acb->hfile = hfile; + acb->aio_type = type; + + if (qiov) { + acb->aio_iov = qiov->iov; + acb->aio_niov = qiov->niov; + } + acb->aio_nbytes = nb_sectors * 512; + acb->aio_offset = sector_num * 512; + + trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); + return thread_pool_submit_aio(aio_worker, acb, cb, opaque); +} + int qemu_ftruncate64(int fd, int64_t length) { LARGE_INTEGER li; @@ -89,6 +211,9 @@ static void raw_parse_flags(int flags, int *access_flags, DWORD *overlapped) } *overlapped = FILE_ATTRIBUTE_NORMAL; + if (flags & BDRV_O_NATIVE_AIO) { + *overlapped |= FILE_FLAG_OVERLAPPED; + } if (flags & BDRV_O_NOCACHE) { *overlapped |= FILE_FLAG_NO_BUFFERING; } @@ -103,6 +228,13 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags) s->type = FTYPE_FILE; raw_parse_flags(flags, &access_flags, &overlapped); + + if ((flags & BDRV_O_NATIVE_AIO) && aio == NULL) { + aio = win32_aio_init(); + if (aio == NULL) { + return -EINVAL; + } + } s->hfile = CreateFile(filename, access_flags, FILE_SHARE_READ, NULL, @@ -112,64 +244,53 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags) if (err == ERROR_ACCESS_DENIED) return -EACCES; - return -1; + return -EINVAL; + } + + if (flags & BDRV_O_NATIVE_AIO) { + int ret = win32_aio_attach(aio, s->hfile); + if (ret < 0) { + CloseHandle(s->hfile); + return ret; + } + s->aio = aio; } return 0; } -static int raw_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque) { BDRVRawState *s = bs->opaque; - OVERLAPPED ov; - DWORD ret_count; - int ret; - int64_t offset = sector_num * 512; - int count = nb_sectors * 512; - - memset(&ov, 0, sizeof(ov)); - ov.Offset = offset; - ov.OffsetHigh = offset >> 32; - ret = ReadFile(s->hfile, buf, count, &ret_count, &ov); - if (!ret) - return ret_count; - if (ret_count == count) - ret_count = 0; - return ret_count; + if (s->aio) { + return win32_aio_submit(bs, s->aio, s->hfile, sector_num, qiov, + nb_sectors, cb, opaque, QEMU_AIO_READ); + } else { + return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors, + cb, opaque, QEMU_AIO_READ); + } } -static int raw_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque) { BDRVRawState *s = bs->opaque; - OVERLAPPED ov; - DWORD ret_count; - int ret; - int64_t offset = sector_num * 512; - int count = nb_sectors * 512; - - memset(&ov, 0, sizeof(ov)); - ov.Offset = offset; - ov.OffsetHigh = offset >> 32; - ret = WriteFile(s->hfile, buf, count, &ret_count, &ov); - if (!ret) - return ret_count; - if (ret_count == count) - ret_count = 0; - return ret_count; + if (s->aio) { + return win32_aio_submit(bs, s->aio, s->hfile, sector_num, qiov, + nb_sectors, cb, opaque, QEMU_AIO_WRITE); + } else { + return paio_submit(bs, s->hfile, sector_num, qiov, nb_sectors, + cb, opaque, QEMU_AIO_WRITE); + } } -static int raw_flush(BlockDriverState *bs) +static BlockDriverAIOCB *raw_aio_flush(BlockDriverState *bs, + BlockDriverCompletionFunc *cb, void *opaque) { BDRVRawState *s = bs->opaque; - int ret; - - ret = FlushFileBuffers(s->hfile); - if (ret == 0) { - return -EIO; - } - - return 0; + return paio_submit(bs, s->hfile, 0, NULL, 0, cb, opaque, QEMU_AIO_FLUSH); } static void raw_close(BlockDriverState *bs) @@ -290,9 +411,9 @@ static BlockDriver bdrv_file = { .bdrv_close = raw_close, .bdrv_create = raw_create, - .bdrv_read = raw_read, - .bdrv_write = raw_write, - .bdrv_co_flush_to_disk = raw_flush, + .bdrv_aio_readv = raw_aio_readv, + .bdrv_aio_writev = raw_aio_writev, + .bdrv_aio_flush = raw_aio_flush, .bdrv_truncate = raw_truncate, .bdrv_getlength = raw_getlength, @@ -413,9 +534,9 @@ static BlockDriver bdrv_host_device = { .bdrv_close = raw_close, .bdrv_has_zero_init = hdev_has_zero_init, - .bdrv_read = raw_read, - .bdrv_write = raw_write, - .bdrv_co_flush_to_disk = raw_flush, + .bdrv_aio_readv = raw_aio_readv, + .bdrv_aio_writev = raw_aio_writev, + .bdrv_aio_flush = raw_aio_flush, .bdrv_getlength = raw_getlength, .bdrv_get_allocated_file_size diff --git a/block/win32-aio.c b/block/win32-aio.c new file mode 100644 index 0000000..c34dc73 --- /dev/null +++ b/block/win32-aio.c @@ -0,0 +1,226 @@ +/* + * Block driver for RAW files (win32) + * + * Copyright (c) 2006 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include "qemu-common.h" +#include "qemu-timer.h" +#include "block_int.h" +#include "module.h" +#include "qemu-common.h" +#include "qemu-aio.h" +#include "raw-aio.h" +#include "event_notifier.h" +#include <windows.h> +#include <winioctl.h> + +#define FTYPE_FILE 0 +#define FTYPE_CD 1 +#define FTYPE_HARDDISK 2 + +struct QEMUWin32AIOState { + HANDLE hIOCP; + EventNotifier e; + int count; +}; + +typedef struct QEMUWin32AIOCB { + BlockDriverAIOCB common; + struct QEMUWin32AIOState *ctx; + int nbytes; + OVERLAPPED ov; + QEMUIOVector *qiov; + void *buf; + bool is_read; + bool is_linear; +} QEMUWin32AIOCB; + +/* + * Completes an AIO request (calls the callback and frees the ACB). + */ +static void win32_aio_process_completion(QEMUWin32AIOState *s, + QEMUWin32AIOCB *waiocb, DWORD count) +{ + int ret; + s->count--; + + if (waiocb->ov.Internal != 0) { + ret = -EIO; + } else { + ret = 0; + if (count < waiocb->nbytes) { + /* Short reads mean EOF, pad with zeros. */ + if (waiocb->is_read) { + qemu_iovec_memset(waiocb->qiov, count, 0, + waiocb->qiov->size - count); + } else { + ret = -EINVAL; + } + } + } + + if (!waiocb->is_linear) { + if (ret == 0 && waiocb->is_read) { + QEMUIOVector *qiov = waiocb->qiov; + char *p = waiocb->buf; + int i; + + for (i = 0; i < qiov->niov; ++i) { + memcpy(p, qiov->iov[i].iov_base, qiov->iov[i].iov_len); + p += qiov->iov[i].iov_len; + } + g_free(waiocb->buf); + } + } + + + waiocb->common.cb(waiocb->common.opaque, ret); + qemu_aio_release(waiocb); +} + +static void win32_aio_completion_cb(EventNotifier *e) +{ + QEMUWin32AIOState *s = container_of(e, QEMUWin32AIOState, e); + DWORD count; + ULONG_PTR key; + OVERLAPPED *ov; + + event_notifier_test_and_clear(&s->e); + while (GetQueuedCompletionStatus(s->hIOCP, &count, &key, &ov, 0)) { + QEMUWin32AIOCB *waiocb = container_of(ov, QEMUWin32AIOCB, ov); + + win32_aio_process_completion(s, waiocb, count); + } +} + +static int win32_aio_flush_cb(EventNotifier *e) +{ + QEMUWin32AIOState *s = container_of(e, QEMUWin32AIOState, e); + + return (s->count > 0) ? 1 : 0; +} + +static void win32_aio_cancel(BlockDriverAIOCB *blockacb) +{ + QEMUWin32AIOCB *waiocb = (QEMUWin32AIOCB *)blockacb; + + /* + * CancelIoEx is only supported in Vista and newer. For now, just + * wait for completion. + */ + while (!HasOverlappedIoCompleted(&waiocb->ov)) { + qemu_aio_wait(); + } +} + +static AIOPool win32_aio_pool = { + .aiocb_size = sizeof(QEMUWin32AIOCB), + .cancel = win32_aio_cancel, +}; + +BlockDriverAIOCB *win32_aio_submit(BlockDriverState *bs, + QEMUWin32AIOState *aio, HANDLE hfile, + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, + BlockDriverCompletionFunc *cb, void *opaque, int type) +{ + struct QEMUWin32AIOCB *waiocb; + uint64_t offset = sector_num * 512; + DWORD rc; + + waiocb = qemu_aio_get(&win32_aio_pool, bs, cb, opaque); + waiocb->nbytes = nb_sectors * 512; + waiocb->qiov = qiov; + waiocb->is_read = (type == QEMU_AIO_READ); + + if (qiov->niov > 1) { + waiocb->buf = qemu_blockalign(bs, qiov->size); + if (type & QEMU_AIO_WRITE) { + char *p = waiocb->buf; + int i; + + for (i = 0; i < qiov->niov; ++i) { + memcpy(p, qiov->iov[i].iov_base, qiov->iov[i].iov_len); + p += qiov->iov[i].iov_len; + } + } + waiocb->is_linear = false; + } else { + waiocb->buf = qiov->iov[0].iov_base; + waiocb->is_linear = true; + } + + waiocb->ov = (OVERLAPPED) { + .Offset = (DWORD) offset, + .OffsetHigh = (DWORD) (offset >> 32), + .hEvent = event_notifier_get_handle(&aio->e) + }; + aio->count++; + + if (type & QEMU_AIO_READ) { + rc = ReadFile(hfile, waiocb->buf, waiocb->nbytes, NULL, &waiocb->ov); + } else { + rc = WriteFile(hfile, waiocb->buf, waiocb->nbytes, NULL, &waiocb->ov); + } + if(rc == 0 && GetLastError() != ERROR_IO_PENDING) { + goto out_dec_count; + } + return &waiocb->common; + +out_dec_count: + aio->count--; + qemu_aio_release(waiocb); + return NULL; +} + +int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile) +{ + if (CreateIoCompletionPort(hfile, aio->hIOCP, (ULONG_PTR) 0, 0) == NULL) { + return -EINVAL; + } else { + return 0; + } +} + +QEMUWin32AIOState *win32_aio_init(void) +{ + QEMUWin32AIOState *s; + + s = g_malloc0(sizeof(*s)); + if (event_notifier_init(&s->e, false) < 0) { + goto out_free_state; + } + + s->hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (s->hIOCP == NULL) { + goto out_close_efd; + } + + qemu_aio_set_event_notifier(&s->e, win32_aio_completion_cb, + win32_aio_flush_cb); + + return s; + +out_close_efd: + event_notifier_cleanup(&s->e); +out_free_state: + g_free(s); + return NULL; +} @@ -50,16 +50,13 @@ # define __printf__ __gnu_printf__ # endif # endif -#if defined(_WIN32) -#define GCC_WEAK __attribute__((weak)) -#define GCC_WEAK_DECL GCC_WEAK -#else -#define GCC_WEAK __attribute__((weak)) -#define GCC_WEAK_DECL -#endif +# define QEMU_WEAK_ALIAS(newname, oldname) \ + typeof(oldname) newname __attribute__((weak, alias (#oldname))) #else #define GCC_ATTR /**/ #define GCC_FMT_ATTR(n, m) +#define QEMU_WEAK_ALIAS(newname, oldname) \ + _Pragma("weak " #newname "=" #oldname) #endif #endif /* COMPILER_H */ @@ -142,109 +142,6 @@ int qemu_fdatasync(int fd) #endif } -/* io vectors */ - -void qemu_iovec_init(QEMUIOVector *qiov, int alloc_hint) -{ - qiov->iov = g_malloc(alloc_hint * sizeof(struct iovec)); - qiov->niov = 0; - qiov->nalloc = alloc_hint; - qiov->size = 0; -} - -void qemu_iovec_init_external(QEMUIOVector *qiov, struct iovec *iov, int niov) -{ - int i; - - qiov->iov = iov; - qiov->niov = niov; - qiov->nalloc = -1; - qiov->size = 0; - for (i = 0; i < niov; i++) - qiov->size += iov[i].iov_len; -} - -void qemu_iovec_add(QEMUIOVector *qiov, void *base, size_t len) -{ - assert(qiov->nalloc != -1); - - if (qiov->niov == qiov->nalloc) { - qiov->nalloc = 2 * qiov->nalloc + 1; - qiov->iov = g_realloc(qiov->iov, qiov->nalloc * sizeof(struct iovec)); - } - qiov->iov[qiov->niov].iov_base = base; - qiov->iov[qiov->niov].iov_len = len; - qiov->size += len; - ++qiov->niov; -} - -/* - * Concatenates (partial) iovecs from src to the end of dst. - * It starts copying after skipping `soffset' bytes at the - * beginning of src and adds individual vectors from src to - * dst copies up to `sbytes' bytes total, or up to the end - * of src if it comes first. This way, it is okay to specify - * very large value for `sbytes' to indicate "up to the end - * of src". - * Only vector pointers are processed, not the actual data buffers. - */ -void qemu_iovec_concat(QEMUIOVector *dst, - QEMUIOVector *src, size_t soffset, size_t sbytes) -{ - int i; - size_t done; - struct iovec *siov = src->iov; - assert(dst->nalloc != -1); - assert(src->size >= soffset); - for (i = 0, done = 0; done < sbytes && i < src->niov; i++) { - if (soffset < siov[i].iov_len) { - size_t len = MIN(siov[i].iov_len - soffset, sbytes - done); - qemu_iovec_add(dst, siov[i].iov_base + soffset, len); - done += len; - soffset = 0; - } else { - soffset -= siov[i].iov_len; - } - } - /* return done; */ -} - -void qemu_iovec_destroy(QEMUIOVector *qiov) -{ - assert(qiov->nalloc != -1); - - qemu_iovec_reset(qiov); - g_free(qiov->iov); - qiov->nalloc = 0; - qiov->iov = NULL; -} - -void qemu_iovec_reset(QEMUIOVector *qiov) -{ - assert(qiov->nalloc != -1); - - qiov->niov = 0; - qiov->size = 0; -} - -size_t qemu_iovec_to_buf(QEMUIOVector *qiov, size_t offset, - void *buf, size_t bytes) -{ - return iov_to_buf(qiov->iov, qiov->niov, offset, buf, bytes); -} - -size_t qemu_iovec_from_buf(QEMUIOVector *qiov, size_t offset, - const void *buf, size_t bytes) -{ - return iov_from_buf(qiov->iov, qiov->niov, offset, buf, bytes); -} - -size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset, - int fillc, size_t bytes) -{ - return iov_memset(qiov->iov, qiov->niov, offset, fillc, bytes); -} - /* * Checks if a buffer is all zeroes * @@ -383,11 +280,6 @@ int qemu_parse_fd(const char *param) return fd; } -int qemu_parse_fdset(const char *param) -{ - return qemu_parse_fd(param); -} - /* round down to the nearest power of 2*/ int64_t pow2floor(int64_t value) { diff --git a/event_notifier-posix.c b/event_notifier-posix.c new file mode 100644 index 0000000..6f3239a --- /dev/null +++ b/event_notifier-posix.c @@ -0,0 +1,120 @@ +/* + * event notifier support + * + * Copyright Red Hat, Inc. 2010 + * + * Authors: + * Michael S. Tsirkin <mst@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu-common.h" +#include "event_notifier.h" +#include "qemu-char.h" + +#ifdef CONFIG_EVENTFD +#include <sys/eventfd.h> +#endif + +void event_notifier_init_fd(EventNotifier *e, int fd) +{ + e->rfd = fd; + e->wfd = fd; +} + +int event_notifier_init(EventNotifier *e, int active) +{ + int fds[2]; + int ret; + +#ifdef CONFIG_EVENTFD + ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); +#else + ret = -1; + errno = ENOSYS; +#endif + if (ret >= 0) { + e->rfd = e->wfd = ret; + } else { + if (errno != ENOSYS) { + return -errno; + } + if (qemu_pipe(fds) < 0) { + return -errno; + } + ret = fcntl_setfl(fds[0], O_NONBLOCK); + if (ret < 0) { + ret = -errno; + goto fail; + } + ret = fcntl_setfl(fds[1], O_NONBLOCK); + if (ret < 0) { + ret = -errno; + goto fail; + } + e->rfd = fds[0]; + e->wfd = fds[1]; + } + if (active) { + event_notifier_set(e); + } + return 0; + +fail: + close(fds[0]); + close(fds[1]); + return ret; +} + +void event_notifier_cleanup(EventNotifier *e) +{ + if (e->rfd != e->wfd) { + close(e->rfd); + } + close(e->wfd); +} + +int event_notifier_get_fd(EventNotifier *e) +{ + return e->rfd; +} + +int event_notifier_set_handler(EventNotifier *e, + EventNotifierHandler *handler) +{ + return qemu_set_fd_handler(e->rfd, (IOHandler *)handler, NULL, e); +} + +int event_notifier_set(EventNotifier *e) +{ + static const uint64_t value = 1; + ssize_t ret; + + do { + ret = write(e->wfd, &value, sizeof(value)); + } while (ret < 0 && errno == EINTR); + + /* EAGAIN is fine, a read must be pending. */ + if (ret < 0 && errno != EAGAIN) { + return -errno; + } + return 0; +} + +int event_notifier_test_and_clear(EventNotifier *e) +{ + int value; + ssize_t len; + char buffer[512]; + + /* Drain the notify pipe. For eventfd, only 8 bytes will be read. */ + value = 0; + do { + len = read(e->rfd, buffer, sizeof(buffer)); + value |= (len > 0); + } while ((len == -1 && errno == EINTR) || len == sizeof(buffer)); + + return value; +} diff --git a/event_notifier-win32.c b/event_notifier-win32.c new file mode 100644 index 0000000..c723dad --- /dev/null +++ b/event_notifier-win32.c @@ -0,0 +1,59 @@ +/* + * event notifier support + * + * Copyright Red Hat, Inc. 2010 + * + * Authors: + * Michael S. Tsirkin <mst@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu-common.h" +#include "event_notifier.h" +#include "main-loop.h" + +int event_notifier_init(EventNotifier *e, int active) +{ + e->event = CreateEvent(NULL, FALSE, FALSE, NULL); + assert(e->event); + return 0; +} + +void event_notifier_cleanup(EventNotifier *e) +{ + CloseHandle(e->event); +} + +HANDLE event_notifier_get_handle(EventNotifier *e) +{ + return e->event; +} + +int event_notifier_set_handler(EventNotifier *e, + EventNotifierHandler *handler) +{ + if (handler) { + return qemu_add_wait_object(e->event, (IOHandler *)handler, e); + } else { + qemu_del_wait_object(e->event, (IOHandler *)handler, e); + return 0; + } +} + +int event_notifier_set(EventNotifier *e) +{ + SetEvent(e->event); + return 0; +} + +int event_notifier_test_and_clear(EventNotifier *e) +{ + int ret = WaitForSingleObject(e->event, 0); + if (ret == WAIT_OBJECT_0) { + ResetEvent(e->event); + return true; + } + return false; +} diff --git a/event_notifier.c b/event_notifier.c deleted file mode 100644 index 2c207e1..0000000 --- a/event_notifier.c +++ /dev/null @@ -1,67 +0,0 @@ -/* - * event notifier support - * - * Copyright Red Hat, Inc. 2010 - * - * Authors: - * Michael S. Tsirkin <mst@redhat.com> - * - * This work is licensed under the terms of the GNU GPL, version 2 or later. - * See the COPYING file in the top-level directory. - */ - -#include "qemu-common.h" -#include "event_notifier.h" -#include "qemu-char.h" - -#ifdef CONFIG_EVENTFD -#include <sys/eventfd.h> -#endif - -void event_notifier_init_fd(EventNotifier *e, int fd) -{ - e->fd = fd; -} - -int event_notifier_init(EventNotifier *e, int active) -{ -#ifdef CONFIG_EVENTFD - int fd = eventfd(!!active, EFD_NONBLOCK | EFD_CLOEXEC); - if (fd < 0) - return -errno; - e->fd = fd; - return 0; -#else - return -ENOSYS; -#endif -} - -void event_notifier_cleanup(EventNotifier *e) -{ - close(e->fd); -} - -int event_notifier_get_fd(EventNotifier *e) -{ - return e->fd; -} - -int event_notifier_set_handler(EventNotifier *e, - EventNotifierHandler *handler) -{ - return qemu_set_fd_handler(e->fd, (IOHandler *)handler, NULL, e); -} - -int event_notifier_set(EventNotifier *e) -{ - uint64_t value = 1; - int r = write(e->fd, &value, sizeof(value)); - return r == sizeof(value); -} - -int event_notifier_test_and_clear(EventNotifier *e) -{ - uint64_t value; - int r = read(e->fd, &value, sizeof(value)); - return r == sizeof(value); -} diff --git a/event_notifier.h b/event_notifier.h index f0ec2f2..88b57af 100644 --- a/event_notifier.h +++ b/event_notifier.h @@ -15,18 +15,32 @@ #include "qemu-common.h" +#ifdef _WIN32 +#include <windows.h> +#endif + struct EventNotifier { - int fd; +#ifdef _WIN32 + HANDLE event; +#else + int rfd; + int wfd; +#endif }; typedef void EventNotifierHandler(EventNotifier *); -void event_notifier_init_fd(EventNotifier *, int fd); int event_notifier_init(EventNotifier *, int active); void event_notifier_cleanup(EventNotifier *); -int event_notifier_get_fd(EventNotifier *); int event_notifier_set(EventNotifier *); int event_notifier_test_and_clear(EventNotifier *); int event_notifier_set_handler(EventNotifier *, EventNotifierHandler *); +#ifdef CONFIG_POSIX +void event_notifier_init_fd(EventNotifier *, int fd); +int event_notifier_get_fd(EventNotifier *); +#else +HANDLE event_notifier_get_handle(EventNotifier *); +#endif + #endif @@ -10,6 +10,7 @@ #include "ioport.h" #include "irq.h" +#include "qemu-aio.h" #include "qemu-file.h" #include "vmstate.h" #include "qemu-log.h" diff --git a/iohandler.c b/iohandler.c index a2d871b..60460a6 100644 --- a/iohandler.c +++ b/iohandler.c @@ -26,6 +26,7 @@ #include "qemu-common.h" #include "qemu-char.h" #include "qemu-queue.h" +#include "qemu-aio.h" #include "main-loop.h" #ifndef _WIN32 @@ -251,3 +251,106 @@ unsigned iov_copy(struct iovec *dst_iov, unsigned int dst_iov_cnt, assert(offset == 0); return j; } + +/* io vectors */ + +void qemu_iovec_init(QEMUIOVector *qiov, int alloc_hint) +{ + qiov->iov = g_malloc(alloc_hint * sizeof(struct iovec)); + qiov->niov = 0; + qiov->nalloc = alloc_hint; + qiov->size = 0; +} + +void qemu_iovec_init_external(QEMUIOVector *qiov, struct iovec *iov, int niov) +{ + int i; + + qiov->iov = iov; + qiov->niov = niov; + qiov->nalloc = -1; + qiov->size = 0; + for (i = 0; i < niov; i++) + qiov->size += iov[i].iov_len; +} + +void qemu_iovec_add(QEMUIOVector *qiov, void *base, size_t len) +{ + assert(qiov->nalloc != -1); + + if (qiov->niov == qiov->nalloc) { + qiov->nalloc = 2 * qiov->nalloc + 1; + qiov->iov = g_realloc(qiov->iov, qiov->nalloc * sizeof(struct iovec)); + } + qiov->iov[qiov->niov].iov_base = base; + qiov->iov[qiov->niov].iov_len = len; + qiov->size += len; + ++qiov->niov; +} + +/* + * Concatenates (partial) iovecs from src to the end of dst. + * It starts copying after skipping `soffset' bytes at the + * beginning of src and adds individual vectors from src to + * dst copies up to `sbytes' bytes total, or up to the end + * of src if it comes first. This way, it is okay to specify + * very large value for `sbytes' to indicate "up to the end + * of src". + * Only vector pointers are processed, not the actual data buffers. + */ +void qemu_iovec_concat(QEMUIOVector *dst, + QEMUIOVector *src, size_t soffset, size_t sbytes) +{ + int i; + size_t done; + struct iovec *siov = src->iov; + assert(dst->nalloc != -1); + assert(src->size >= soffset); + for (i = 0, done = 0; done < sbytes && i < src->niov; i++) { + if (soffset < siov[i].iov_len) { + size_t len = MIN(siov[i].iov_len - soffset, sbytes - done); + qemu_iovec_add(dst, siov[i].iov_base + soffset, len); + done += len; + soffset = 0; + } else { + soffset -= siov[i].iov_len; + } + } + /* return done; */ +} + +void qemu_iovec_destroy(QEMUIOVector *qiov) +{ + assert(qiov->nalloc != -1); + + qemu_iovec_reset(qiov); + g_free(qiov->iov); + qiov->nalloc = 0; + qiov->iov = NULL; +} + +void qemu_iovec_reset(QEMUIOVector *qiov) +{ + assert(qiov->nalloc != -1); + + qiov->niov = 0; + qiov->size = 0; +} + +size_t qemu_iovec_to_buf(QEMUIOVector *qiov, size_t offset, + void *buf, size_t bytes) +{ + return iov_to_buf(qiov->iov, qiov->niov, offset, buf, bytes); +} + +size_t qemu_iovec_from_buf(QEMUIOVector *qiov, size_t offset, + const void *buf, size_t bytes) +{ + return iov_from_buf(qiov->iov, qiov->niov, offset, buf, bytes); +} + +size_t qemu_iovec_memset(QEMUIOVector *qiov, size_t offset, + int fillc, size_t bytes) +{ + return iov_memset(qiov->iov, qiov->niov, offset, fillc, bytes); +} diff --git a/main-loop.c b/main-loop.c index eb3b6e6..e43c7c8 100644 --- a/main-loop.c +++ b/main-loop.c @@ -26,75 +26,12 @@ #include "qemu-timer.h" #include "slirp/slirp.h" #include "main-loop.h" +#include "qemu-aio.h" #ifndef _WIN32 #include "compatfd.h" -static int io_thread_fd = -1; - -void qemu_notify_event(void) -{ - /* Write 8 bytes to be compatible with eventfd. */ - static const uint64_t val = 1; - ssize_t ret; - - if (io_thread_fd == -1) { - return; - } - do { - ret = write(io_thread_fd, &val, sizeof(val)); - } while (ret < 0 && errno == EINTR); - - /* EAGAIN is fine, a read must be pending. */ - if (ret < 0 && errno != EAGAIN) { - fprintf(stderr, "qemu_notify_event: write() failed: %s\n", - strerror(errno)); - exit(1); - } -} - -static void qemu_event_read(void *opaque) -{ - int fd = (intptr_t)opaque; - ssize_t len; - char buffer[512]; - - /* Drain the notify pipe. For eventfd, only 8 bytes will be read. */ - do { - len = read(fd, buffer, sizeof(buffer)); - } while ((len == -1 && errno == EINTR) || len == sizeof(buffer)); -} - -static int qemu_event_init(void) -{ - int err; - int fds[2]; - - err = qemu_eventfd(fds); - if (err == -1) { - return -errno; - } - err = fcntl_setfl(fds[0], O_NONBLOCK); - if (err < 0) { - goto fail; - } - err = fcntl_setfl(fds[1], O_NONBLOCK); - if (err < 0) { - goto fail; - } - qemu_set_fd_handler2(fds[0], NULL, qemu_event_read, NULL, - (void *)(intptr_t)fds[0]); - - io_thread_fd = fds[1]; - return 0; - -fail: - close(fds[0]); - close(fds[1]); - return err; -} - /* If we have signalfd, we mask out the signals we want to handle and then * use signalfd to listen for them. We rely on whatever the current signal * handler is to dispatch the signals when we receive them. @@ -164,44 +101,29 @@ static int qemu_signal_init(void) #else /* _WIN32 */ -static HANDLE qemu_event_handle = NULL; - -static void dummy_event_handler(void *opaque) -{ -} - -static int qemu_event_init(void) +static int qemu_signal_init(void) { - qemu_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL); - if (!qemu_event_handle) { - fprintf(stderr, "Failed CreateEvent: %ld\n", GetLastError()); - return -1; - } - qemu_add_wait_object(qemu_event_handle, dummy_event_handler, NULL); return 0; } +#endif + +static AioContext *qemu_aio_context; void qemu_notify_event(void) { - if (!qemu_event_handle) { + if (!qemu_aio_context) { return; } - if (!SetEvent(qemu_event_handle)) { - fprintf(stderr, "qemu_notify_event: SetEvent failed: %ld\n", - GetLastError()); - exit(1); - } -} - -static int qemu_signal_init(void) -{ - return 0; + aio_notify(qemu_aio_context); } -#endif -int main_loop_init(void) +int qemu_init_main_loop(void) { int ret; + GSource *src; + + init_clocks(); + init_timer_alarm(); qemu_mutex_lock_iothread(); ret = qemu_signal_init(); @@ -209,12 +131,10 @@ int main_loop_init(void) return ret; } - /* Note eventfd must be drained before signalfd handlers run */ - ret = qemu_event_init(); - if (ret) { - return ret; - } - + qemu_aio_context = aio_context_new(); + src = aio_get_g_source(qemu_aio_context); + g_source_attach(src, NULL); + g_source_unref(src); return 0; } @@ -400,7 +320,8 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque) void qemu_fd_register(int fd) { - WSAEventSelect(fd, qemu_event_handle, FD_READ | FD_ACCEPT | FD_CLOSE | + WSAEventSelect(fd, event_notifier_get_handle(&qemu_aio_context->notifier), + FD_READ | FD_ACCEPT | FD_CLOSE | FD_CONNECT | FD_WRITE | FD_OOB); } @@ -477,8 +398,6 @@ int main_loop_wait(int nonblocking) if (nonblocking) { timeout = 0; - } else { - qemu_bh_update_timeout(&timeout); } /* poll any events */ @@ -501,9 +420,41 @@ int main_loop_wait(int nonblocking) qemu_run_all_timers(); - /* Check bottom-halves last in case any of the earlier events triggered - them. */ - qemu_bh_poll(); - return ret; } + +/* Functions to operate on the main QEMU AioContext. */ + +QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque) +{ + return aio_bh_new(qemu_aio_context, cb, opaque); +} + +void qemu_aio_flush(void) +{ + aio_flush(qemu_aio_context); +} + +bool qemu_aio_wait(void) +{ + return aio_poll(qemu_aio_context, true); +} + +#ifdef CONFIG_POSIX +void qemu_aio_set_fd_handler(int fd, + IOHandler *io_read, + IOHandler *io_write, + AioFlushHandler *io_flush, + void *opaque) +{ + aio_set_fd_handler(qemu_aio_context, fd, io_read, io_write, io_flush, + opaque); +} +#endif + +void qemu_aio_set_event_notifier(EventNotifier *notifier, + EventNotifierHandler *io_read, + AioFlushEventNotifierHandler *io_flush) +{ + aio_set_event_notifier(qemu_aio_context, notifier, io_read, io_flush); +} diff --git a/main-loop.h b/main-loop.h index dce1cd9..326c742 100644 --- a/main-loop.h +++ b/main-loop.h @@ -25,6 +25,8 @@ #ifndef QEMU_MAIN_LOOP_H #define QEMU_MAIN_LOOP_H 1 +#include "qemu-aio.h" + #define SIG_IPI SIGUSR1 /** @@ -43,16 +45,6 @@ int qemu_init_main_loop(void); /** - * main_loop_init: Initializes main loop - * - * Internal (but shared for compatibility reasons) initialization routine - * for the main loop. This should not be used by applications directly, - * use qemu_init_main_loop() instead. - * - */ -int main_loop_init(void); - -/** * main_loop_wait: Run one iteration of the main loop. * * If @nonblocking is true, poll for events, otherwise suspend until @@ -173,7 +165,6 @@ void qemu_del_wait_object(HANDLE handle, WaitObjectFunc *func, void *opaque); typedef void IOReadHandler(void *opaque, const uint8_t *buf, int size); typedef int IOCanReadHandler(void *opaque); -typedef void IOHandler(void *opaque); /** * qemu_set_fd_handler2: Register a file descriptor with the main loop @@ -254,56 +245,6 @@ int qemu_set_fd_handler(int fd, IOHandler *fd_write, void *opaque); -typedef struct QEMUBH QEMUBH; -typedef void QEMUBHFunc(void *opaque); - -/** - * qemu_bh_new: Allocate a new bottom half structure. - * - * Bottom halves are lightweight callbacks whose invocation is guaranteed - * to be wait-free, thread-safe and signal-safe. The #QEMUBH structure - * is opaque and must be allocated prior to its use. - */ -QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque); - -/** - * qemu_bh_schedule: Schedule a bottom half. - * - * Scheduling a bottom half interrupts the main loop and causes the - * execution of the callback that was passed to qemu_bh_new. - * - * Bottom halves that are scheduled from a bottom half handler are instantly - * invoked. This can create an infinite loop if a bottom half handler - * schedules itself. - * - * @bh: The bottom half to be scheduled. - */ -void qemu_bh_schedule(QEMUBH *bh); - -/** - * qemu_bh_cancel: Cancel execution of a bottom half. - * - * Canceling execution of a bottom half undoes the effect of calls to - * qemu_bh_schedule without freeing its resources yet. While cancellation - * itself is also wait-free and thread-safe, it can of course race with the - * loop that executes bottom halves unless you are holding the iothread - * mutex. This makes it mostly useless if you are not holding the mutex. - * - * @bh: The bottom half to be canceled. - */ -void qemu_bh_cancel(QEMUBH *bh); - -/** - *qemu_bh_delete: Cancel execution of a bottom half and free its resources. - * - * Deleting a bottom half frees the memory that was allocated for it by - * qemu_bh_new. It also implies canceling the bottom half if it was - * scheduled. - * - * @bh: The bottom half to be deleted. - */ -void qemu_bh_delete(QEMUBH *bh); - #ifdef CONFIG_POSIX /** * qemu_add_child_watch: Register a child process for reaping. @@ -359,8 +300,7 @@ void qemu_fd_register(int fd); void qemu_iohandler_fill(int *pnfds, fd_set *readfds, fd_set *writefds, fd_set *xfds); void qemu_iohandler_poll(fd_set *readfds, fd_set *writefds, fd_set *xfds, int rc); +QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque); void qemu_bh_schedule_idle(QEMUBH *bh); -int qemu_bh_poll(void); -void qemu_bh_update_timeout(uint32_t *timeout); #endif @@ -134,6 +134,11 @@ fail: errno = serrno; return -1; } + +static int qemu_parse_fdset(const char *param) +{ + return qemu_parse_fd(param); +} #endif /* @@ -394,3 +399,28 @@ bool fips_get_state(void) { return fips_enabled; } + + +static int default_fdset_get_fd(int64_t fdset_id, int flags) +{ + return -1; +} +QEMU_WEAK_ALIAS(monitor_fdset_get_fd, default_fdset_get_fd); + +static int default_fdset_dup_fd_add(int64_t fdset_id, int dup_fd) +{ + return -1; +} +QEMU_WEAK_ALIAS(monitor_fdset_dup_fd_add, default_fdset_dup_fd_add); + +static int default_fdset_dup_fd_remove(int dup_fd) +{ + return -1; +} +QEMU_WEAK_ALIAS(monitor_fdset_dup_fd_remove, default_fdset_dup_fd_remove); + +static int default_fdset_dup_fd_find(int dup_fd) +{ + return -1; +} +QEMU_WEAK_ALIAS(monitor_fdset_dup_fd_find, default_fdset_dup_fd_find); diff --git a/oslib-posix.c b/oslib-posix.c index dbeb627..9db9c3d 100644 --- a/oslib-posix.c +++ b/oslib-posix.c @@ -61,9 +61,6 @@ static int running_on_valgrind = -1; #ifdef CONFIG_LINUX #include <sys/syscall.h> #endif -#ifdef CONFIG_EVENTFD -#include <sys/eventfd.h> -#endif int qemu_get_thread_id(void) { @@ -183,34 +180,6 @@ int qemu_pipe(int pipefd[2]) return ret; } -/* - * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set. - */ -int qemu_eventfd(int fds[2]) -{ -#ifdef CONFIG_EVENTFD - int ret; - - ret = eventfd(0, 0); - if (ret >= 0) { - fds[0] = ret; - fds[1] = dup(ret); - if (fds[1] == -1) { - close(ret); - return -1; - } - qemu_set_cloexec(ret); - qemu_set_cloexec(fds[1]); - return 0; - } - if (errno != ENOSYS) { - return -1; - } -#endif - - return qemu_pipe(fds); -} - int qemu_utimens(const char *path, const struct timespec *times) { struct timeval tv[2], tv_now; diff --git a/oslib-win32.c b/oslib-win32.c index 51b33e8..9ca83df 100644 --- a/oslib-win32.c +++ b/oslib-win32.c @@ -150,3 +150,8 @@ int qemu_get_thread_id(void) { return GetCurrentThreadId(); } + +static void default_qemu_fd_register(int fd) +{ +} +QEMU_WEAK_ALIAS(qemu_fd_register, default_qemu_fd_register); diff --git a/posix-aio-compat.c b/posix-aio-compat.c deleted file mode 100644 index 96e4daf..0000000 --- a/posix-aio-compat.c +++ /dev/null @@ -1,679 +0,0 @@ -/* - * QEMU posix-aio emulation - * - * Copyright IBM, Corp. 2008 - * - * Authors: - * Anthony Liguori <aliguori@us.ibm.com> - * - * This work is licensed under the terms of the GNU GPL, version 2. See - * the COPYING file in the top-level directory. - * - * Contributions after 2012-01-13 are licensed under the terms of the - * GNU GPL, version 2 or (at your option) any later version. - */ - -#include <sys/ioctl.h> -#include <sys/types.h> -#include <pthread.h> -#include <unistd.h> -#include <errno.h> -#include <time.h> -#include <string.h> -#include <stdlib.h> -#include <stdio.h> - -#include "qemu-queue.h" -#include "osdep.h" -#include "sysemu.h" -#include "qemu-common.h" -#include "trace.h" -#include "block_int.h" -#include "iov.h" - -#include "block/raw-posix-aio.h" - -static void do_spawn_thread(void); - -struct qemu_paiocb { - BlockDriverAIOCB common; - int aio_fildes; - union { - struct iovec *aio_iov; - void *aio_ioctl_buf; - }; - int aio_niov; - size_t aio_nbytes; -#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */ - off_t aio_offset; - - QTAILQ_ENTRY(qemu_paiocb) node; - int aio_type; - ssize_t ret; - int active; - struct qemu_paiocb *next; -}; - -typedef struct PosixAioState { - int rfd, wfd; - struct qemu_paiocb *first_aio; -} PosixAioState; - - -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -static pthread_t thread_id; -static pthread_attr_t attr; -static int max_threads = 64; -static int cur_threads = 0; -static int idle_threads = 0; -static int new_threads = 0; /* backlog of threads we need to create */ -static int pending_threads = 0; /* threads created but not running yet */ -static QEMUBH *new_thread_bh; -static QTAILQ_HEAD(, qemu_paiocb) request_list; - -#ifdef CONFIG_PREADV -static int preadv_present = 1; -#else -static int preadv_present = 0; -#endif - -static void die2(int err, const char *what) -{ - fprintf(stderr, "%s failed: %s\n", what, strerror(err)); - abort(); -} - -static void die(const char *what) -{ - die2(errno, what); -} - -static void mutex_lock(pthread_mutex_t *mutex) -{ - int ret = pthread_mutex_lock(mutex); - if (ret) die2(ret, "pthread_mutex_lock"); -} - -static void mutex_unlock(pthread_mutex_t *mutex) -{ - int ret = pthread_mutex_unlock(mutex); - if (ret) die2(ret, "pthread_mutex_unlock"); -} - -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, - struct timespec *ts) -{ - int ret = pthread_cond_timedwait(cond, mutex, ts); - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait"); - return ret; -} - -static void cond_signal(pthread_cond_t *cond) -{ - int ret = pthread_cond_signal(cond); - if (ret) die2(ret, "pthread_cond_signal"); -} - -static void thread_create(pthread_t *thread, pthread_attr_t *attr, - void *(*start_routine)(void*), void *arg) -{ - int ret = pthread_create(thread, attr, start_routine, arg); - if (ret) die2(ret, "pthread_create"); -} - -static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb) -{ - int ret; - - ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf); - if (ret == -1) - return -errno; - - /* - * This looks weird, but the aio code only considers a request - * successful if it has written the full number of bytes. - * - * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command, - * so in fact we return the ioctl command here to make posix_aio_read() - * happy.. - */ - return aiocb->aio_nbytes; -} - -static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb) -{ - int ret; - - ret = qemu_fdatasync(aiocb->aio_fildes); - if (ret == -1) - return -errno; - return 0; -} - -#ifdef CONFIG_PREADV - -static ssize_t -qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset) -{ - return preadv(fd, iov, nr_iov, offset); -} - -static ssize_t -qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset) -{ - return pwritev(fd, iov, nr_iov, offset); -} - -#else - -static ssize_t -qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset) -{ - return -ENOSYS; -} - -static ssize_t -qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset) -{ - return -ENOSYS; -} - -#endif - -static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb) -{ - ssize_t len; - - do { - if (aiocb->aio_type & QEMU_AIO_WRITE) - len = qemu_pwritev(aiocb->aio_fildes, - aiocb->aio_iov, - aiocb->aio_niov, - aiocb->aio_offset); - else - len = qemu_preadv(aiocb->aio_fildes, - aiocb->aio_iov, - aiocb->aio_niov, - aiocb->aio_offset); - } while (len == -1 && errno == EINTR); - - if (len == -1) - return -errno; - return len; -} - -/* - * Read/writes the data to/from a given linear buffer. - * - * Returns the number of bytes handles or -errno in case of an error. Short - * reads are only returned if the end of the file is reached. - */ -static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf) -{ - ssize_t offset = 0; - ssize_t len; - - while (offset < aiocb->aio_nbytes) { - if (aiocb->aio_type & QEMU_AIO_WRITE) - len = pwrite(aiocb->aio_fildes, - (const char *)buf + offset, - aiocb->aio_nbytes - offset, - aiocb->aio_offset + offset); - else - len = pread(aiocb->aio_fildes, - buf + offset, - aiocb->aio_nbytes - offset, - aiocb->aio_offset + offset); - - if (len == -1 && errno == EINTR) - continue; - else if (len == -1) { - offset = -errno; - break; - } else if (len == 0) - break; - - offset += len; - } - - return offset; -} - -static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) -{ - ssize_t nbytes; - char *buf; - - if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) { - /* - * If there is just a single buffer, and it is properly aligned - * we can just use plain pread/pwrite without any problems. - */ - if (aiocb->aio_niov == 1) - return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base); - - /* - * We have more than one iovec, and all are properly aligned. - * - * Try preadv/pwritev first and fall back to linearizing the - * buffer if it's not supported. - */ - if (preadv_present) { - nbytes = handle_aiocb_rw_vector(aiocb); - if (nbytes == aiocb->aio_nbytes) - return nbytes; - if (nbytes < 0 && nbytes != -ENOSYS) - return nbytes; - preadv_present = 0; - } - - /* - * XXX(hch): short read/write. no easy way to handle the reminder - * using these interfaces. For now retry using plain - * pread/pwrite? - */ - } - - /* - * Ok, we have to do it the hard way, copy all segments into - * a single aligned buffer. - */ - buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes); - if (aiocb->aio_type & QEMU_AIO_WRITE) { - char *p = buf; - int i; - - for (i = 0; i < aiocb->aio_niov; ++i) { - memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len); - p += aiocb->aio_iov[i].iov_len; - } - } - - nbytes = handle_aiocb_rw_linear(aiocb, buf); - if (!(aiocb->aio_type & QEMU_AIO_WRITE)) { - char *p = buf; - size_t count = aiocb->aio_nbytes, copy; - int i; - - for (i = 0; i < aiocb->aio_niov && count; ++i) { - copy = count; - if (copy > aiocb->aio_iov[i].iov_len) - copy = aiocb->aio_iov[i].iov_len; - memcpy(aiocb->aio_iov[i].iov_base, p, copy); - p += copy; - count -= copy; - } - } - qemu_vfree(buf); - - return nbytes; -} - -static void posix_aio_notify_event(void); - -static void *aio_thread(void *unused) -{ - mutex_lock(&lock); - pending_threads--; - mutex_unlock(&lock); - do_spawn_thread(); - - while (1) { - struct qemu_paiocb *aiocb; - ssize_t ret = 0; - qemu_timeval tv; - struct timespec ts; - - qemu_gettimeofday(&tv); - ts.tv_sec = tv.tv_sec + 10; - ts.tv_nsec = 0; - - mutex_lock(&lock); - - while (QTAILQ_EMPTY(&request_list) && - !(ret == ETIMEDOUT)) { - idle_threads++; - ret = cond_timedwait(&cond, &lock, &ts); - idle_threads--; - } - - if (QTAILQ_EMPTY(&request_list)) - break; - - aiocb = QTAILQ_FIRST(&request_list); - QTAILQ_REMOVE(&request_list, aiocb, node); - aiocb->active = 1; - mutex_unlock(&lock); - - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) { - case QEMU_AIO_READ: - ret = handle_aiocb_rw(aiocb); - if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) { - /* A short read means that we have reached EOF. Pad the buffer - * with zeros for bytes after EOF. */ - iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret, - 0, aiocb->aio_nbytes - ret); - - ret = aiocb->aio_nbytes; - } - break; - case QEMU_AIO_WRITE: - ret = handle_aiocb_rw(aiocb); - break; - case QEMU_AIO_FLUSH: - ret = handle_aiocb_flush(aiocb); - break; - case QEMU_AIO_IOCTL: - ret = handle_aiocb_ioctl(aiocb); - break; - default: - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type); - ret = -EINVAL; - break; - } - - mutex_lock(&lock); - aiocb->ret = ret; - mutex_unlock(&lock); - - posix_aio_notify_event(); - } - - cur_threads--; - mutex_unlock(&lock); - - return NULL; -} - -static void do_spawn_thread(void) -{ - sigset_t set, oldset; - - mutex_lock(&lock); - if (!new_threads) { - mutex_unlock(&lock); - return; - } - - new_threads--; - pending_threads++; - - mutex_unlock(&lock); - - /* block all signals */ - if (sigfillset(&set)) die("sigfillset"); - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask"); - - thread_create(&thread_id, &attr, aio_thread, NULL); - - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore"); -} - -static void spawn_thread_bh_fn(void *opaque) -{ - do_spawn_thread(); -} - -static void spawn_thread(void) -{ - cur_threads++; - new_threads++; - /* If there are threads being created, they will spawn new workers, so - * we don't spend time creating many threads in a loop holding a mutex or - * starving the current vcpu. - * - * If there are no idle threads, ask the main thread to create one, so we - * inherit the correct affinity instead of the vcpu affinity. - */ - if (!pending_threads) { - qemu_bh_schedule(new_thread_bh); - } -} - -static void qemu_paio_submit(struct qemu_paiocb *aiocb) -{ - aiocb->ret = -EINPROGRESS; - aiocb->active = 0; - mutex_lock(&lock); - if (idle_threads == 0 && cur_threads < max_threads) - spawn_thread(); - QTAILQ_INSERT_TAIL(&request_list, aiocb, node); - mutex_unlock(&lock); - cond_signal(&cond); -} - -static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) -{ - ssize_t ret; - - mutex_lock(&lock); - ret = aiocb->ret; - mutex_unlock(&lock); - - return ret; -} - -static int qemu_paio_error(struct qemu_paiocb *aiocb) -{ - ssize_t ret = qemu_paio_return(aiocb); - - if (ret < 0) - ret = -ret; - else - ret = 0; - - return ret; -} - -static void posix_aio_read(void *opaque) -{ - PosixAioState *s = opaque; - struct qemu_paiocb *acb, **pacb; - int ret; - ssize_t len; - - /* read all bytes from signal pipe */ - for (;;) { - char bytes[16]; - - len = read(s->rfd, bytes, sizeof(bytes)); - if (len == -1 && errno == EINTR) - continue; /* try again */ - if (len == sizeof(bytes)) - continue; /* more to read */ - break; - } - - for(;;) { - pacb = &s->first_aio; - for(;;) { - acb = *pacb; - if (!acb) - return; - - ret = qemu_paio_error(acb); - if (ret == ECANCELED) { - /* remove the request */ - *pacb = acb->next; - qemu_aio_release(acb); - } else if (ret != EINPROGRESS) { - /* end of aio */ - if (ret == 0) { - ret = qemu_paio_return(acb); - if (ret == acb->aio_nbytes) - ret = 0; - else - ret = -EINVAL; - } else { - ret = -ret; - } - - trace_paio_complete(acb, acb->common.opaque, ret); - - /* remove the request */ - *pacb = acb->next; - /* call the callback */ - acb->common.cb(acb->common.opaque, ret); - qemu_aio_release(acb); - break; - } else { - pacb = &acb->next; - } - } - } -} - -static int posix_aio_flush(void *opaque) -{ - PosixAioState *s = opaque; - return !!s->first_aio; -} - -static PosixAioState *posix_aio_state; - -static void posix_aio_notify_event(void) -{ - char byte = 0; - ssize_t ret; - - ret = write(posix_aio_state->wfd, &byte, sizeof(byte)); - if (ret < 0 && errno != EAGAIN) - die("write()"); -} - -static void paio_remove(struct qemu_paiocb *acb) -{ - struct qemu_paiocb **pacb; - - /* remove the callback from the queue */ - pacb = &posix_aio_state->first_aio; - for(;;) { - if (*pacb == NULL) { - fprintf(stderr, "paio_remove: aio request not found!\n"); - break; - } else if (*pacb == acb) { - *pacb = acb->next; - qemu_aio_release(acb); - break; - } - pacb = &(*pacb)->next; - } -} - -static void paio_cancel(BlockDriverAIOCB *blockacb) -{ - struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; - int active = 0; - - trace_paio_cancel(acb, acb->common.opaque); - - mutex_lock(&lock); - if (!acb->active) { - QTAILQ_REMOVE(&request_list, acb, node); - acb->ret = -ECANCELED; - } else if (acb->ret == -EINPROGRESS) { - active = 1; - } - mutex_unlock(&lock); - - if (active) { - /* fail safe: if the aio could not be canceled, we wait for - it */ - while (qemu_paio_error(acb) == EINPROGRESS) - ; - } - - paio_remove(acb); -} - -static AIOPool raw_aio_pool = { - .aiocb_size = sizeof(struct qemu_paiocb), - .cancel = paio_cancel, -}; - -BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, - int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, - BlockDriverCompletionFunc *cb, void *opaque, int type) -{ - struct qemu_paiocb *acb; - - acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque); - acb->aio_type = type; - acb->aio_fildes = fd; - - if (qiov) { - acb->aio_iov = qiov->iov; - acb->aio_niov = qiov->niov; - } - acb->aio_nbytes = nb_sectors * 512; - acb->aio_offset = sector_num * 512; - - acb->next = posix_aio_state->first_aio; - posix_aio_state->first_aio = acb; - - trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); - qemu_paio_submit(acb); - return &acb->common; -} - -BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd, - unsigned long int req, void *buf, - BlockDriverCompletionFunc *cb, void *opaque) -{ - struct qemu_paiocb *acb; - - acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque); - acb->aio_type = QEMU_AIO_IOCTL; - acb->aio_fildes = fd; - acb->aio_offset = 0; - acb->aio_ioctl_buf = buf; - acb->aio_ioctl_cmd = req; - - acb->next = posix_aio_state->first_aio; - posix_aio_state->first_aio = acb; - - qemu_paio_submit(acb); - return &acb->common; -} - -int paio_init(void) -{ - PosixAioState *s; - int fds[2]; - int ret; - - if (posix_aio_state) - return 0; - - s = g_malloc(sizeof(PosixAioState)); - - s->first_aio = NULL; - if (qemu_pipe(fds) == -1) { - fprintf(stderr, "failed to create pipe\n"); - g_free(s); - return -1; - } - - s->rfd = fds[0]; - s->wfd = fds[1]; - - fcntl(s->rfd, F_SETFL, O_NONBLOCK); - fcntl(s->wfd, F_SETFL, O_NONBLOCK); - - qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s); - - ret = pthread_attr_init(&attr); - if (ret) - die2(ret, "pthread_attr_init"); - - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (ret) - die2(ret, "pthread_attr_setdetachstate"); - - QTAILQ_INIT(&request_list); - new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL); - - posix_aio_state = s; - return 0; -} diff --git a/qapi/Makefile.objs b/qapi/Makefile.objs index 5f5846e..f9bd3b9 100644 --- a/qapi/Makefile.objs +++ b/qapi/Makefile.objs @@ -1,3 +1,5 @@ qapi-obj-y = qapi-visit-core.o qapi-dealloc-visitor.o qmp-input-visitor.o qapi-obj-y += qmp-output-visitor.o qmp-registry.o qmp-dispatch.o -qapi-obj-y += string-input-visitor.o string-output-visitor.o opts-visitor.o +qapi-obj-y += string-input-visitor.o string-output-visitor.o + +common-obj-y += opts-visitor.o @@ -15,7 +15,8 @@ #define QEMU_AIO_H #include "qemu-common.h" -#include "qemu-char.h" +#include "qemu-queue.h" +#include "event_notifier.h" typedef struct BlockDriverAIOCB BlockDriverAIOCB; typedef void BlockDriverCompletionFunc(void *opaque, int ret); @@ -38,20 +39,162 @@ void *qemu_aio_get(AIOPool *pool, BlockDriverState *bs, BlockDriverCompletionFunc *cb, void *opaque); void qemu_aio_release(void *p); +typedef struct AioHandler AioHandler; +typedef void QEMUBHFunc(void *opaque); +typedef void IOHandler(void *opaque); + +typedef struct AioContext { + GSource source; + + /* The list of registered AIO handlers */ + QLIST_HEAD(, AioHandler) aio_handlers; + + /* This is a simple lock used to protect the aio_handlers list. + * Specifically, it's used to ensure that no callbacks are removed while + * we're walking and dispatching callbacks. + */ + int walking_handlers; + + /* Anchor of the list of Bottom Halves belonging to the context */ + struct QEMUBH *first_bh; + + /* A simple lock used to protect the first_bh list, and ensure that + * no callbacks are removed while we're walking and dispatching callbacks. + */ + int walking_bh; + + /* Used for aio_notify. */ + EventNotifier notifier; +} AioContext; + /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */ -typedef int (AioFlushHandler)(void *opaque); +typedef int (AioFlushEventNotifierHandler)(EventNotifier *e); + +/** + * aio_context_new: Allocate a new AioContext. + * + * AioContext provide a mini event-loop that can be waited on synchronously. + * They also provide bottom halves, a service to execute a piece of code + * as soon as possible. + */ +AioContext *aio_context_new(void); + +/** + * aio_context_ref: + * @ctx: The AioContext to operate on. + * + * Add a reference to an AioContext. + */ +void aio_context_ref(AioContext *ctx); + +/** + * aio_context_unref: + * @ctx: The AioContext to operate on. + * + * Drop a reference to an AioContext. + */ +void aio_context_unref(AioContext *ctx); + +/** + * aio_bh_new: Allocate a new bottom half structure. + * + * Bottom halves are lightweight callbacks whose invocation is guaranteed + * to be wait-free, thread-safe and signal-safe. The #QEMUBH structure + * is opaque and must be allocated prior to its use. + */ +QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque); + +/** + * aio_notify: Force processing of pending events. + * + * Similar to signaling a condition variable, aio_notify forces + * aio_wait to exit, so that the next call will re-examine pending events. + * The caller of aio_notify will usually call aio_wait again very soon, + * or go through another iteration of the GLib main loop. Hence, aio_notify + * also has the side effect of recalculating the sets of file descriptors + * that the main loop waits for. + * + * Calling aio_notify is rarely necessary, because for example scheduling + * a bottom half calls it already. + */ +void aio_notify(AioContext *ctx); + +/** + * aio_bh_poll: Poll bottom halves for an AioContext. + * + * These are internal functions used by the QEMU main loop. + */ +int aio_bh_poll(AioContext *ctx); + +/** + * qemu_bh_schedule: Schedule a bottom half. + * + * Scheduling a bottom half interrupts the main loop and causes the + * execution of the callback that was passed to qemu_bh_new. + * + * Bottom halves that are scheduled from a bottom half handler are instantly + * invoked. This can create an infinite loop if a bottom half handler + * schedules itself. + * + * @bh: The bottom half to be scheduled. + */ +void qemu_bh_schedule(QEMUBH *bh); + +/** + * qemu_bh_cancel: Cancel execution of a bottom half. + * + * Canceling execution of a bottom half undoes the effect of calls to + * qemu_bh_schedule without freeing its resources yet. While cancellation + * itself is also wait-free and thread-safe, it can of course race with the + * loop that executes bottom halves unless you are holding the iothread + * mutex. This makes it mostly useless if you are not holding the mutex. + * + * @bh: The bottom half to be canceled. + */ +void qemu_bh_cancel(QEMUBH *bh); + +/** + *qemu_bh_delete: Cancel execution of a bottom half and free its resources. + * + * Deleting a bottom half frees the memory that was allocated for it by + * qemu_bh_new. It also implies canceling the bottom half if it was + * scheduled. + * + * @bh: The bottom half to be deleted. + */ +void qemu_bh_delete(QEMUBH *bh); /* Flush any pending AIO operation. This function will block until all * outstanding AIO operations have been completed or cancelled. */ -void qemu_aio_flush(void); +void aio_flush(AioContext *ctx); -/* Wait for a single AIO completion to occur. This function will wait - * until a single AIO event has completed and it will ensure something - * has moved before returning. This can issue new pending aio as - * result of executing I/O completion or bh callbacks. +/* Return whether there are any pending callbacks from the GSource + * attached to the AioContext. * - * Return whether there is still any pending AIO operation. */ -bool qemu_aio_wait(void); + * This is used internally in the implementation of the GSource. + */ +bool aio_pending(AioContext *ctx); + +/* Progress in completing AIO work to occur. This can issue new pending + * aio as a result of executing I/O completion or bh callbacks. + * + * If there is no pending AIO operation or completion (bottom half), + * return false. If there are pending bottom halves, return true. + * + * If there are no pending bottom halves, but there are pending AIO + * operations, it may not be possible to make any progress without + * blocking. If @blocking is true, this function will wait until one + * or more AIO events have completed, to ensure something has moved + * before returning. + * + * If @blocking is false, this function will also return false if the + * function cannot make any progress without blocking. + */ +bool aio_poll(AioContext *ctx, bool blocking); + +#ifdef CONFIG_POSIX +/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */ +typedef int (AioFlushHandler)(void *opaque); /* Register a file descriptor and associated callbacks. Behaves very similarly * to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will @@ -60,10 +203,45 @@ bool qemu_aio_wait(void); * Code that invokes AIO completion functions should rely on this function * instead of qemu_set_fd_handler[2]. */ -int qemu_aio_set_fd_handler(int fd, - IOHandler *io_read, - IOHandler *io_write, - AioFlushHandler *io_flush, - void *opaque); +void aio_set_fd_handler(AioContext *ctx, + int fd, + IOHandler *io_read, + IOHandler *io_write, + AioFlushHandler *io_flush, + void *opaque); +#endif + +/* Register an event notifier and associated callbacks. Behaves very similarly + * to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks + * will be invoked when using either qemu_aio_wait() or qemu_aio_flush(). + * + * Code that invokes AIO completion functions should rely on this function + * instead of event_notifier_set_handler. + */ +void aio_set_event_notifier(AioContext *ctx, + EventNotifier *notifier, + EventNotifierHandler *io_read, + AioFlushEventNotifierHandler *io_flush); + +/* Return a GSource that lets the main loop poll the file descriptors attached + * to this AioContext. + */ +GSource *aio_get_g_source(AioContext *ctx); + +/* Functions to operate on the main QEMU AioContext. */ + +void qemu_aio_flush(void); +bool qemu_aio_wait(void); +void qemu_aio_set_event_notifier(EventNotifier *notifier, + EventNotifierHandler *io_read, + AioFlushEventNotifierHandler *io_flush); + +#ifdef CONFIG_POSIX +void qemu_aio_set_fd_handler(int fd, + IOHandler *io_read, + IOHandler *io_write, + AioFlushHandler *io_flush, + void *opaque); +#endif #endif diff --git a/qemu-char.h b/qemu-char.h index 486644b..5087168 100644 --- a/qemu-char.h +++ b/qemu-char.h @@ -5,6 +5,7 @@ #include "qemu-queue.h" #include "qemu-option.h" #include "qemu-config.h" +#include "qemu-aio.h" #include "qobject.h" #include "qstring.h" #include "main-loop.h" diff --git a/qemu-common.h b/qemu-common.h index b54612b..c3328d2 100644 --- a/qemu-common.h +++ b/qemu-common.h @@ -14,6 +14,7 @@ typedef struct QEMUTimer QEMUTimer; typedef struct QEMUFile QEMUFile; +typedef struct QEMUBH QEMUBH; typedef struct DeviceState DeviceState; struct Monitor; @@ -167,7 +168,6 @@ int qemu_fls(int i); int qemu_fdatasync(int fd); int fcntl_setfl(int fd, int flag); int qemu_parse_fd(const char *param); -int qemu_parse_fdset(const char *param); /* * strtosz() suffixes used to specify the default treatment of an @@ -218,7 +218,6 @@ ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) QEMU_WARN_UNUSED_RESULT; #ifndef _WIN32 -int qemu_eventfd(int pipefd[2]); int qemu_pipe(int pipefd[2]); #endif diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c index 26ad76b..9dda3f8 100644 --- a/qemu-coroutine-lock.c +++ b/qemu-coroutine-lock.c @@ -26,7 +26,7 @@ #include "qemu-coroutine.h" #include "qemu-coroutine-int.h" #include "qemu-queue.h" -#include "main-loop.h" +#include "qemu-aio.h" #include "trace.h" static QTAILQ_HEAD(, Coroutine) unlock_bh_queue = diff --git a/qemu-os-win32.h b/qemu-os-win32.h index 8ba466d..d0e9234d 100644 --- a/qemu-os-win32.h +++ b/qemu-os-win32.h @@ -28,7 +28,6 @@ #include <windows.h> #include <winsock2.h> -#include "main-loop.h" /* Workaround for older versions of MinGW. */ #ifndef ECONNREFUSED diff --git a/qemu-sockets.c b/qemu-sockets.c index cfed9c5..f2a6371 100644 --- a/qemu-sockets.c +++ b/qemu-sockets.c @@ -967,3 +967,21 @@ int socket_init(void) #endif return 0; } + +static int default_monitor_get_fd(Monitor *mon, const char *name, Error **errp) +{ + error_setg(errp, "only QEMU supports file descriptor passing"); + return -1; +} +QEMU_WEAK_ALIAS(monitor_get_fd, default_monitor_get_fd); + +static int default_qemu_set_fd_handler2(int fd, + IOCanReadHandler *fd_read_poll, + IOHandler *fd_read, + IOHandler *fd_write, + void *opaque) + +{ + abort(); +} +QEMU_WEAK_ALIAS(qemu_set_fd_handler2, default_qemu_set_fd_handler2); diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c index 8fbabda..6a3d3a1 100644 --- a/qemu-thread-posix.c +++ b/qemu-thread-posix.c @@ -17,6 +17,9 @@ #include <signal.h> #include <stdint.h> #include <string.h> +#include <limits.h> +#include <unistd.h> +#include <sys/time.h> #include "qemu-thread.h" static void error_exit(int err, const char *msg) @@ -115,6 +118,83 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) error_exit(err, __func__); } +void qemu_sem_init(QemuSemaphore *sem, int init) +{ + int rc; + + rc = sem_init(&sem->sem, 0, init); + if (rc < 0) { + error_exit(errno, __func__); + } +} + +void qemu_sem_destroy(QemuSemaphore *sem) +{ + int rc; + + rc = sem_destroy(&sem->sem); + if (rc < 0) { + error_exit(errno, __func__); + } +} + +void qemu_sem_post(QemuSemaphore *sem) +{ + int rc; + + rc = sem_post(&sem->sem); + if (rc < 0) { + error_exit(errno, __func__); + } +} + +int qemu_sem_timedwait(QemuSemaphore *sem, int ms) +{ + int rc; + + if (ms <= 0) { + /* This is cheaper than sem_timedwait. */ + do { + rc = sem_trywait(&sem->sem); + } while (rc == -1 && errno == EINTR); + if (rc == -1 && errno == EAGAIN) { + return -1; + } + } else { + struct timeval tv; + struct timespec ts; + gettimeofday(&tv, NULL); + ts.tv_nsec = tv.tv_usec * 1000 + (ms % 1000) * 1000000; + ts.tv_sec = tv.tv_sec + ms / 1000; + if (ts.tv_nsec >= 1000000000) { + ts.tv_sec++; + ts.tv_nsec -= 1000000000; + } + do { + rc = sem_timedwait(&sem->sem, &ts); + } while (rc == -1 && errno == EINTR); + if (rc == -1 && errno == ETIMEDOUT) { + return -1; + } + } + if (rc < 0) { + error_exit(errno, __func__); + } + return 0; +} + +void qemu_sem_wait(QemuSemaphore *sem) +{ + int rc; + + do { + rc = sem_wait(&sem->sem); + } while (rc == -1 && errno == EINTR); + if (rc < 0) { + error_exit(errno, __func__); + } +} + void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*), void *arg, int mode) diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h index ee4618e..2542c15 100644 --- a/qemu-thread-posix.h +++ b/qemu-thread-posix.h @@ -1,6 +1,7 @@ #ifndef __QEMU_THREAD_POSIX_H #define __QEMU_THREAD_POSIX_H 1 #include "pthread.h" +#include <semaphore.h> struct QemuMutex { pthread_mutex_t lock; @@ -10,6 +11,10 @@ struct QemuCond { pthread_cond_t cond; }; +struct QemuSemaphore { + sem_t sem; +}; + struct QemuThread { pthread_t thread; }; diff --git a/qemu-thread-win32.c b/qemu-thread-win32.c index 177b398..4b3db60 100644 --- a/qemu-thread-win32.c +++ b/qemu-thread-win32.c @@ -192,6 +192,41 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) qemu_mutex_lock(mutex); } +void qemu_sem_init(QemuSemaphore *sem, int init) +{ + /* Manual reset. */ + sem->sema = CreateSemaphore(NULL, init, LONG_MAX, NULL); +} + +void qemu_sem_destroy(QemuSemaphore *sem) +{ + CloseHandle(sem->sema); +} + +void qemu_sem_post(QemuSemaphore *sem) +{ + ReleaseSemaphore(sem->sema, 1, NULL); +} + +int qemu_sem_timedwait(QemuSemaphore *sem, int ms) +{ + int rc = WaitForSingleObject(sem->sema, ms); + if (rc == WAIT_OBJECT_0) { + return 0; + } + if (rc != WAIT_TIMEOUT) { + error_exit(GetLastError(), __func__); + } + return -1; +} + +void qemu_sem_wait(QemuSemaphore *sem) +{ + if (WaitForSingleObject(sem->sema, INFINITE) != WAIT_OBJECT_0) { + error_exit(GetLastError(), __func__); + } +} + struct QemuThreadData { /* Passed to win32_start_routine. */ void *(*start_routine)(void *); diff --git a/qemu-thread-win32.h b/qemu-thread-win32.h index b9d1be8..13adb95 100644 --- a/qemu-thread-win32.h +++ b/qemu-thread-win32.h @@ -13,6 +13,10 @@ struct QemuCond { HANDLE continue_event; }; +struct QemuSemaphore { + HANDLE sema; +}; + typedef struct QemuThreadData QemuThreadData; struct QemuThread { QemuThreadData *data; diff --git a/qemu-thread.h b/qemu-thread.h index 05fdaaf..3ee2f6b 100644 --- a/qemu-thread.h +++ b/qemu-thread.h @@ -6,6 +6,7 @@ typedef struct QemuMutex QemuMutex; typedef struct QemuCond QemuCond; +typedef struct QemuSemaphore QemuSemaphore; typedef struct QemuThread QemuThread; #ifdef _WIN32 @@ -38,6 +39,12 @@ void qemu_cond_signal(QemuCond *cond); void qemu_cond_broadcast(QemuCond *cond); void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex); +void qemu_sem_init(QemuSemaphore *sem, int init); +void qemu_sem_post(QemuSemaphore *sem); +void qemu_sem_wait(QemuSemaphore *sem); +int qemu_sem_timedwait(QemuSemaphore *sem, int ms); +void qemu_sem_destroy(QemuSemaphore *sem); + void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void *), void *arg, int mode); diff --git a/qemu-timer.c b/qemu-timer.c index ede84ff..f3426c9 100644 --- a/qemu-timer.c +++ b/qemu-timer.c @@ -430,9 +430,11 @@ void qemu_unregister_clock_reset_notifier(QEMUClock *clock, Notifier *notifier) void init_clocks(void) { - rt_clock = qemu_new_clock(QEMU_CLOCK_REALTIME); - vm_clock = qemu_new_clock(QEMU_CLOCK_VIRTUAL); - host_clock = qemu_new_clock(QEMU_CLOCK_HOST); + if (!rt_clock) { + rt_clock = qemu_new_clock(QEMU_CLOCK_REALTIME); + vm_clock = qemu_new_clock(QEMU_CLOCK_VIRTUAL); + host_clock = qemu_new_clock(QEMU_CLOCK_HOST); + } } uint64_t qemu_timer_expire_time_ns(QEMUTimer *ts) @@ -745,6 +747,10 @@ int init_timer_alarm(void) struct qemu_alarm_timer *t = NULL; int i, err = -1; + if (alarm_timer) { + return 0; + } + for (i = 0; alarm_timers[i].name; i++) { t = &alarm_timers[i]; diff --git a/qemu-tool.c b/qemu-tool.c index da4c05a..b46631e 100644 --- a/qemu-tool.c +++ b/qemu-tool.c @@ -38,12 +38,6 @@ const char *qemu_get_vm_name(void) Monitor *cur_mon; -int monitor_get_fd(Monitor *mon, const char *name, Error **errp) -{ - error_setg(errp, "only QEMU supports file descriptor passing"); - return -1; -} - void vm_stop(RunState state) { abort(); @@ -74,29 +68,9 @@ void monitor_protocol_event(MonitorEvent event, QObject *data) { } -int monitor_fdset_get_fd(int64_t fdset_id, int flags) -{ - return -1; -} - -int monitor_fdset_dup_fd_add(int64_t fdset_id, int dup_fd) -{ - return -1; -} - -int monitor_fdset_dup_fd_remove(int dup_fd) -{ - return -1; -} - -int monitor_fdset_dup_fd_find(int dup_fd) -{ - return -1; -} - int64_t cpu_get_clock(void) { - return qemu_get_clock_ns(rt_clock); + return get_clock_realtime(); } int64_t cpu_get_icount(void) @@ -118,13 +92,6 @@ void qemu_clock_warp(QEMUClock *clock) { } -int qemu_init_main_loop(void) -{ - init_clocks(); - init_timer_alarm(); - return main_loop_init(); -} - void slirp_update_timeout(uint32_t *timeout) { } diff --git a/qemu-user.c b/qemu-user.c index 13fb9ae..08ccb0f 100644 --- a/qemu-user.c +++ b/qemu-user.c @@ -35,23 +35,3 @@ void monitor_vprintf(Monitor *mon, const char *fmt, va_list ap) void monitor_set_error(Monitor *mon, QError *qerror) { } - -int monitor_fdset_get_fd(int64_t fdset_id, int flags) -{ - return -1; -} - -int monitor_fdset_dup_fd_add(int64_t fdset_id, int dup_fd) -{ - return -1; -} - -int monitor_fdset_dup_fd_remove(int dup_fd) -{ - return -1; -} - -int monitor_fdset_dup_fd_find(int dup_fd) -{ - return -1; -} @@ -471,11 +471,12 @@ DevicePropertyInfoList *qmp_device_list_properties(const char *typename, return prop_list; } -CpuDefinitionInfoList GCC_WEAK *arch_query_cpu_definitions(Error **errp) +static CpuDefinitionInfoList *default_arch_query_cpu_definitions(Error **errp) { error_set(errp, QERR_NOT_SUPPORTED); return NULL; } +QEMU_WEAK_ALIAS(arch_query_cpu_definitions, default_arch_query_cpu_definitions); CpuDefinitionInfoList *qmp_query_cpu_definitions(Error **errp) { diff --git a/tests/Makefile b/tests/Makefile index 86c9b79..9bf0765 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -36,7 +36,7 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o tests/check-qdict.o \ tests/test-qmp-input-visitor.o tests/test-qmp-input-strict.o \ tests/test-qmp-commands.o tests/test-visitor-serialization.o -test-qapi-obj-y = $(qobject-obj-y) $(qapi-obj-y) $(tools-obj-y) +test-qapi-obj-y = $(qobject-obj-y) $(qapi-obj-y) qemu-tool.o test-qapi-obj-y += tests/test-qapi-visit.o tests/test-qapi-types.o test-qapi-obj-y += module.o @@ -47,8 +47,8 @@ tests/check-qstring$(EXESUF): tests/check-qstring.o qstring.o tests/check-qdict$(EXESUF): tests/check-qdict.o qdict.o qfloat.o qint.o qstring.o qbool.o qlist.o tests/check-qlist$(EXESUF): tests/check-qlist.o qlist.o qint.o tests/check-qfloat$(EXESUF): tests/check-qfloat.o qfloat.o -tests/check-qjson$(EXESUF): tests/check-qjson.o $(qobject-obj-y) $(tools-obj-y) -tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(coroutine-obj-y) $(tools-obj-y) +tests/check-qjson$(EXESUF): tests/check-qjson.o $(qobject-obj-y) qemu-tool.o +tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(coroutine-obj-y) $(tools-obj-y) $(block-obj-y) iov.o tests/test-iov$(EXESUF): tests/test-iov.o iov.o tests/test-qapi-types.c tests/test-qapi-types.h :\ @@ -81,7 +81,7 @@ TARGETS=$(patsubst %-softmmu,%, $(filter %-softmmu,$(TARGET_DIRS))) QTEST_TARGETS=$(foreach TARGET,$(TARGETS), $(if $(check-qtest-$(TARGET)-y), $(TARGET),)) check-qtest-$(CONFIG_POSIX)=$(foreach TARGET,$(TARGETS), $(check-qtest-$(TARGET)-y)) -qtest-obj-y = tests/libqtest.o $(oslib-obj-y) $(tools-obj-y) +qtest-obj-y = tests/libqtest.o $(oslib-obj-y) $(check-qtest-y): $(qtest-obj-y) .PHONY: check-help diff --git a/thread-pool.c b/thread-pool.c new file mode 100644 index 0000000..651b324 --- /dev/null +++ b/thread-pool.c @@ -0,0 +1,289 @@ +/* + * QEMU block layer thread pool + * + * Copyright IBM, Corp. 2008 + * Copyright Red Hat, Inc. 2012 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ +#include "qemu-common.h" +#include "qemu-queue.h" +#include "qemu-thread.h" +#include "osdep.h" +#include "qemu-coroutine.h" +#include "trace.h" +#include "block_int.h" +#include "event_notifier.h" +#include "thread-pool.h" + +static void do_spawn_thread(void); + +typedef struct ThreadPoolElement ThreadPoolElement; + +enum ThreadState { + THREAD_QUEUED, + THREAD_ACTIVE, + THREAD_DONE, + THREAD_CANCELED, +}; + +struct ThreadPoolElement { + BlockDriverAIOCB common; + ThreadPoolFunc *func; + void *arg; + + /* Moving state out of THREAD_QUEUED is protected by lock. After + * that, only the worker thread can write to it. Reads and writes + * of state and ret are ordered with memory barriers. + */ + enum ThreadState state; + int ret; + + /* Access to this list is protected by lock. */ + QTAILQ_ENTRY(ThreadPoolElement) reqs; + + /* Access to this list is protected by the global mutex. */ + QLIST_ENTRY(ThreadPoolElement) all; +}; + +static EventNotifier notifier; +static QemuMutex lock; +static QemuCond check_cancel; +static QemuSemaphore sem; +static int max_threads = 64; +static QEMUBH *new_thread_bh; + +/* The following variables are protected by the global mutex. */ +static QLIST_HEAD(, ThreadPoolElement) head; + +/* The following variables are protected by lock. */ +static QTAILQ_HEAD(, ThreadPoolElement) request_list; +static int cur_threads; +static int idle_threads; +static int new_threads; /* backlog of threads we need to create */ +static int pending_threads; /* threads created but not running yet */ +static int pending_cancellations; /* whether we need a cond_broadcast */ + +static void *worker_thread(void *unused) +{ + qemu_mutex_lock(&lock); + pending_threads--; + do_spawn_thread(); + + while (1) { + ThreadPoolElement *req; + int ret; + + do { + idle_threads++; + qemu_mutex_unlock(&lock); + ret = qemu_sem_timedwait(&sem, 10000); + qemu_mutex_lock(&lock); + idle_threads--; + } while (ret == -1 && !QTAILQ_EMPTY(&request_list)); + if (ret == -1) { + break; + } + + req = QTAILQ_FIRST(&request_list); + QTAILQ_REMOVE(&request_list, req, reqs); + req->state = THREAD_ACTIVE; + qemu_mutex_unlock(&lock); + + ret = req->func(req->arg); + + req->ret = ret; + /* Write ret before state. */ + smp_wmb(); + req->state = THREAD_DONE; + + qemu_mutex_lock(&lock); + if (pending_cancellations) { + qemu_cond_broadcast(&check_cancel); + } + + event_notifier_set(¬ifier); + } + + cur_threads--; + qemu_mutex_unlock(&lock); + return NULL; +} + +static void do_spawn_thread(void) +{ + QemuThread t; + + /* Runs with lock taken. */ + if (!new_threads) { + return; + } + + new_threads--; + pending_threads++; + + qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED); +} + +static void spawn_thread_bh_fn(void *opaque) +{ + qemu_mutex_lock(&lock); + do_spawn_thread(); + qemu_mutex_unlock(&lock); +} + +static void spawn_thread(void) +{ + cur_threads++; + new_threads++; + /* If there are threads being created, they will spawn new workers, so + * we don't spend time creating many threads in a loop holding a mutex or + * starving the current vcpu. + * + * If there are no idle threads, ask the main thread to create one, so we + * inherit the correct affinity instead of the vcpu affinity. + */ + if (!pending_threads) { + qemu_bh_schedule(new_thread_bh); + } +} + +static void event_notifier_ready(EventNotifier *notifier) +{ + ThreadPoolElement *elem, *next; + + event_notifier_test_and_clear(notifier); +restart: + QLIST_FOREACH_SAFE(elem, &head, all, next) { + if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { + continue; + } + if (elem->state == THREAD_DONE) { + trace_thread_pool_complete(elem, elem->common.opaque, elem->ret); + } + if (elem->state == THREAD_DONE && elem->common.cb) { + QLIST_REMOVE(elem, all); + /* Read state before ret. */ + smp_rmb(); + elem->common.cb(elem->common.opaque, elem->ret); + qemu_aio_release(elem); + goto restart; + } else { + /* remove the request */ + QLIST_REMOVE(elem, all); + qemu_aio_release(elem); + } + } +} + +static int thread_pool_active(EventNotifier *notifier) +{ + return !QLIST_EMPTY(&head); +} + +static void thread_pool_cancel(BlockDriverAIOCB *acb) +{ + ThreadPoolElement *elem = (ThreadPoolElement *)acb; + + trace_thread_pool_cancel(elem, elem->common.opaque); + + qemu_mutex_lock(&lock); + if (elem->state == THREAD_QUEUED && + /* No thread has yet started working on elem. we can try to "steal" + * the item from the worker if we can get a signal from the + * semaphore. Because this is non-blocking, we can do it with + * the lock taken and ensure that elem will remain THREAD_QUEUED. + */ + qemu_sem_timedwait(&sem, 0) == 0) { + QTAILQ_REMOVE(&request_list, elem, reqs); + elem->state = THREAD_CANCELED; + event_notifier_set(¬ifier); + } else { + pending_cancellations++; + while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { + qemu_cond_wait(&check_cancel, &lock); + } + pending_cancellations--; + } + qemu_mutex_unlock(&lock); +} + +static AIOPool thread_pool_cb_pool = { + .aiocb_size = sizeof(ThreadPoolElement), + .cancel = thread_pool_cancel, +}; + +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, + BlockDriverCompletionFunc *cb, void *opaque) +{ + ThreadPoolElement *req; + + req = qemu_aio_get(&thread_pool_cb_pool, NULL, cb, opaque); + req->func = func; + req->arg = arg; + req->state = THREAD_QUEUED; + + QLIST_INSERT_HEAD(&head, req, all); + + trace_thread_pool_submit(req, arg); + + qemu_mutex_lock(&lock); + if (idle_threads == 0 && cur_threads < max_threads) { + spawn_thread(); + } + QTAILQ_INSERT_TAIL(&request_list, req, reqs); + qemu_mutex_unlock(&lock); + qemu_sem_post(&sem); + return &req->common; +} + +typedef struct ThreadPoolCo { + Coroutine *co; + int ret; +} ThreadPoolCo; + +static void thread_pool_co_cb(void *opaque, int ret) +{ + ThreadPoolCo *co = opaque; + + co->ret = ret; + qemu_coroutine_enter(co->co, NULL); +} + +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) +{ + ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; + assert(qemu_in_coroutine()); + thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); + qemu_coroutine_yield(); + return tpc.ret; +} + +void thread_pool_submit(ThreadPoolFunc *func, void *arg) +{ + thread_pool_submit_aio(func, arg, NULL, NULL); +} + +static void thread_pool_init(void) +{ + QLIST_INIT(&head); + event_notifier_init(¬ifier, false); + qemu_mutex_init(&lock); + qemu_cond_init(&check_cancel); + qemu_sem_init(&sem, 0); + qemu_aio_set_event_notifier(¬ifier, event_notifier_ready, + thread_pool_active); + + QTAILQ_INIT(&request_list); + new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL); +} + +block_init(thread_pool_init) diff --git a/thread-pool.h b/thread-pool.h new file mode 100644 index 0000000..378a4ac --- /dev/null +++ b/thread-pool.h @@ -0,0 +1,34 @@ +/* + * QEMU block layer thread pool + * + * Copyright IBM, Corp. 2008 + * Copyright Red Hat, Inc. 2012 + * + * Authors: + * Anthony Liguori <aliguori@us.ibm.com> + * Paolo Bonzini <pbonzini@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + * + * Contributions after 2012-01-13 are licensed under the terms of the + * GNU GPL, version 2 or (at your option) any later version. + */ + +#ifndef QEMU_THREAD_POOL_H +#define QEMU_THREAD_POOL_H 1 + +#include "qemu-common.h" +#include "qemu-queue.h" +#include "qemu-thread.h" +#include "qemu-coroutine.h" +#include "block_int.h" + +typedef int ThreadPoolFunc(void *opaque); + +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, + BlockDriverCompletionFunc *cb, void *opaque); +int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); +void thread_pool_submit(ThreadPoolFunc *func, void *arg); + +#endif diff --git a/trace-events b/trace-events index 7ee21e5..066cdaf 100644 --- a/trace-events +++ b/trace-events @@ -98,6 +98,11 @@ virtio_blk_rw_complete(void *req, int ret) "req %p ret %d" virtio_blk_handle_write(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu" virtio_blk_handle_read(void *req, uint64_t sector, size_t nsectors) "req %p sector %"PRIu64" nsectors %zu" +# thread-pool.c +thread_pool_submit(void *req, void *opaque) "req %p opaque %p" +thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d" +thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" + # posix-aio-compat.c paio_submit(void *acb, void *opaque, int64_t sector_num, int nb_sectors, int type) "acb %p opaque %p sector_num %"PRId64" nb_sectors %d type %d" paio_complete(void *acb, void *opaque, int ret) "acb %p opaque %p ret %d" @@ -2441,11 +2441,6 @@ static void free_and_trace(gpointer mem) free(mem); } -int qemu_init_main_loop(void) -{ - return main_loop_init(); -} - int main(int argc, char **argv, char **envp) { int i; @@ -3418,6 +3413,12 @@ int main(int argc, char **argv, char **envp) } loc_set_none(); + qemu_init_cpu_loop(); + if (qemu_init_main_loop()) { + fprintf(stderr, "qemu_init_main_loop failed\n"); + exit(1); + } + if (qemu_opts_foreach(qemu_find_opts("sandbox"), parse_sandbox, NULL, 0)) { exit(1); } @@ -3580,12 +3581,6 @@ int main(int argc, char **argv, char **envp) configure_accelerator(); - qemu_init_cpu_loop(); - if (qemu_init_main_loop()) { - fprintf(stderr, "qemu_init_main_loop failed\n"); - exit(1); - } - machine_opts = qemu_opts_find(qemu_find_opts("machine"), 0); if (machine_opts) { kernel_filename = qemu_opt_get(machine_opts, "kernel"); |