diff options
-rw-r--r-- | block/linux-aio.c | 71 |
1 files changed, 55 insertions, 16 deletions
diff --git a/block/linux-aio.c b/block/linux-aio.c index 7ac7e8c..9aca758 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -51,6 +51,12 @@ struct qemu_laio_state { /* io queue for submit at batch */ LaioQueue io_q; + + /* I/O completion processing */ + QEMUBH *completion_bh; + struct io_event events[MAX_EVENTS]; + int event_idx; + int event_max; }; static inline ssize_t io_event_ret(struct io_event *ev) @@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s, qemu_aio_release(laiocb); } -static void qemu_laio_completion_cb(EventNotifier *e) +/* The completion BH fetches completed I/O requests and invokes their + * callbacks. + * + * The function is somewhat tricky because it supports nested event loops, for + * example when a request callback invokes aio_poll(). In order to do this, + * the completion events array and index are kept in qemu_laio_state. The BH + * reschedules itself as long as there are completions pending so it will + * either be called again in a nested event loop or will be called after all + * events have been completed. When there are no events left to complete, the + * BH returns without rescheduling. + */ +static void qemu_laio_completion_bh(void *opaque) { - struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); - - while (event_notifier_test_and_clear(&s->e)) { - struct io_event events[MAX_EVENTS]; - struct timespec ts = { 0 }; - int nevents, i; + struct qemu_laio_state *s = opaque; + /* Fetch more completion events when empty */ + if (s->event_idx == s->event_max) { do { - nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts); - } while (nevents == -EINTR); + struct timespec ts = { 0 }; + s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, + s->events, &ts); + } while (s->event_max == -EINTR); + + s->event_idx = 0; + if (s->event_max <= 0) { + s->event_max = 0; + return; /* no more events */ + } + } - for (i = 0; i < nevents; i++) { - struct iocb *iocb = events[i].obj; - struct qemu_laiocb *laiocb = - container_of(iocb, struct qemu_laiocb, iocb); + /* Reschedule so nested event loops see currently pending completions */ + qemu_bh_schedule(s->completion_bh); - laiocb->ret = io_event_ret(&events[i]); - qemu_laio_process_completion(s, laiocb); - } + /* Process completion events */ + while (s->event_idx < s->event_max) { + struct iocb *iocb = s->events[s->event_idx].obj; + struct qemu_laiocb *laiocb = + container_of(iocb, struct qemu_laiocb, iocb); + + laiocb->ret = io_event_ret(&s->events[s->event_idx]); + s->event_idx++; + + qemu_laio_process_completion(s, laiocb); + } +} + +static void qemu_laio_completion_cb(EventNotifier *e) +{ + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e); + + if (event_notifier_test_and_clear(&s->e)) { + qemu_bh_schedule(s->completion_bh); } } @@ -272,12 +309,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context) struct qemu_laio_state *s = s_; aio_set_event_notifier(old_context, &s->e, NULL); + qemu_bh_delete(s->completion_bh); } void laio_attach_aio_context(void *s_, AioContext *new_context) { struct qemu_laio_state *s = s_; + s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb); } |