[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 14/15] qemu-nbd: asynchronous operation
From: |
Paolo Bonzini |
Subject: |
[Qemu-devel] [PATCH 14/15] qemu-nbd: asynchronous operation |
Date: |
Mon, 10 Oct 2011 11:37:56 +0200 |
Using coroutines enables asynchronous operation on both the network and
the block side. The network can be owned by two coroutines at the same time,
one writing and one reading. On the send side, mutual exclusion is
guaranteed by a CoMutex. On the receive side, mutual exclusion is
guaranteed because new coroutines immediately start receiving data,
and no new coroutines are created as long as the previous one is receiving.
Between receive and send, qemu-nbd can have an arbitrary number of
in-flight block transfers. Throttling is implemented by the next
patch.
Signed-off-by: Paolo Bonzini <address@hidden>
---
nbd.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++++----------------
1 files changed, 49 insertions(+), 17 deletions(-)
diff --git a/nbd.c b/nbd.c
index e016175..01d3a85 100644
--- a/nbd.c
+++ b/nbd.c
@@ -20,6 +20,8 @@
#include "block.h"
#include "block_int.h"
+#include "qemu-coroutine.h"
+
#include <errno.h>
#include <string.h>
#ifndef _WIN32
@@ -609,6 +611,11 @@ struct NBDClient {
NBDExport *exp;
int sock;
+
+ Coroutine *recv_coroutine;
+
+ CoMutex send_lock;
+ Coroutine *send_coroutine;
};
static void nbd_client_get(NBDClient *client)
@@ -683,13 +690,20 @@ void nbd_export_close(NBDExport *exp)
g_free(exp);
}
-static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
+static void nbd_read(void *opaque);
+static void nbd_restart_write(void *opaque);
+
+static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
int len)
{
NBDClient *client = req->client;
int csock = client->sock;
int rc, ret;
+ qemu_co_mutex_lock(&client->send_lock);
+ qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client);
+ client->send_coroutine = qemu_coroutine_self();
+
if (!len) {
rc = nbd_send_reply(csock, reply);
if (rc == -1) {
@@ -699,7 +713,7 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
socket_set_cork(csock, 1);
rc = nbd_send_reply(csock, reply);
if (rc != -1) {
- ret = write_sync(csock, req->data, len);
+ ret = qemu_co_send(csock, req->data, len);
if (ret != len) {
errno = EIO;
rc = -1;
@@ -710,15 +724,20 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply,
}
socket_set_cork(csock, 0);
}
+
+ client->send_coroutine = NULL;
+ qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
+ qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
-static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
+static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
{
NBDClient *client = req->client;
int csock = client->sock;
int rc;
+ client->recv_coroutine = qemu_coroutine_self();
if (nbd_receive_request(csock, request) == -1) {
rc = -EIO;
goto out;
@@ -743,7 +762,7 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) {
TRACE("Reading %u byte(s)", request->len);
- if (read_sync(csock, req->data, request->len) != request->len) {
+ if (qemu_co_recv(csock, req->data, request->len) != request->len) {
LOG("reading from socket failed");
rc = -EIO;
goto out;
@@ -752,21 +771,22 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request)
rc = 0;
out:
+ client->recv_coroutine = NULL;
return rc;
}
-static int nbd_trip(NBDClient *client)
+static void nbd_trip(void *opaque)
{
+ NBDClient *client = opaque;
NBDRequest *req = nbd_request_get(client);
NBDExport *exp = client->exp;
struct nbd_request request;
struct nbd_reply reply;
- int rc = -1;
int ret;
TRACE("Reading request.");
- ret = nbd_do_receive_request(req, &request);
+ ret = nbd_co_receive_request(req, &request);
if (ret == -EIO) {
goto out;
}
@@ -801,7 +821,7 @@ static int nbd_trip(NBDClient *client)
}
TRACE("Read %u byte(s)", request.len);
- if (nbd_do_send_reply(req, &reply, request.len) < 0)
+ if (nbd_co_send_reply(req, &reply, request.len) < 0)
goto out;
break;
case NBD_CMD_WRITE:
@@ -832,13 +852,13 @@ static int nbd_trip(NBDClient *client)
}
}
- if (nbd_do_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out;
break;
case NBD_CMD_DISC:
TRACE("Request type is DISCONNECT");
errno = 0;
- return 1;
+ goto out;
case NBD_CMD_FLUSH:
TRACE("Request type is FLUSH");
@@ -848,7 +868,7 @@ static int nbd_trip(NBDClient *client)
reply.error = -ret;
}
- if (nbd_do_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out;
break;
case NBD_CMD_TRIM:
@@ -859,7 +879,7 @@ static int nbd_trip(NBDClient *client)
LOG("discard failed");
reply.error = -ret;
}
- if (nbd_do_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0)
goto out;
break;
default:
@@ -867,28 +887,39 @@ static int nbd_trip(NBDClient *client)
invalid_request:
reply.error = -EINVAL;
error_reply:
- if (nbd_do_send_reply(req, &reply, 0) == -1)
+ if (nbd_co_send_reply(req, &reply, 0) == -1)
goto out;
break;
}
TRACE("Request/Reply complete");
- rc = 0;
+ nbd_request_put(req);
+ return;
+
out:
nbd_request_put(req);
- return rc;
+ nbd_client_close(client);
}
static void nbd_read(void *opaque)
{
NBDClient *client = opaque;
- if (nbd_trip(client) != 0) {
- nbd_client_close(client);
+ if (client->recv_coroutine) {
+ qemu_coroutine_enter(client->recv_coroutine, NULL);
+ } else {
+ qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
}
}
+static void nbd_restart_write(void *opaque)
+{
+ NBDClient *client = opaque;
+
+ qemu_coroutine_enter(client->send_coroutine, NULL);
+}
+
NBDClient *nbd_client_new(NBDExport *exp, int csock,
void (*close)(NBDClient *))
{
@@ -901,6 +932,7 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock,
client->exp = exp;
client->sock = csock;
client->close = close;
+ qemu_co_mutex_init(&client->send_lock);
qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
return client;
}
--
1.7.6
- [Qemu-devel] [PATCH 00/15] NBD server improvements, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 01/15] qemu-nbd: remove offset argument to nbd_trip, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 02/15] qemu-nbd: remove data_size argument to nbd_trip, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 03/15] move corking functions to osdep.c, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 04/15] qemu-nbd: simplify nbd_trip, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 06/15] qemu-nbd: more robust handling of invalid requests, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 11/15] qemu-nbd: use common main loop, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 07/15] qemu-nbd: introduce nbd_do_receive_request, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 14/15] qemu-nbd: asynchronous operation,
Paolo Bonzini <=
- [Qemu-devel] [PATCH 08/15] qemu-nbd: introduce NBDExport, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 05/15] qemu-nbd: introduce nbd_do_send_reply, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 12/15] qemu-nbd: move client handling to nbd.c, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 10/15] link the main loop and its dependencies into the tools, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 13/15] qemu-nbd: add client pointer to NBDRequest, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 09/15] qemu-nbd: introduce NBDRequest, Paolo Bonzini, 2011/10/10
- [Qemu-devel] [PATCH 15/15] qemu-nbd: throttle requests, Paolo Bonzini, 2011/10/10
- Re: [Qemu-devel] [PATCH 00/15] NBD server improvements, Paolo Bonzini, 2011/10/11