qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v6 1/3] linux-aio: fix submit aio as a batch


From: Kevin Wolf
Subject: Re: [Qemu-devel] [PATCH v6 1/3] linux-aio: fix submit aio as a batch
Date: Wed, 26 Nov 2014 12:18:39 +0100
User-agent: Mutt/1.5.21 (2010-09-15)

Am 25.11.2014 um 08:23 hat Ming Lei geschrieben:
> In the submit path, we can't complete request directly,
> otherwise "Co-routine re-entered recursively" may be caused,
> so this patch fixes the issue with below ideas:
> 
>       - for -EAGAIN or partial completion, retry the submission
>       in following completion cb which is run in BH context
>       - for part of completion, update the io queue too
>       - for case of io queue full, submit queued requests
>       immediately and return failure to caller
>       - for other failure, abort all queued requests in BH
>     context, and requests won't be allowed to submit until
>     aborting is handled
> 
> Reviewed-by: Paolo Bonzini <address@hidden>
> Signed-off-by: Ming Lei <address@hidden>

This looks like a quite complex fix to this problem, and it introduces
new error cases, too (while aborting, requests fail now, but they really
should just be waiting).

I'm wondering if this is the time to convert the linux-aio interface to
coroutines finally. It wouldn't only be a performance optimisation, but
would potentially also simplify this code.

>  block/linux-aio.c |  114 
> ++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 95 insertions(+), 19 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index d92513b..11ac828 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -38,11 +38,20 @@ struct qemu_laiocb {
>      QLIST_ENTRY(qemu_laiocb) node;
>  };
>  
> +/*
> + * TODO: support to batch I/O from multiple bs in one same
> + * AIO context, one important use case is multi-lun scsi,
> + * so in future the IO queue should be per AIO context.
> + */
>  typedef struct {
>      struct iocb *iocbs[MAX_QUEUED_IO];
>      int plugged;
>      unsigned int size;
>      unsigned int idx;
> +
> +    /* abort queued requests in BH context */
> +    QEMUBH *abort_bh;
> +    bool  aborting;

Two spaces.

>  } LaioQueue;
>  
>  struct qemu_laio_state {
> @@ -59,6 +68,8 @@ struct qemu_laio_state {
>      int event_max;
>  };
>  
> +static int ioq_submit(struct qemu_laio_state *s);
> +
>  static inline ssize_t io_event_ret(struct io_event *ev)
>  {
>      return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
> @@ -91,6 +102,13 @@ static void qemu_laio_process_completion(struct 
> qemu_laio_state *s,
>      qemu_aio_unref(laiocb);
>  }
>  
> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
> +{
> +    if (s->io_q.idx) {
> +        ioq_submit(s);
> +    }
> +}
> +
>  /* The completion BH fetches completed I/O requests and invokes their
>   * callbacks.
>   *
> @@ -135,6 +153,8 @@ static void qemu_laio_completion_bh(void *opaque)
>  
>          qemu_laio_process_completion(s, laiocb);
>      }
> +
> +    qemu_laio_start_retry(s);
>  }

Why is qemu_laio_start_retry() a separate function? This is the only
caller.

>  static void qemu_laio_completion_cb(EventNotifier *e)
> @@ -175,47 +195,99 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->size = MAX_QUEUED_IO;
>      io_q->idx = 0;
>      io_q->plugged = 0;
> +    io_q->aborting = false;
>  }
>  
> +/* Always return >= 0 and it means how many requests are submitted */
>  static int ioq_submit(struct qemu_laio_state *s)
>  {
> -    int ret, i = 0;
> +    int ret;
>      int len = s->io_q.idx;
>  
> -    do {
> -        ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -    } while (i++ < 3 && ret == -EAGAIN);
> -
> -    /* empty io queue */
> -    s->io_q.idx = 0;
> +    if (!len) {
> +        return 0;
> +    }
>  
> +    ret = io_submit(s->ctx, len, s->io_q.iocbs);
>      if (ret < 0) {
> -        i = 0;
> -    } else {
> -        i = ret;
> +        /* retry in following completion cb */
> +        if (ret == -EAGAIN) {
> +            return 0;
> +        }
> +
> +        /*
> +         * Abort in BH context for avoiding Co-routine re-entered,
> +         * and update io queue at that time
> +         */
> +        qemu_bh_schedule(s->io_q.abort_bh);
> +        s->io_q.aborting = true;
> +        ret = 0;
>      }
>  
> -    for (; i < len; i++) {
> -        struct qemu_laiocb *laiocb =
> -            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);

> +    /*
> +     * update io queue, and retry will be started automatically
> +     * in following completion cb for the remainder
> +     */
> +    if (ret > 0) {
> +        if (ret < len) {
> +            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
> +                    (len - ret) * sizeof(struct iocb *));
> +        }
> +        s->io_q.idx -= ret;
> +    }

Support for partly handled queues is nice, but a logically separate
change. Please move this to its own patch.

> -        laiocb->ret = (ret < 0) ? ret : -EIO;
> +    return ret;
> +}
> +
> +static void ioq_abort_bh(void *opaque)
> +{
> +    struct qemu_laio_state *s = opaque;
> +    int i;
> +
> +    for (i = 0; i < s->io_q.idx; i++) {
> +        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +                                                  struct qemu_laiocb,
> +                                                  iocb);
> +        laiocb->ret = -EIO;

We shouldn't be throwing the real error code away.

>          qemu_laio_process_completion(s, laiocb);
>      }
> -    return ret;
> +
> +    s->io_q.idx = 0;
> +    s->io_q.aborting = false;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
>  
> +    /* Request can't be allowed to submit until aborting is handled */
> +    if (unlikely(s->io_q.aborting)) {
> +        return -EIO;
> +    }
> +
> +    if (unlikely(idx == s->io_q.size)) {
> +        ioq_submit(s);
> +
> +        if (unlikely(s->io_q.aborting)) {
> +            return -EIO;
> +        }
> +        idx = s->io_q.idx;
> +    }
> +
> +    /* It has to return now if queue is still full */
> +    if (unlikely(idx == s->io_q.size)) {
> +        return -EAGAIN;
> +    }
> +
>      s->io_q.iocbs[idx++] = iocb;
>      s->io_q.idx = idx;
>  
> -    /* submit immediately if queue is full */
> -    if (idx == s->io_q.size) {
> +    /* submit immediately if queue depth is above 2/3 */
> +    if (idx > s->io_q.size * 2 / 3) {
>          ioq_submit(s);
>      }

This change looks independent as well. Also isn't mentioned in the
commit message. Why is it a good idea to use only 2/3 of the queue
instead of making use of its full length?

> +    return 0;
>  }

Kevin



reply via email to

[Prev in Thread] Current Thread [Next in Thread]