From: Kevin Wolf
Subject: [Qemu-devel] Re: [PATCH] NBD: Convert the NBD driver to use the AIO interface.
Date: Wed, 06 Apr 2011 13:30:19 +0200
User-agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.15) Gecko/20101027 Fedora/3.0.10-1.fc12 Thunderbird/3.0.10
On 24.02.2011 17:49, Nick Thomas wrote:
> This preserves the previous behaviour when the NBD server is
> unavailable or goes away during guest execution, but switches the
> NBD backend to present the AIO interface instead of the sync IO
> interface.
>
> We also split read & write requests into 1 MiB blocks (minus header).
> This is a hard limit in the NBD servers (including qemu-nbd), but
> never seemed to come up with the previous backend code.
>
> All IO except setup and teardown is asynchronous, and we (in theory)
> handle canceled requests properly too.
>
> Signed-off-by: Nick Thomas <address@hidden>
Actually, the patch doesn't even apply. So just a quick review...
> block/nbd.c | 652 +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
> nbd.c | 76 +++++--
> nbd.h | 13 +-
> 3 files changed, 662 insertions(+), 79 deletions(-)
>
> diff --git a/block/nbd.c b/block/nbd.c
> index 1d6b225..d6594e4 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -34,7 +34,12 @@
> #include <sys/types.h>
> #include <unistd.h>
>
> -#define EN_OPTSTR ":exportname="
> +#define EN_OPTSTR ":exportname="
> +#define SECTOR_SIZE 512
> +
> +/* 1MiB minus header size */
> +#define NBD_MAX_READ ((1024*1024) - sizeof(struct nbd_reply))
> +#define NBD_MAX_WRITE ((1024*1024) - sizeof(struct nbd_request))
>
> /* #define DEBUG_NBD */
>
> @@ -45,17 +50,148 @@
> #define logout(fmt, ...) ((void)0)
> #endif
>
> -typedef struct BDRVNBDState {
> +/*
> + * Here's how the I/O works.
> + *
> + * qemu creates a BDRVNBDState for us, which is the context for all reads
> + * and writes.
> + *
> + * nbd_open is called to connect to the NBD server and set up on-read and
> + * on-write handlers (aio_read_response, aio_write_request).
> + *
> + * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing
> + * the I/O request to qemu). For read requests, readv/writev creates a
> + * single AIOReq containing the NBD header. For write requests, one or
> + * more AIOReqs are created, containing the NBD header and the write data.
> + * These are pushed to reqs_to_send_head in the BDRVNBDState and to the
> + * list in the NBDAIOCB.
> + *
> + * Each time aio_write_request is called by qemu, it takes the first
> + * AIOReq in reqs_to_send_head and writes its data to the socket. If this
> + * results in the whole AIOReq being written to the socket, the AIOReq is
> + * moved to reqs_for_reply_head in the BDRVNBDState. If the AIOReq isn't
> + * finished, it is left where it is, so that more of it can be written on
> + * the next callback.
> + *
> + * If there's an unrecoverable error writing to the socket, we disconnect
> + * and complete the entire acb with -EIO.
> + *
> + * On each aio_read_response, we check the BDRVNBDState's current_req
> + * attribute to see if we're in the middle of a read. If not, we read a
> + * header's worth of data, then try to find a matching AIOReq in
> + * reqs_for_reply_head. If we don't find one, we're in a weird error
> + * state.
> + *
> + * Once we have our AIOReq, we remove it from reqs_for_reply_head, put it
> + * in the current_req attribute, then read from the socket into the
> + * buffer (if needed). If that completes the AIOReq, we clear the
> + * current_req attribute and deallocate the AIOReq. If the completed
> + * AIOReq is the last one for the NBDAIOCB, we call the 'done' callback
> + * and return. If the AIOReq isn't complete, we just return; it will be
> + * completed in future callbacks, since it's now the current_req.
> + *
> + * If there's an unrecoverable error reading from the socket, [...]
Something's missing here? ;-)
> + */
> +
> +typedef struct NBDAIOCB NBDAIOCB;
> +typedef struct BDRVNBDState BDRVNBDState;
> +
> +static int nbd_establish_connection(BDRVNBDState *s);
> +static void nbd_teardown_connection(BDRVNBDState *s);
> +
> +typedef struct AIOReq {
> + NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */
> +
> + /* Where on the NBDAIOCB's iov does this request start? */
> + off_t iov_offset;
> +
> + /* The NBD request header pertaining to this AIOReq.
> + * This specifies the handle of the request, the read offset and length.
> + */
> + NBDRequest nbd_req_hdr;
> +
> + /* How many bytes have been written to the NBD server so far. This will
> + * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len
> + */
> + size_t bytes_sent;
> +
> + /* How many bytes have been read from the NBD server so far. Varies
> + * between 0 and sizeof(nbd_rsp_hdr) + nbd_req_hdr.len
> + */
> + size_t bytes_got;
> +
> + /* Used to record this in the state object. waiting_sent is used to work
> + * out which queue the AIOReq is in. Before it's been sent, it's in the
> + * reqs_to_send_head. After being sent, if it's not current_req, it's in
> + * reqs_for_reply_head.
> + */
> + QTAILQ_ENTRY(AIOReq) socket_siblings;
> + bool waiting_sent;
> +
> + /* Used to enter this into an NBDAIOCB */
> + QLIST_ENTRY(AIOReq) aioreq_siblings;
> +} AIOReq;
> +
> +struct BDRVNBDState {
> + /* File descriptor for the socket to the NBD server */
> int sock;
> +
> + /* Size of the file being served */
> off_t size;
> +
> + /* block size */
> size_t blocksize;
> - char *export_name; /* An NBD server may export several devices */
>
> /* If it begins with '/', this is a UNIX domain socket. Otherwise,
> * it's a string of the form <hostname|ip4|\[ip6\]>:port
> */
> char *host_spec;
> -} BDRVNBDState;
> +
> + /* An NBD server may export several devices - this is the one we want */
> + char *export_name;
> +
> + /* Used to generate unique NBD handles */
> + uint64_t aioreq_seq_num;
> +
> + /* AIOReqs yet to be transmitted */
> + QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head;
> +
> + /* AIOReqs that have been transmitted and are awaiting a reply */
> + QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head;
> +
> + /* AIOReq that is currently being read from the socket */
> + AIOReq *current_req;
> +
> + /* Used in aio_read_response. We may need to store received header bytes
> + * between reads - we don't have an AIOReq at that point.
> + */
> + uint8_t nbd_rsp_buf[sizeof(NBDReply)];
> + size_t nbd_rsp_offset;
> +
> +};
> +
> +enum AIOCBState {
> + AIOCB_WRITE_UDATA,
> + AIOCB_READ_UDATA,
> +};
> +
> +struct NBDAIOCB {
> + BlockDriverAIOCB common;
> + QEMUIOVector *qiov;
> + QEMUBH *bh;
> +
> + enum AIOCBState aiocb_type;
> +
> + int64_t sector_num;
> + int nb_sectors;
> + int ret;
> +
> + bool canceled;
> +
> + void (*aio_done_func)(NBDAIOCB *);
> +
> + QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
> +};
>
> static int nbd_config(BDRVNBDState *s, const char *filename, int flags)
> {
> @@ -103,9 +239,307 @@ out:
> return err;
> }
>
> -static int nbd_establish_connection(BlockDriverState *bs)
> +static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
> + size_t data_len,
> + off_t offset,
> + off_t iov_offset)
> +{
> + AIOReq *aio_req;
> +
> + aio_req = qemu_malloc(sizeof(*aio_req));
> + aio_req->aiocb = acb;
> + aio_req->iov_offset = iov_offset;
> + aio_req->nbd_req_hdr.from = offset;
> + aio_req->nbd_req_hdr.len = data_len;
> + aio_req->nbd_req_hdr.handle = s->aioreq_seq_num++;
> +
> + if(acb->aiocb_type == AIOCB_READ_UDATA) {
> + aio_req->nbd_req_hdr.type = NBD_CMD_READ;
> + } else {
> + aio_req->nbd_req_hdr.type = NBD_CMD_WRITE;
> + }
> +
> + aio_req->bytes_sent = 0;
> + aio_req->bytes_got = 0;
> +
> + QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, aio_req, socket_siblings);
> + aio_req->waiting_sent = true;
> + QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
> + return aio_req;
> +}
> +
> +static int nbd_aio_flush_request(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> +
> + return !(QTAILQ_EMPTY(&s->reqs_to_send_head) &&
> + QTAILQ_EMPTY(&s->reqs_for_reply_head) &&
> + (s->current_req == NULL));
> +}
> +
> +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
> +{
> + NBDAIOCB *acb = aio_req->aiocb;
> + QLIST_REMOVE(aio_req, aioreq_siblings);
> + qemu_free(aio_req);
> +
> + return !QLIST_EMPTY(&acb->aioreq_head);
> +}
> +
> +static void nbd_finish_aiocb(NBDAIOCB *acb)
> +{
> + acb->common.cb(acb->common.opaque, acb->ret);
> + qemu_aio_release(acb);
> +}
> +
> +static void nbd_handle_io_err(BDRVNBDState *s, AIOReq *aio_req, int err)
> +{
> + NBDAIOCB *acb;
> + AIOReq *a;
> +
> + /* These are fine - no need to do anything */
> + if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
> + return;
> + }
> +
> + /* These errors mean the request failed. So we need to trash the acb
> + * (and all associated AIOReqs) and return -EIO. Partial reads are
> + * fine. Partial writes aren't great, but no worse than (say) a write
> + * to a physical disc that hits a bad sector.
> + */
> + if (aio_req == NULL) {
> + logout("Error %i (%s) on NBD I/O. Killing NBD\n", err,
> + strerror(err));
> + } else {
> + acb = aio_req->aiocb;
> + logout("Error %i (%s) on NBD request (handle %lu). Killing NBD\n",
> + err, strerror(err), aio_req->nbd_req_hdr.handle);
> + acb->ret = -EIO;
Why don't we use the real error code?
> + QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) {
> + if (aio_req->waiting_sent) {
> + QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req,
> + socket_siblings);
> + } else {
> + if (s->current_req == aio_req) {
> + s->current_req = NULL;
> + } else {
> + QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req,
> + socket_siblings);
> + }
> + }
I think this logic should be part of free_aio_req.
> + free_aio_req(s, a);
> + }
> + nbd_finish_aiocb(acb);
> + }
> +
> + nbd_teardown_connection(s);
And now there's no way to get the disk back to life without a reboot? Do
I understand correctly that now trying to access the disk will always
return -EBADF?
> + return;
Unnecessary return;
> +}
> +
> +static void nbd_aio_write_request(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> + AIOReq *aio_req = NULL;
> + NBDAIOCB *acb;
> + size_t total;
> + ssize_t ret;
> +
> + if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
> + return;
> + }
> +
> + aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
> + acb = aio_req->aiocb;
> +
> + if(acb->aiocb_type == AIOCB_WRITE_UDATA) {
Space after if is missing
> + total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len;
> + } else {
> + total = sizeof(NBDRequest);
> + }
> +
> + /* Since we've not written (all of) the header yet, get on with it.
> + * We always grab the *head* of the queue in this callback, so we
> + * won't interleave writes to the socket.
> + *
> + * Creating the header buffer on the fly isn't ideal in the case of many
> + * retries, but almost all the time, this will happen exactly once.
> + */
> + if (aio_req->bytes_sent < sizeof(NBDRequest)) {
> + logout("Buffer not written in full, doing so\n");
> + uint8_t buf[sizeof(NBDRequest)];
> + QEMUIOVector hdr;
> + nbd_request_to_buf(&aio_req->nbd_req_hdr, buf);
> + qemu_iovec_init(&hdr, 1);
> + qemu_iovec_add(&hdr, &buf, sizeof(NBDRequest));
> + ret = writev(s->sock, hdr.iov, hdr.niov);
> + qemu_iovec_destroy(&hdr);
> +
> + if (ret == -1) {
> + nbd_handle_io_err(s, aio_req, socket_error());
> + return;
> + } else {
> + logout("Written %zu bytes to socket (request is %zu bytes)\n",
> + ret, sizeof(NBDRequest));
> + aio_req->bytes_sent += ret;
> + }
> + }
> +
> + /* If the header is sent & we're doing a write request, send data */
> + if (acb->aiocb_type == AIOCB_WRITE_UDATA &&
> + aio_req->bytes_sent >= sizeof(NBDRequest) &&
> + aio_req->bytes_sent < total) {
> + logout("Write request - putting data in socket\n");
> + off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) +
> + aio_req->iov_offset;
> +
> + ret = nbd_wr_aio(s->sock, acb->qiov, total - aio_req->bytes_sent,
> + offset, false);
> +
> + if (ret < 0 ) {
I guess this is the space that was missing above. :-)
> + nbd_handle_io_err(s, aio_req, -ret);
> + return;
> + } else {
> + logout("Written %zu bytes to socket\n", ret);
> + aio_req->bytes_sent += ret;
> + }
> + }
> +
> + /* Request written. aio_read_response gets the reply */
> + if (aio_req->bytes_sent == total) {
> + logout("aio_req written to socket, moving to reqs_for_reply\n");
> + aio_req->waiting_sent = false;
> + QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
> + QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings);
> + }
> +
> + return;
> +}
> +
> +static void nbd_aio_read_response(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> + AIOReq *aio_req = NULL;
> + NBDAIOCB *acb;
> + NBDReply rsp;
> +
> + size_t total;
> + ssize_t ret;
> + int rest;
> +
> + if (s->current_req == NULL && QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
> + return;
> + }
> +
> + /* Build our nbd_reply object if we've got it */
> + if (s->current_req && (s->nbd_rsp_offset == sizeof(NBDReply))) {
> + nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp);
> + }
> +
> + if (s->current_req == NULL) {
> + /* Try to read a header */
Factor this whole block out in its own function?
> + QEMUIOVector hdr;
> + qemu_iovec_init(&hdr, 1);
> + qemu_iovec_add(&hdr, ((&s->nbd_rsp_buf) + s->nbd_rsp_offset),
> + (sizeof(NBDReply) - s->nbd_rsp_offset));
> + ret = readv(s->sock, hdr.iov, hdr.niov);
> + qemu_iovec_destroy(&hdr);
> +
> + if (ret == -1) {
> + nbd_handle_io_err(s, aio_req, socket_error());
> + return;
> + }
> +
> + s->nbd_rsp_offset += ret;
> +
> + if (s->nbd_rsp_offset == sizeof(NBDReply)) {
> + /* Turn data into NBDReply, find the matching aio_req */
> + nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp);
> +
> + /* Check the magic */
> + if (rsp.magic != NBD_REPLY_MAGIC) {
> + logout("Received invalid NBD response magic!\n");
> + nbd_handle_io_err(s, NULL, EIO);
> + return;
> + }
> +
> + QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head,
> + socket_siblings) {
> + if (aio_req->nbd_req_hdr.handle == rsp.handle) {
> + s->current_req = aio_req;
> + s->current_req->bytes_got = sizeof(NBDReply);
> + break;
> + }
> + }
> +
> + if (!s->current_req) {
> + logout("cannot find aio_req for handle %lu\n", rsp.handle);
> + nbd_handle_io_err(s, NULL, EIO);
> + return;
> + }
> + QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
> + } else {
> + /* We haven't finished reading the entire header yet. */
> + return;
> + }
> + }
> +
> + /* s->current_req and rsp are both usable now */
> + aio_req = s->current_req;
> + acb = aio_req->aiocb;
> +
> + total = sizeof(NBDReply);
> + if (acb->aiocb_type == AIOCB_READ_UDATA) {
> + total += aio_req->nbd_req_hdr.len;
> + }
> +
> + if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) {
> + off_t offset = (aio_req->bytes_got - sizeof(NBDReply)) +
> + aio_req->iov_offset;
> + QEMUIOVector *qiov = acb->qiov;
> + uint8_t *buf = NULL;
> +
> + if (acb->canceled) {
> + buf = qemu_malloc(total - offset);
> + qemu_iovec_init(qiov, 1);
> + qemu_iovec_add(qiov, buf, total - offset);
> + }
> +
> + ret = nbd_wr_aio(s->sock, qiov, total - offset, offset, true);
> +
> + if (acb->canceled) {
> + qemu_iovec_destroy(qiov);
> + qemu_free(buf);
> + }
> +
> + if (ret < 0) {
> + nbd_handle_io_err(s, aio_req, -ret);
> + return;
> + } else {
> + aio_req->bytes_got += ret;
> + }
> + }
> +
> + /* Entire request has been read */
> + if (total == aio_req->bytes_got) {
> + logout("Read all bytes of the response; removing response from "
> + "s->current_req\n");
> + s->nbd_rsp_offset = 0;
> + s->current_req = NULL;
> + if (rsp.error != 0) {
> + acb->ret = -EIO;
> + logout("NBD request resulted in error %i\n", rsp.error);
> + }
> +
> + /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */
> + rest = free_aio_req(s, aio_req);
> + if (!rest) {
> + logout("acb complete\n");
> + acb->aio_done_func(acb);
> + }
> + }
> +
> + logout("Leaving nbd_aio_read_response\n");
> + return;
Another useless return.
> +}
> +
> +static int nbd_establish_connection(BDRVNBDState *s)
> {
> - BDRVNBDState *s = bs->opaque;
> int sock;
> int ret;
> off_t size;
> @@ -139,23 +573,29 @@ static int nbd_establish_connection(BlockDriverState *bs)
> s->sock = sock;
> s->size = size;
> s->blocksize = blocksize;
> + s->nbd_rsp_offset = 0;
> +
> + qemu_aio_set_fd_handler(sock, nbd_aio_read_response,
> + nbd_aio_write_request,
> + nbd_aio_flush_request, NULL, s);
>
> logout("Established connection with NBD server\n");
> return 0;
> }
>
> -static void nbd_teardown_connection(BlockDriverState *bs)
> +static void nbd_teardown_connection(BDRVNBDState *s)
> {
> - BDRVNBDState *s = bs->opaque;
> struct nbd_request request;
>
> request.type = NBD_CMD_DISC;
> - request.handle = (uint64_t)(intptr_t)bs;
> + request.handle = s->aioreq_seq_num++;
> request.from = 0;
> request.len = 0;
> nbd_send_request(s->sock, &request);
>
> + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
> closesocket(s->sock);
> + logout("Connection to NBD server closed\n");
> + return;
> }
>
> static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> @@ -169,95 +609,193 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> return result;
> }
>
> + QTAILQ_INIT(&s->reqs_to_send_head);
> + QTAILQ_INIT(&s->reqs_for_reply_head);
> +
> + s->current_req = NULL;
> + s->aioreq_seq_num = 0;
> + s->nbd_rsp_offset = 0;
> +
> /* establish TCP connection, return error if it fails
> * TODO: Configurable retry-until-timeout behaviour.
> */
> - result = nbd_establish_connection(bs);
> + result = nbd_establish_connection(s);
>
> return result;
> }
>
> -static int nbd_read(BlockDriverState *bs, int64_t sector_num,
> - uint8_t *buf, int nb_sectors)
> +static void nbd_close(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> - struct nbd_request request;
> - struct nbd_reply reply;
>
> - request.type = NBD_CMD_READ;
> - request.handle = (uint64_t)(intptr_t)bs;
> - request.from = sector_num * 512;;
> - request.len = nb_sectors * 512;
> + nbd_teardown_connection(s);
> + qemu_free(s->export_name);
> + qemu_free(s->host_spec);
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> + return;
> +}
>
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> +/* We remove all the aiocbs currently sat in reqs_to_send_head (except the
> + * first, if any bytes have been transmitted), so we don't need to check
> + * for canceled in aio_write_request at all. If that finishes the acb, we
> + * call its completion function. Otherwise, we leave it alone.
> + *
> + * In nbd_aio_read_response, when we're handling a read request for an acb
> + * with canceled = true, we allocate a QEMUIOVector of the appropriate size
> + * to do the read, and throw the bytes away. Everything else goes on as
> + * normal.
> + */
> +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb) {
> + NBDAIOCB *acb = (NBDAIOCB *)blockacb;
> + BDRVNBDState *s = acb->common.bs->opaque;
> + AIOReq *a;
> +
> + QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) {
> + if(a->waiting_sent && a->bytes_sent == 0) {
> + QTAILQ_REMOVE(&s->reqs_to_send_head, a, socket_siblings);
> + free_aio_req(s, a);
> + }
> + }
> +
> + acb->canceled = true;
> + acb->ret = -EIO;
> +
> + if QLIST_EMPTY(&acb->aioreq_head) {
> + nbd_finish_aiocb(acb);
> + }
> +}
> +
> +static AIOPool nbd_aio_pool = {
> + .aiocb_size = sizeof(NBDAIOCB),
> + .cancel = nbd_aio_cancel,
> +};
> +
> +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
> + int64_t sector_num, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + NBDAIOCB *acb;
>
> - if (reply.error !=0)
> - return -reply.error;
> + acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
>
> - if (reply.handle != request.handle)
> + acb->qiov = qiov;
> + acb->sector_num = sector_num;
> + acb->nb_sectors = nb_sectors;
> +
> + acb->canceled = false;
> +
> + acb->aio_done_func = NULL;
> + acb->bh = NULL;
> + acb->ret = 0;
> +
> + QLIST_INIT(&acb->aioreq_head);
> + return acb;
> +}
> +
> +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
> +{
> + if (acb->bh) {
> + logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> return -EIO;
> + }
>
> - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
> + acb->bh = qemu_bh_new(cb, acb);
> + if (!acb->bh) {
> + logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> return -EIO;
> + }
> +
> + qemu_bh_schedule(acb->bh);
>
> return 0;
> }
>
> -static int nbd_write(BlockDriverState *bs, int64_t sector_num,
> - const uint8_t *buf, int nb_sectors)
> +static void nbd_readv_writev_bh_cb(void *p)
> {
> - BDRVNBDState *s = bs->opaque;
> - struct nbd_request request;
> - struct nbd_reply reply;
> + NBDAIOCB *acb = p;
>
> - request.type = NBD_CMD_WRITE;
> - request.handle = (uint64_t)(intptr_t)bs;
> - request.from = sector_num * 512;;
> - request.len = nb_sectors * 512;
> + size_t len, done = 0;
> + size_t total = acb->nb_sectors * SECTOR_SIZE;
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> + /* Where the read/write starts from */
> + off_t offset = acb->sector_num * SECTOR_SIZE;
> + BDRVNBDState *s = acb->common.bs->opaque;
>
> - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
> - return -EIO;
> + AIOReq *aio_req;
> +
> + logout("Entering nbd_readv_writev_bh_cb\n");
>
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> + qemu_bh_delete(acb->bh);
> + acb->bh = NULL;
>
> - if (reply.error !=0)
> - return -reply.error;
> + while (done < total) {
> + len = (total - done);
>
> - if (reply.handle != request.handle)
> - return -EIO;
> + /* Split read & write requests into segments if needed */
> + if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) {
> + len = NBD_MAX_READ;
> + }
>
> - return 0;
> + if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) {
> + len = NBD_MAX_WRITE;
> + }
> +
> + logout("Allocating an aio_req of %zu bytes\n", len);
> + aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done);
> +
> + done += len;
> + }
> +
> + if (QLIST_EMPTY(&acb->aioreq_head)) {
> + logout("acb->ioreq_head empty, so finishing acb now\n");
> + nbd_finish_aiocb(acb);
> + }
> + logout("Leaving nbd_readv_writev_bh_cb\n");
> + return;
> }
>
> -static void nbd_close(BlockDriverState *bs)
> +static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + NBDAIOCB *acb;
> +
> + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> + acb->aiocb_type = AIOCB_READ_UDATA;
> + acb->aio_done_func = nbd_finish_aiocb;
> +
> + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> + return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> {
> - nbd_teardown_connection(bs);
> + NBDAIOCB *acb;
> +
> + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> + acb->aiocb_type = AIOCB_WRITE_UDATA;
> + acb->aio_done_func = nbd_finish_aiocb;
> +
> + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> + return &acb->common;
> }
>
> static int64_t nbd_getlength(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> -
> return s->size;
> }
>
> static BlockDriver bdrv_nbd = {
> - .format_name = "nbd",
> - .instance_size = sizeof(BDRVNBDState),
> - .bdrv_file_open = nbd_open,
> - .bdrv_read = nbd_read,
> - .bdrv_write = nbd_write,
> - .bdrv_close = nbd_close,
> - .bdrv_getlength = nbd_getlength,
> - .protocol_name = "nbd",
> + .format_name = "nbd",
> + .instance_size = sizeof(BDRVNBDState),
> + .bdrv_file_open = nbd_open,
> + .bdrv_aio_readv = nbd_aio_readv,
> + .bdrv_aio_writev = nbd_aio_writev,
> + .bdrv_close = nbd_close,
> + .bdrv_getlength = nbd_getlength,
> + .protocol_name = "nbd"
> };
>
> static void bdrv_nbd_init(void)
> diff --git a/nbd.c b/nbd.c
> index a3e6d52..26d33c5 100644
> --- a/nbd.c
> +++ b/nbd.c
> @@ -31,7 +31,7 @@
>
> #include "qemu_socket.h"
>
> -//#define DEBUG_NBD
> +#define DEBUG_NBD
>
> #ifdef DEBUG_NBD
> #define TRACE(msg, ...) do { \
> @@ -49,10 +49,6 @@
>
> /* This is all part of the "official" NBD API */
>
> -#define NBD_REPLY_SIZE (4 + 4 + 8)
> -#define NBD_REQUEST_MAGIC 0x25609513
> -#define NBD_REPLY_MAGIC 0x67446698
> -
> #define NBD_SET_SOCK _IO(0xab, 0)
> #define NBD_SET_BLKSIZE _IO(0xab, 1)
> #define NBD_SET_SIZE _IO(0xab, 2)
> @@ -107,6 +103,30 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size,
> bool do_read)
> return offset;
> }
>
> +ssize_t nbd_wr_aio(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
> + bool do_read)
Isn't this name misleading? The function is completely synchronous. It
just happens not to block because it's only called when the socket is ready.
Kevin