[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 1/3] sheepdog: make send/recv operations non-blockin
From: |
MORITA Kazutaka |
Subject: |
[Qemu-devel] [PATCH 1/3] sheepdog: make send/recv operations non-blocking |
Date: |
Tue, 29 Mar 2011 21:13:06 +0900 |
This patch avoids retrying send/recv in AIO path when the sheepdog
connection is not ready for the operation.
Signed-off-by: MORITA Kazutaka <address@hidden>
---
block/sheepdog.c | 417 +++++++++++++++++++++++++++++++++++++-----------------
1 files changed, 289 insertions(+), 128 deletions(-)
diff --git a/block/sheepdog.c b/block/sheepdog.c
index a54e0de..cedf806 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -242,6 +242,19 @@ static inline int is_snapshot(struct SheepdogInode *inode)
typedef struct SheepdogAIOCB SheepdogAIOCB;
+enum ConnectionState {
+ C_IO_HEADER,
+ C_IO_DATA,
+ C_IO_END,
+ C_IO_CLOSED,
+};
+
+enum AIOReqState {
+ AIO_PENDING, /* not ready for sending this request */
+ AIO_SEND_OBJREQ, /* send this request */
+ AIO_RECV_OBJREQ, /* receive a result of this request */
+};
+
typedef struct AIOReq {
SheepdogAIOCB *aiocb;
unsigned int iov_offset;
@@ -253,6 +266,9 @@ typedef struct AIOReq {
uint8_t flags;
uint32_t id;
+ enum AIOReqState state;
+ struct SheepdogObjReq hdr;
+
QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
QLIST_ENTRY(AIOReq) aioreq_siblings;
} AIOReq;
@@ -348,12 +364,14 @@ static const char * sd_strerror(int err)
* 1. In the sd_aio_readv/writev, read/write requests are added to the
* QEMU Bottom Halves.
*
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- * requests to the server and link the requests to the
- * outstanding_list in the BDRVSheepdogState. we exits the
- * function without waiting for receiving the response.
+ * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we set up the
+ * I/O requests to the server and link the requests to the
+ * outstanding_list in the BDRVSheepdogState.
+ *
+ * 3. We send the request in aio_send_request, the fd handler to the
+ * sheepdog connection.
*
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 4. We receive the response in aio_read_response, the fd handler to
* the sheepdog connection. If metadata update is needed, we send
* the write request to the vdi object in sd_write_done, the write
* completion function. The AIOCB callback is not called until all
@@ -377,8 +395,6 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s,
SheepdogAIOCB *acb,
aio_req->flags = flags;
aio_req->id = s->aioreq_seq_num++;
- QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
- outstanding_aio_siblings);
QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
return aio_req;
@@ -640,20 +656,17 @@ static int do_readv_writev(int sockfd, struct iovec *iov,
int len,
again:
ret = do_send_recv(sockfd, iov, len, iov_offset, write);
if (ret < 0) {
- if (errno == EINTR || errno == EAGAIN) {
+ if (errno == EINTR) {
goto again;
}
+ if (errno == EAGAIN) {
+ return 0;
+ }
error_report("failed to recv a rsp, %s\n", strerror(errno));
- return 1;
- }
-
- iov_offset += ret;
- len -= ret;
- if (len) {
- goto again;
+ return -errno;
}
- return 0;
+ return ret;
}
static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
@@ -666,30 +679,30 @@ static int do_writev(int sockfd, struct iovec *iov, int
len, int iov_offset)
return do_readv_writev(sockfd, iov, len, iov_offset, 1);
}
-static int do_read_write(int sockfd, void *buf, int len, int write)
+static int do_read_write(int sockfd, void *buf, int len, int skip, int write)
{
struct iovec iov;
iov.iov_base = buf;
- iov.iov_len = len;
+ iov.iov_len = len + skip;
- return do_readv_writev(sockfd, &iov, len, 0, write);
+ return do_readv_writev(sockfd, &iov, len, skip, write);
}
-static int do_read(int sockfd, void *buf, int len)
+static int do_read(int sockfd, void *buf, int len, int skip)
{
- return do_read_write(sockfd, buf, len, 0);
+ return do_read_write(sockfd, buf, len, skip, 0);
}
-static int do_write(int sockfd, void *buf, int len)
+static int do_write(int sockfd, void *buf, int len, int skip)
{
- return do_read_write(sockfd, buf, len, 1);
+ return do_read_write(sockfd, buf, len, skip, 1);
}
static int send_req(int sockfd, SheepdogReq *hdr, void *data,
unsigned int *wlen)
{
- int ret;
+ int ret, done = 0;
struct iovec iov[2];
iov[0].iov_base = hdr;
@@ -700,19 +713,23 @@ static int send_req(int sockfd, SheepdogReq *hdr, void
*data,
iov[1].iov_len = *wlen;
}
- ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
- if (ret) {
- error_report("failed to send a req, %s\n", strerror(errno));
- ret = -1;
+ while (done < sizeof(*hdr) + *wlen) {
+ ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen - done, done);
+ if (ret < 0) {
+ error_report("failed to send a req, %s\n", strerror(errno));
+ ret = -1;
+ }
+ done += ret;
}
- return ret;
+ return 0;
}
+/* This function shouldn't be used for asynchronous I/O */
static int do_req(int sockfd, SheepdogReq *hdr, void *data,
unsigned int *wlen, unsigned int *rlen)
{
- int ret;
+ int ret, done;
ret = send_req(sockfd, hdr, data, wlen);
if (ret) {
@@ -720,33 +737,39 @@ static int do_req(int sockfd, SheepdogReq *hdr, void
*data,
goto out;
}
- ret = do_read(sockfd, hdr, sizeof(*hdr));
- if (ret) {
- error_report("failed to get a rsp, %s\n", strerror(errno));
- ret = -1;
- goto out;
+ done = 0;
+ while (done < sizeof(*hdr)) {
+ ret = do_read(sockfd, hdr, sizeof(*hdr) - done, done);
+ if (ret < 0) {
+ error_report("failed to get a rsp, %s\n", strerror(errno));
+ ret = -1;
+ goto out;
+ }
+ done += ret;
}
if (*rlen > hdr->data_length) {
*rlen = hdr->data_length;
}
- if (*rlen) {
- ret = do_read(sockfd, data, *rlen);
- if (ret) {
+ done = 0;
+ while (done < *rlen) {
+ ret = do_read(sockfd, data, *rlen - done, done);
+ if (ret < 0) {
error_report("failed to get the data, %s\n", strerror(errno));
ret = -1;
goto out;
}
+ done += ret;
}
ret = 0;
out:
return ret;
}
-static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
- struct iovec *iov, int niov, int create,
- enum AIOCBState aiocb_type);
+static void setup_aio_header(BDRVSheepdogState *s, AIOReq *aio_req, int create,
+ enum AIOCBState aiocb_type);
+static int sd_update_fd_handler(BDRVSheepdogState *s);
/*
* This function searchs pending requests to the object `oid', and
@@ -756,10 +779,12 @@ static void send_pending_req(BDRVSheepdogState *s,
uint64_t oid, uint32_t id)
{
AIOReq *aio_req, *next;
SheepdogAIOCB *acb;
- int ret;
QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
outstanding_aio_siblings, next) {
+ if (aio_req->state != AIO_PENDING) {
+ continue;
+ }
if (id == aio_req->id) {
continue;
}
@@ -768,15 +793,9 @@ static void send_pending_req(BDRVSheepdogState *s,
uint64_t oid, uint32_t id)
}
acb = aio_req->aiocb;
- ret = add_aio_request(s, aio_req, acb->qiov->iov,
- acb->qiov->niov, 0, acb->aiocb_type);
- if (ret < 0) {
- error_report("add_aio_request is failed\n");
- free_aio_req(s, aio_req);
- if (QLIST_EMPTY(&acb->aioreq_head)) {
- sd_finish_aiocb(acb);
- }
- }
+ aio_req->state = AIO_SEND_OBJREQ;
+ setup_aio_header(s, aio_req, 0, acb->aiocb_type);
+ sd_update_fd_handler(s);
}
}
@@ -788,38 +807,92 @@ static void send_pending_req(BDRVSheepdogState *s,
uint64_t oid, uint32_t id)
*/
static void aio_read_response(void *opaque)
{
- SheepdogObjRsp rsp;
+ static SheepdogObjRsp rsp;
BDRVSheepdogState *s = opaque;
int fd = s->fd;
int ret;
- AIOReq *aio_req = NULL;
- SheepdogAIOCB *acb;
+ static AIOReq *aio_req;
+ static SheepdogAIOCB *acb;
int rest;
unsigned long idx;
+ static int done;
+ static enum ConnectionState conn_state = C_IO_HEADER;
if (QLIST_EMPTY(&s->outstanding_aio_head)) {
return;
}
- /* read a header */
- ret = do_read(fd, &rsp, sizeof(rsp));
- if (ret) {
- error_report("failed to get the header, %s\n", strerror(errno));
- return;
- }
+ switch (conn_state) {
+ case C_IO_HEADER:
+ /* read a header */
+ ret = do_read(fd, &rsp, sizeof(rsp) - done, done);
+ if (ret < 0) {
+ error_report("failed to get the header, %s\n", strerror(errno));
+ conn_state = C_IO_CLOSED;
+ break;
+ }
+ done += ret;
+ if (done < sizeof(rsp)) {
+ break;
+ }
+ done = 0;
- /* find the right aio_req from the outstanding_aio list */
- QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings)
{
- if (aio_req->id == rsp.id) {
+ /* find the right aio_req from the outstanding_aio list */
+ QLIST_FOREACH(aio_req, &s->outstanding_aio_head,
+ outstanding_aio_siblings) {
+ if (aio_req->state == AIO_RECV_OBJREQ && aio_req->id == rsp.id) {
+ break;
+ }
+ }
+ if (!aio_req) {
+ error_report("bug: cannot find aio_req %x\n", rsp.id);
+ return;
+ }
+ acb = aio_req->aiocb;
+
+ if (rsp.result != SD_RES_SUCCESS) {
+ acb->ret = -EIO;
+ error_report("%s\n", sd_strerror(rsp.result));
+ conn_state = C_IO_END;
break;
}
+
+ if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
+ conn_state = C_IO_END;
+ break;
+ }
+ conn_state = C_IO_DATA;
+ case C_IO_DATA:
+ ret = do_readv(fd, acb->qiov->iov, aio_req->data_len - done,
+ aio_req->iov_offset + done);
+ if (ret < 0) {
+ error_report("failed to get the data, %s\n", strerror(errno));
+ conn_state = C_IO_CLOSED;
+ }
+
+ done += ret;
+ if (done < aio_req->data_len) {
+ break;
+ }
+ done = 0;
+ conn_state = C_IO_END;
+ break;
+ default:
+ error_report("bug: invalid rx state %d", conn_state);
+ break;
}
- if (!aio_req) {
- error_report("cannot find aio_req %x\n", rsp.id);
- return;
+
+ if (conn_state == C_IO_CLOSED) {
+ acb->ret = -EIO;
+ rest = free_aio_req(s, aio_req);
+ if (!rest) {
+ acb->aio_done_func(acb);
+ }
}
- acb = aio_req->aiocb;
+ if (conn_state != C_IO_END) {
+ return;
+ }
switch (acb->aiocb_type) {
case AIOCB_WRITE_UDATA:
@@ -848,19 +921,10 @@ static void aio_read_response(void *opaque)
}
break;
case AIOCB_READ_UDATA:
- ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
- aio_req->iov_offset);
- if (ret) {
- error_report("failed to get the data, %s\n", strerror(errno));
- return;
- }
break;
}
- if (rsp.result != SD_RES_SUCCESS) {
- acb->ret = -EIO;
- error_report("%s\n", sd_strerror(rsp.result));
- }
+ conn_state = C_IO_HEADER;
rest = free_aio_req(s, aio_req);
if (!rest) {
@@ -870,6 +934,8 @@ static void aio_read_response(void *opaque)
*/
acb->aio_done_func(acb);
}
+
+ sd_update_fd_handler(s);
}
static int aio_flush_request(void *opaque)
@@ -905,6 +971,129 @@ static int set_nodelay(int fd)
}
/*
+ * Send I/O requests.
+ *
+ * This function is registered as a fd handler, and called from the
+ * main loop when s->fd is ready for sending requests.
+ */
+static void aio_send_request(void *opaque)
+{
+ BDRVSheepdogState *s = opaque;
+ int ret, rest;
+ AIOReq *areq;
+ static AIOReq *aio_req;
+ static SheepdogAIOCB *acb;
+ static int done;
+ static enum ConnectionState conn_state = C_IO_HEADER;
+
+ set_cork(s->fd, 1);
+
+ switch (conn_state) {
+ case C_IO_HEADER:
+ if (!aio_req) {
+ /* find the oldest aio_req from the outstanding_aio list */
+ QLIST_FOREACH(areq, &s->outstanding_aio_head,
+ outstanding_aio_siblings) {
+ if (areq->state == AIO_SEND_OBJREQ) {
+ aio_req = areq;
+ }
+ }
+ if (!aio_req) {
+ error_report("bug: cannot find aio_req to be send");
+ return;
+ }
+ acb = aio_req->aiocb;
+ }
+
+ /* send a header */
+ ret = do_write(s->fd, &aio_req->hdr, sizeof(aio_req->hdr) - done,
done);
+ if (ret < 0) {
+ error_report("failed to send a req, %s\n", strerror(errno));
+ conn_state = C_IO_CLOSED;
+ break;
+ }
+ done += ret;
+ if (done < sizeof(aio_req->hdr)) {
+ break;
+ }
+ done = 0;
+ if (acb->aiocb_type == AIOCB_READ_UDATA) {
+ conn_state = C_IO_END;
+ break;
+ }
+ conn_state = C_IO_DATA;
+ case C_IO_DATA:
+ if (is_data_obj(aio_req->oid)) {
+ ret = do_writev(s->fd, acb->qiov->iov, aio_req->data_len - done,
+ aio_req->iov_offset + done);
+ } else {
+ ret = do_write(s->fd, (uint8_t *)&s->inode + aio_req->offset,
+ aio_req->data_len - done, done);
+ }
+ if (ret < 0) {
+ error_report("failed to send a data, %s\n", strerror(errno));
+ conn_state = C_IO_CLOSED;
+ }
+ done += ret;
+ if (done < aio_req->data_len) {
+ break;
+ }
+ done = 0;
+ conn_state = C_IO_END;
+ break;
+ default:
+ error_report("bug: invalid tx state %d", conn_state);
+ break;
+ }
+
+ set_cork(s->fd, 0);
+
+ if (conn_state == C_IO_CLOSED) {
+ acb->ret = -EIO;
+ rest = free_aio_req(s, aio_req);
+ if (!rest) {
+ /*
+ * We've finished all requests which belong to the AIOCB, so
+ * we can call the callback now.
+ */
+ acb->aio_done_func(acb);
+ }
+ }
+
+ if (conn_state != C_IO_END) {
+ return;
+ }
+
+ aio_req->state = AIO_RECV_OBJREQ;
+
+ conn_state = C_IO_HEADER;
+ aio_req = NULL;
+
+ sd_update_fd_handler(s);
+}
+
+/*
+ * Check outstanding requests and set proper fd handlers
+ */
+static int sd_update_fd_handler(BDRVSheepdogState *s)
+{
+ IOHandler *io_read = NULL, *io_write = NULL;
+ AIOReq *areq;
+
+ QLIST_FOREACH(areq, &s->outstanding_aio_head, outstanding_aio_siblings) {
+ if (areq->state == AIO_SEND_OBJREQ) {
+ io_write = aio_send_request;
+ }
+ if (areq->state == AIO_RECV_OBJREQ) {
+ io_read = aio_read_response;
+ }
+ }
+
+ return qemu_aio_set_fd_handler(s->fd, io_read, io_write,
+ aio_flush_request, NULL, s);
+}
+
+/*
* Return a socket discriptor to read/write objects.
*
* We cannot use this discriptor for other operations because
@@ -929,8 +1118,6 @@ static int get_sheep_fd(BDRVSheepdogState *s)
return -1;
}
- qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
- NULL, s);
return fd;
}
@@ -1053,14 +1240,12 @@ out:
return ret;
}
-static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
- struct iovec *iov, int niov, int create,
- enum AIOCBState aiocb_type)
+static void setup_aio_header(BDRVSheepdogState *s, AIOReq *aio_req, int create,
+ enum AIOCBState aiocb_type)
{
int nr_copies = s->inode.nr_copies;
SheepdogObjReq hdr;
unsigned int wlen;
- int ret;
uint64_t oid = aio_req->oid;
unsigned int datalen = aio_req->data_len;
uint64_t offset = aio_req->offset;
@@ -1096,26 +1281,7 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq
*aio_req,
hdr.id = aio_req->id;
- set_cork(s->fd, 1);
-
- /* send a header */
- ret = do_write(s->fd, &hdr, sizeof(hdr));
- if (ret) {
- error_report("failed to send a req, %s\n", strerror(errno));
- return -EIO;
- }
-
- if (wlen) {
- ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
- if (ret) {
- error_report("failed to send a data, %s\n", strerror(errno));
- return -EIO;
- }
- }
-
- set_cork(s->fd, 0);
-
- return 0;
+ memcpy(&aio_req->hdr, &hdr, sizeof(hdr));
}
static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
@@ -1440,9 +1606,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t
offset)
*/
static void sd_write_done(SheepdogAIOCB *acb)
{
- int ret;
BDRVSheepdogState *s = acb->common.bs->opaque;
- struct iovec iov;
AIOReq *aio_req;
uint32_t offset, data_len, mn, mx;
@@ -1457,22 +1621,20 @@ static void sd_write_done(SheepdogAIOCB *acb)
s->min_dirty_data_idx = UINT32_MAX;
s->max_dirty_data_idx = 0;
- iov.iov_base = &s->inode;
- iov.iov_len = sizeof(s->inode);
aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
data_len, offset, 0, 0, offset);
- ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
- if (ret) {
- free_aio_req(s, aio_req);
- acb->ret = -EIO;
- goto out;
- }
+ aio_req->state = AIO_SEND_OBJREQ;
+ setup_aio_header(s, aio_req, 0, AIOCB_WRITE_UDATA);
acb->aio_done_func = sd_finish_aiocb;
acb->aiocb_type = AIOCB_WRITE_UDATA;
+
+ QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
+ outstanding_aio_siblings);
+ sd_update_fd_handler(s);
return;
}
-out:
+
sd_finish_aiocb(acb);
}
@@ -1525,13 +1687,12 @@ out:
}
/*
- * Send I/O requests to the server.
+ * Set up I/O requests
*
- * This function sends requests to the server, links the requests to
- * the outstanding_list in BDRVSheepdogState, and exits without
- * waiting the response. The responses are received in the
- * `aio_read_response' function which is called from the main loop as
- * a fd handler.
+ * This function creates asynchronous I/O requests and links them to
+ * the outstanding_list in BDRVSheepdogState. The requests are sent
+ * in the `aio_send_requests' function which is called from the main
+ * loop as a fd handler.
*/
static void sd_readv_writev_bh_cb(void *p)
{
@@ -1553,6 +1714,7 @@ static void sd_readv_writev_bh_cb(void *p)
* In the case we open the snapshot VDI, Sheepdog creates the
* writable VDI when we do a write operation first.
*/
+ /* FIXME: we shouldn't block here */
ret = sd_create_branch(s);
if (ret) {
acb->ret = -EIO;
@@ -1592,6 +1754,9 @@ static void sd_readv_writev_bh_cb(void *p)
}
aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid,
done);
+ aio_req->state = AIO_SEND_OBJREQ;
+ QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
+ outstanding_aio_siblings);
if (create) {
AIOReq *areq;
@@ -1609,19 +1774,13 @@ static void sd_readv_writev_bh_cb(void *p)
*/
aio_req->flags = 0;
aio_req->base_oid = 0;
+ aio_req->state = AIO_PENDING;
goto done;
}
}
}
- ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
- create, acb->aiocb_type);
- if (ret < 0) {
- error_report("add_aio_request is failed\n");
- free_aio_req(s, aio_req);
- acb->ret = -EIO;
- goto out;
- }
+ setup_aio_header(s, aio_req, create, acb->aiocb_type);
done:
offset = 0;
idx++;
@@ -1631,6 +1790,8 @@ out:
if (QLIST_EMPTY(&acb->aioreq_head)) {
sd_finish_aiocb(acb);
}
+
+ sd_update_fd_handler(s);
}
static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t
sector_num,
--
1.5.6.5