From 8c5135f90e2dcf1d5c3d03106e0ac6e371ccb572 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 8 Sep 2011 13:46:25 +0200 Subject: sheepdog: move coroutine send/recv function to generic code Outside coroutines, avoid busy waiting on EAGAIN by temporarily making the socket blocking. The API of qemu_recvv/qemu_sendv is slightly different from do_readv/do_writev because they do not handle coroutines. It returns the number of bytes written before encountering an EAGAIN. The specificity of yielding on EAGAIN is entirely in qemu-coroutine.c. Reviewed-by: MORITA Kazutaka Signed-off-by: Paolo Bonzini --- block/sheepdog.c | 230 +++++-------------------------------------------------- 1 file changed, 21 insertions(+), 209 deletions(-) (limited to 'block/sheepdog.c') diff --git a/block/sheepdog.c b/block/sheepdog.c index aa9707f..00ea5a0 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, return acb; } -#ifdef _WIN32 - -struct msghdr { - struct iovec *msg_iov; - size_t msg_iovlen; -}; - -static ssize_t sendmsg(int s, const struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } - - ret = send(s, buf, size, flags); - - g_free(buf); - return ret; -} - -static ssize_t recvmsg(int s, struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - ret = qemu_recv(s, buf, size, flags); - if (ret < 0) { - goto out; - } - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } -out: - g_free(buf); - return ret; -} - -#endif - -/* - * Send/recv data with iovec buffers - * - * This function send/recv data from/to the iovec buffer directly. - * The first `offset' bytes in the iovec buffer are skipped and next - * `len' bytes are used. - * - * For example, - * - * do_send_recv(sockfd, iov, len, offset, 1); - * - * is equals to - * - * char *buf = malloc(size); - * iov_to_buf(iov, iovcnt, buf, offset, size); - * send(sockfd, buf, size, 0); - * free(buf); - */ -static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset, - int write) -{ - struct msghdr msg; - int ret, diff; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - len += offset; - - while (iov->iov_len < len) { - len -= iov->iov_len; - - iov++; - msg.msg_iovlen++; - } - - diff = iov->iov_len - len; - iov->iov_len -= diff; - - while (msg.msg_iov->iov_len <= offset) { - offset -= msg.msg_iov->iov_len; - - msg.msg_iov++; - msg.msg_iovlen--; - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset; - msg.msg_iov->iov_len -= offset; - - if (write) { - ret = sendmsg(sockfd, &msg, 0); - } else { - ret = recvmsg(sockfd, &msg, 0); - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset; - msg.msg_iov->iov_len += offset; - - iov->iov_len += diff; - return ret; -} - static int connect_to_sdog(const char *addr, const char *port) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; @@ -618,83 +495,19 @@ success: return fd; } -static int do_readv_writev(int sockfd, struct iovec *iov, int len, - int iov_offset, int write) -{ - int ret; -again: - ret = do_send_recv(sockfd, iov, len, iov_offset, write); - if (ret < 0) { - if (errno == EINTR) { - goto again; - } - if (errno == EAGAIN) { - if (qemu_in_coroutine()) { - qemu_coroutine_yield(); - } - goto again; - } - error_report("failed to recv a rsp, %s", strerror(errno)); - return 1; - } - - iov_offset += ret; - len -= ret; - if (len) { - goto again; - } - - return 0; -} - -static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 0); -} - -static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 1); -} - -static int do_read_write(int sockfd, void *buf, int len, int write) -{ - struct iovec iov; - - iov.iov_base = buf; - iov.iov_len = len; - - return do_readv_writev(sockfd, &iov, len, 0, write); -} - -static int do_read(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 0); -} - -static int do_write(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 1); -} - static int send_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen) { int ret; - struct iovec iov[2]; - iov[0].iov_base = hdr; - iov[0].iov_len = sizeof(*hdr); - - if (*wlen) { - iov[1].iov_base = data; - iov[1].iov_len = *wlen; + ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { + error_report("failed to send a req, %s", strerror(errno)); } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { + ret = qemu_send_full(sockfd, data, *wlen, 0); + if (ret < *wlen) { error_report("failed to send a req, %s", strerror(errno)); - ret = -1; } return ret; @@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, { int ret; + socket_set_block(sockfd); ret = send_req(sockfd, hdr, data, wlen); - if (ret) { - ret = -1; + if (ret < 0) { goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { + ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { error_report("failed to get a rsp, %s", strerror(errno)); - ret = -1; goto out; } @@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, } if (*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + ret = qemu_recv_full(sockfd, data, *rlen, 0); + if (ret < *rlen) { error_report("failed to get the data, %s", strerror(errno)); - ret = -1; goto out; } } ret = 0; out: + socket_set_nonblock(sockfd); return ret; } @@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque) } /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { + ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); + if (ret < 0) { error_report("failed to get the header, %s", strerror(errno)); goto out; } @@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque) } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { + ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length, + aio_req->iov_offset); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); goto out; } @@ -1114,16 +926,16 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, set_cork(s->fd, 1); /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { + ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); + if (ret < 0) { qemu_co_mutex_unlock(&s->lock); error_report("failed to send a req, %s", strerror(errno)); return -EIO; } if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { + ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset); + if (ret < 0) { qemu_co_mutex_unlock(&s->lock); error_report("failed to send a data, %s", strerror(errno)); return -EIO; -- cgit v1.1