From 2df4624662103eb007428e6ded3b3496a952b154 Mon Sep 17 00:00:00 2001 From: MORITA Kazutaka Date: Fri, 12 Aug 2011 21:33:15 +0900 Subject: sheepdog: use coroutines This makes the sheepdog block driver support bdrv_co_readv/writev instead of bdrv_aio_readv/writev. With this patch, Sheepdog network I/O becomes fully asynchronous. The block driver yields back when send/recv returns EAGAIN, and is resumed when the sheepdog network connection is ready for the operation. Signed-off-by: MORITA Kazutaka Signed-off-by: Kevin Wolf --- block/sheepdog.c | 150 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 93 insertions(+), 57 deletions(-) diff --git a/block/sheepdog.c b/block/sheepdog.c index 57b6e1a..c1f6e07 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -274,7 +274,7 @@ struct SheepdogAIOCB { int ret; enum AIOCBState aiocb_type; - QEMUBH *bh; + Coroutine *coroutine; void (*aio_done_func)(SheepdogAIOCB *); int canceled; @@ -295,6 +295,10 @@ typedef struct BDRVSheepdogState { char *port; int fd; + CoMutex lock; + Coroutine *co_send; + Coroutine *co_recv; + uint32_t aioreq_seq_num; QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head; } BDRVSheepdogState; @@ -346,19 +350,16 @@ static const char * sd_strerror(int err) /* * Sheepdog I/O handling: * - * 1. In the sd_aio_readv/writev, read/write requests are added to the - * QEMU Bottom Halves. - * - * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O - * requests to the server and link the requests to the - * outstanding_list in the BDRVSheepdogState. we exits the - * function without waiting for receiving the response. + * 1. In sd_co_rw_vector, we send the I/O requests to the server and + * link the requests to the outstanding_list in the + * BDRVSheepdogState. The function exits without waiting for + * receiving the response. * - * 3. We receive the response in aio_read_response, the fd handler to + * 2. 
We receive the response in aio_read_response, the fd handler to * the sheepdog connection. If metadata update is needed, we send * the write request to the vdi object in sd_write_done, the write - * completion function. The AIOCB callback is not called until all - * the requests belonging to the AIOCB are finished. + * completion function. We switch back to sd_co_readv/writev after + * all the requests belonging to the AIOCB are finished. */ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb, @@ -398,7 +399,7 @@ static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req) static void sd_finish_aiocb(SheepdogAIOCB *acb) { if (!acb->canceled) { - acb->common.cb(acb->common.opaque, acb->ret); + qemu_coroutine_enter(acb->coroutine, NULL); } qemu_aio_release(acb); } @@ -411,7 +412,8 @@ static void sd_aio_cancel(BlockDriverAIOCB *blockacb) * Sheepdog cannot cancel the requests which are already sent to * the servers, so we just complete the request with -EIO here. 
*/ - acb->common.cb(acb->common.opaque, -EIO); + acb->ret = -EIO; + qemu_coroutine_enter(acb->coroutine, NULL); acb->canceled = 1; } @@ -435,24 +437,12 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, acb->aio_done_func = NULL; acb->canceled = 0; - acb->bh = NULL; + acb->coroutine = qemu_coroutine_self(); acb->ret = 0; QLIST_INIT(&acb->aioreq_head); return acb; } -static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb) -{ - if (acb->bh) { - error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type); - return -EIO; - } - - acb->bh = qemu_bh_new(cb, acb); - qemu_bh_schedule(acb->bh); - return 0; -} - #ifdef _WIN32 struct msghdr { @@ -635,7 +625,13 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len, again: ret = do_send_recv(sockfd, iov, len, iov_offset, write); if (ret < 0) { - if (errno == EINTR || errno == EAGAIN) { + if (errno == EINTR) { + goto again; + } + if (errno == EAGAIN) { + if (qemu_in_coroutine()) { + qemu_coroutine_yield(); + } goto again; } error_report("failed to recv a rsp, %s", strerror(errno)); @@ -793,14 +789,14 @@ static void aio_read_response(void *opaque) unsigned long idx; if (QLIST_EMPTY(&s->outstanding_aio_head)) { - return; + goto out; } /* read a header */ ret = do_read(fd, &rsp, sizeof(rsp)); if (ret) { error_report("failed to get the header, %s", strerror(errno)); - return; + goto out; } /* find the right aio_req from the outstanding_aio list */ @@ -811,7 +807,7 @@ static void aio_read_response(void *opaque) } if (!aio_req) { error_report("cannot find aio_req %x", rsp.id); - return; + goto out; } acb = aio_req->aiocb; @@ -847,7 +843,7 @@ static void aio_read_response(void *opaque) aio_req->iov_offset); if (ret) { error_report("failed to get the data, %s", strerror(errno)); - return; + goto out; } break; } @@ -861,10 +857,30 @@ static void aio_read_response(void *opaque) if (!rest) { /* * We've finished all requests which belong to the AIOCB, so - * we can call the callback now. 
+ * we can switch back to sd_co_readv/writev now. */ acb->aio_done_func(acb); } +out: + s->co_recv = NULL; +} + +static void co_read_response(void *opaque) +{ + BDRVSheepdogState *s = opaque; + + if (!s->co_recv) { + s->co_recv = qemu_coroutine_create(aio_read_response); + } + + qemu_coroutine_enter(s->co_recv, opaque); +} + +static void co_write_request(void *opaque) +{ + BDRVSheepdogState *s = opaque; + + qemu_coroutine_enter(s->co_send, NULL); } static int aio_flush_request(void *opaque) @@ -924,7 +940,7 @@ static int get_sheep_fd(BDRVSheepdogState *s) return -1; } - qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request, + qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, NULL, s); return fd; } @@ -1091,6 +1107,10 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, hdr.id = aio_req->id; + qemu_co_mutex_lock(&s->lock); + s->co_send = qemu_coroutine_self(); + qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, + aio_flush_request, NULL, s); set_cork(s->fd, 1); /* send a header */ @@ -1109,6 +1129,9 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, } set_cork(s->fd, 0); + qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, + aio_flush_request, NULL, s); + qemu_co_mutex_unlock(&s->lock); return 0; } @@ -1225,6 +1248,7 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags) bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE; strncpy(s->name, vdi, sizeof(s->name)); + qemu_co_mutex_init(&s->lock); g_free(buf); return 0; out: @@ -1491,7 +1515,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset) /* * This function is called after writing data objects. If we need to * update metadata, this sends a write request to the vdi object. - * Otherwise, this calls the AIOCB callback. + * Otherwise, this switches back to sd_co_readv/writev. */ static void sd_write_done(SheepdogAIOCB *acb) { @@ -1587,8 +1611,11 @@ out: * waiting the response. 
The responses are received in the * `aio_read_response' function which is called from the main loop as * a fd handler. + * + * Returns 1 when we need to wait for a response, 0 when no request + * was sent, and -errno in error cases. */ -static void sd_readv_writev_bh_cb(void *p) +static int sd_co_rw_vector(void *p) { SheepdogAIOCB *acb = p; int ret = 0; @@ -1600,9 +1627,6 @@ static void sd_readv_writev_bh_cb(void *p) SheepdogInode *inode = &s->inode; AIOReq *aio_req; - qemu_bh_delete(acb->bh); - acb->bh = NULL; - if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) { /* * In the case we open the snapshot VDI, Sheepdog creates the @@ -1684,42 +1708,47 @@ static void sd_readv_writev_bh_cb(void *p) } out: if (QLIST_EMPTY(&acb->aioreq_head)) { - sd_finish_aiocb(acb); + return acb->ret; } + return 1; } -static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num, - QEMUIOVector *qiov, int nb_sectors, - BlockDriverCompletionFunc *cb, - void *opaque) +static int sd_co_writev(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { SheepdogAIOCB *acb; + int ret; if (bs->growable && sector_num + nb_sectors > bs->total_sectors) { /* TODO: shouldn't block here */ if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) { - return NULL; + return -EIO; } bs->total_sectors = sector_num + nb_sectors; } - acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); + acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL); acb->aio_done_func = sd_write_done; acb->aiocb_type = AIOCB_WRITE_UDATA; - sd_schedule_bh(sd_readv_writev_bh_cb, acb); - return &acb->common; + ret = sd_co_rw_vector(acb); + if (ret <= 0) { + qemu_aio_release(acb); + return ret; + } + + qemu_coroutine_yield(); + + return acb->ret; } -static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num, - QEMUIOVector *qiov, int nb_sectors, - BlockDriverCompletionFunc *cb, - void *opaque) +static int sd_co_readv(BlockDriverState 
*bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { SheepdogAIOCB *acb; - int i; + int i, ret; - acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); + acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL); acb->aiocb_type = AIOCB_READ_UDATA; acb->aio_done_func = sd_finish_aiocb; @@ -1731,8 +1760,15 @@ static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num, memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len); } - sd_schedule_bh(sd_readv_writev_bh_cb, acb); - return &acb->common; + ret = sd_co_rw_vector(acb); + if (ret <= 0) { + qemu_aio_release(acb); + return ret; + } + + qemu_coroutine_yield(); + + return acb->ret; } static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) @@ -2062,8 +2098,8 @@ BlockDriver bdrv_sheepdog = { .bdrv_getlength = sd_getlength, .bdrv_truncate = sd_truncate, - .bdrv_aio_readv = sd_aio_readv, - .bdrv_aio_writev = sd_aio_writev, + .bdrv_co_readv = sd_co_readv, + .bdrv_co_writev = sd_co_writev, .bdrv_snapshot_create = sd_snapshot_create, .bdrv_snapshot_goto = sd_snapshot_goto, -- cgit v1.1