aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--block/nbd.c12
-rw-r--r--nbd.c40
2 files changed, 44 insertions, 8 deletions
diff --git a/block/nbd.c b/block/nbd.c
index 1b0e384..e0af5b4 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -151,10 +151,18 @@ static void nbd_reply_ready(void *opaque)
{
BDRVNBDState *s = opaque;
uint64_t i;
+ int ret;
if (s->reply.handle == 0) {
- /* No reply already in flight. Fetch a header. */
- if (nbd_receive_reply(s->sock, &s->reply) < 0) {
+ /* No reply already in flight. Fetch a header. It is possible
+ * that another thread has done the same thing in parallel, so
+ * the socket is not readable anymore.
+ */
+ ret = nbd_receive_reply(s->sock, &s->reply);
+ if (ret == -EAGAIN) {
+ return;
+ }
+ if (ret < 0) {
s->reply.handle = 0;
goto fail;
}
diff --git a/nbd.c b/nbd.c
index b31b3b2..4c6d7f1 100644
--- a/nbd.c
+++ b/nbd.c
@@ -78,9 +78,6 @@
/* That's all folks */
-#define read_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, true)
-#define write_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, false)
-
ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{
size_t offset = 0;
@@ -107,7 +104,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
err = socket_error();
/* recoverable error */
- if (err == EINTR || err == EAGAIN) {
+ if (err == EINTR || (offset > 0 && err == EAGAIN)) {
continue;
}
@@ -126,6 +123,26 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
return offset;
}
+static ssize_t read_sync(int fd, void *buffer, size_t size)
+{
+ /* Sockets are kept in blocking mode in the negotiation phase. After
+ * that, a non-readable socket simply means that another thread stole
+ * our request/reply. Synchronization is done with recv_coroutine, so
+ * that this is coroutine-safe.
+ */
+ return nbd_wr_sync(fd, buffer, size, true);
+}
+
+static ssize_t write_sync(int fd, void *buffer, size_t size)
+{
+ int ret;
+ do {
+ /* For writes, we do expect the socket to be writable. */
+ ret = nbd_wr_sync(fd, buffer, size, false);
+ } while (ret == -EAGAIN);
+ return ret;
+}
+
static void combine_addr(char *buf, size_t len, const char* address,
uint16_t port)
{
@@ -203,6 +220,7 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
[28 .. 151] reserved (0)
*/
+ socket_set_block(csock);
rc = -EINVAL;
TRACE("Beginning negotiation.");
@@ -222,6 +240,7 @@ static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
TRACE("Negotiation succeeded.");
rc = 0;
fail:
+ socket_set_nonblock(csock);
return rc;
}
@@ -235,6 +254,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
TRACE("Receiving negotiation.");
+ socket_set_block(csock);
rc = -EINVAL;
if (read_sync(csock, buf, 8) != 8) {
@@ -349,6 +369,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
rc = 0;
fail:
+ socket_set_nonblock(csock);
return rc;
}
@@ -742,8 +763,11 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
ssize_t rc;
client->recv_coroutine = qemu_coroutine_self();
- if (nbd_receive_request(csock, request) < 0) {
- rc = -EIO;
+ rc = nbd_receive_request(csock, request);
+ if (rc < 0) {
+ if (rc != -EAGAIN) {
+ rc = -EIO;
+ }
goto out;
}
@@ -791,6 +815,9 @@ static void nbd_trip(void *opaque)
TRACE("Reading request.");
ret = nbd_co_receive_request(req, &request);
+ if (ret == -EAGAIN) {
+ goto done;
+ }
if (ret == -EIO) {
goto out;
}
@@ -901,6 +928,7 @@ static void nbd_trip(void *opaque)
TRACE("Request/Reply complete");
+done:
nbd_request_put(req);
return;