aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--block/linux-aio.c71
1 files changed, 55 insertions, 16 deletions
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 7ac7e8c..9aca758 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -51,6 +51,12 @@ struct qemu_laio_state {
/* io queue for submit at batch */
LaioQueue io_q;
+
+ /* I/O completion processing */
+ QEMUBH *completion_bh;
+ struct io_event events[MAX_EVENTS];
+ int event_idx;
+ int event_max;
};
static inline ssize_t io_event_ret(struct io_event *ev)
@@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
qemu_aio_release(laiocb);
}
-static void qemu_laio_completion_cb(EventNotifier *e)
+/* The completion BH fetches completed I/O requests and invokes their
+ * callbacks.
+ *
+ * The function is somewhat tricky because it supports nested event loops, for
+ * example when a request callback invokes aio_poll(). In order to do this,
+ * the completion events array and index are kept in qemu_laio_state. The BH
+ * reschedules itself as long as there are completions pending so it will
+ * either be called again in a nested event loop or will be called after all
+ * events have been completed. When there are no events left to complete, the
+ * BH returns without rescheduling.
+ */
+static void qemu_laio_completion_bh(void *opaque)
{
- struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
-
- while (event_notifier_test_and_clear(&s->e)) {
- struct io_event events[MAX_EVENTS];
- struct timespec ts = { 0 };
- int nevents, i;
+ struct qemu_laio_state *s = opaque;
+ /* Fetch more completion events when empty */
+ if (s->event_idx == s->event_max) {
do {
- nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
- } while (nevents == -EINTR);
+ struct timespec ts = { 0 };
+ s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
+ s->events, &ts);
+ } while (s->event_max == -EINTR);
+
+ s->event_idx = 0;
+ if (s->event_max <= 0) {
+ s->event_max = 0;
+ return; /* no more events */
+ }
+ }
- for (i = 0; i < nevents; i++) {
- struct iocb *iocb = events[i].obj;
- struct qemu_laiocb *laiocb =
- container_of(iocb, struct qemu_laiocb, iocb);
+ /* Reschedule so nested event loops see currently pending completions */
+ qemu_bh_schedule(s->completion_bh);
- laiocb->ret = io_event_ret(&events[i]);
- qemu_laio_process_completion(s, laiocb);
- }
+ /* Process completion events */
+ while (s->event_idx < s->event_max) {
+ struct iocb *iocb = s->events[s->event_idx].obj;
+ struct qemu_laiocb *laiocb =
+ container_of(iocb, struct qemu_laiocb, iocb);
+
+ laiocb->ret = io_event_ret(&s->events[s->event_idx]);
+ s->event_idx++;
+
+ qemu_laio_process_completion(s, laiocb);
+ }
+}
+
+static void qemu_laio_completion_cb(EventNotifier *e)
+{
+ struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
+
+ if (event_notifier_test_and_clear(&s->e)) {
+ qemu_bh_schedule(s->completion_bh);
}
}
@@ -272,12 +309,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
struct qemu_laio_state *s = s_;
aio_set_event_notifier(old_context, &s->e, NULL);
+ qemu_bh_delete(s->completion_bh);
}
void laio_attach_aio_context(void *s_, AioContext *new_context)
{
struct qemu_laio_state *s = s_;
+ s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}