[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 09/15] linux-aio: fix submit aio as a batch
From: |
Paolo Bonzini |
Subject: |
Re: [Qemu-devel] [PATCH 09/15] linux-aio: fix submit aio as a batch |
Date: |
Wed, 30 Jul 2014 15:59:24 +0200 |
User-agent: |
Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20100101 Thunderbird/24.6.0 |
Il 30/07/2014 13:39, Ming Lei ha scritto:
> In the enqueue path, we can't complete request, otherwise
> "Co-routine re-entered recursively" may be caused, so this
> patch fixes the issue with below ideas:
>
> - for -EAGAIN or partial completion, retry the submission by
> an introduced event handler
> - for part of completion, also update the io queue
> - for other failure, return the failure if in enqueue path,
> otherwise, abort all queued I/O
>
> Signed-off-by: Ming Lei <address@hidden>
> ---
> block/linux-aio.c | 90
> ++++++++++++++++++++++++++++++++++++++++-------------
> 1 file changed, 68 insertions(+), 22 deletions(-)
>
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index 7ac7e8c..5eb9c92 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -51,6 +51,7 @@ struct qemu_laio_state {
>
> /* io queue for submit at batch */
> LaioQueue io_q;
> + EventNotifier retry; /* handle -EAGAIN and partial completion */
> };
>
> static inline ssize_t io_event_ret(struct io_event *ev)
> @@ -154,45 +155,80 @@ static void ioq_init(LaioQueue *io_q)
> io_q->plugged = 0;
> }
>
> -static int ioq_submit(struct qemu_laio_state *s)
> +static void abort_queue(struct qemu_laio_state *s)
> +{
> + int i;
> + for (i = 0; i < s->io_q.idx; i++) {
> + struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> + struct qemu_laiocb,
> + iocb);
> + laiocb->ret = -EIO;
> + qemu_laio_process_completion(s, laiocb);
> + }
> +}
> +
> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
> {
> int ret, i = 0;
> int len = s->io_q.idx;
> + int j = 0;
>
> - do {
> - ret = io_submit(s->ctx, len, s->io_q.iocbs);
> - } while (i++ < 3 && ret == -EAGAIN);
> + if (!len) {
> + return 0;
> + }
>
> - /* empty io queue */
> - s->io_q.idx = 0;
> + ret = io_submit(s->ctx, len, s->io_q.iocbs);
> + if (ret == -EAGAIN) {
> + event_notifier_set(&s->retry);
Retrying immediately (and just doing a couple of system calls to waste
time) is not an improvement. The right place to retry is in
qemu_laio_completion_cb, after io_getevents has been called and
presumably the queue depth has decreased.
If !s->io_q.plugged but io_submit fails you can call ioq_enqueue and it
will just work. Then you can only go to out_free_aiocb if the queue is
full (independent of the "plug" state).
Paolo
> + return 0;
> + } else if (ret < 0) {
> + if (enqueue) {
> + return ret;
> + }
>
> - if (ret < 0) {
> - i = 0;
> - } else {
> - i = ret;
> + /* in non-queue path, all IOs have to be completed */
> + abort_queue(s);
> + ret = len;
> + } else if (ret == 0) {
> + goto out;
> }
>
> - for (; i < len; i++) {
> - struct qemu_laiocb *laiocb =
> - container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
> -
> - laiocb->ret = (ret < 0) ? ret : -EIO;
> - qemu_laio_process_completion(s, laiocb);
> + for (i = ret; i < len; i++) {
> + s->io_q.iocbs[j++] = s->io_q.iocbs[i];
> }
> +
> + out:
> + /* update io queue */
> + s->io_q.idx -= ret;
> +
> return ret;
> }
>
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static void ioq_submit_retry(EventNotifier *e)
> +{
> + struct qemu_laio_state *s = container_of(e, struct qemu_laio_state,
> retry);
> +
> + event_notifier_test_and_clear(e);
> + ioq_submit(s, false);
> +}
> +
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> {
> unsigned int idx = s->io_q.idx;
>
> + if (unlikely(idx == s->io_q.size)) {
> + return -1;
> + }
> +
> s->io_q.iocbs[idx++] = iocb;
> s->io_q.idx = idx;
>
> - /* submit immediately if queue is full */
> - if (idx == s->io_q.size) {
> - ioq_submit(s);
> + /* submit immediately if queue depth is above 2/3 */
> + if (idx > s->io_q.size * 2 / 3) {
> + return ioq_submit(s, true);
> }
> +
> + return 0;
> }
>
> void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
> @@ -214,7 +250,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx,
> bool unplug)
> }
>
> if (s->io_q.idx > 0) {
> - ret = ioq_submit(s);
> + ret = ioq_submit(s, false);
> }
>
> return ret;
> @@ -258,7 +294,9 @@ BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void
> *aio_ctx, int fd,
> goto out_free_aiocb;
> }
> } else {
> - ioq_enqueue(s, iocbs);
> + if (ioq_enqueue(s, iocbs) < 0) {
> + goto out_free_aiocb;
> + }
> }
> return &laiocb->common;
>
> @@ -272,6 +310,7 @@ void laio_detach_aio_context(void *s_, AioContext
> *old_context)
> struct qemu_laio_state *s = s_;
>
> aio_set_event_notifier(old_context, &s->e, NULL);
> + aio_set_event_notifier(old_context, &s->retry, NULL);
> }
>
> void laio_attach_aio_context(void *s_, AioContext *new_context)
> @@ -279,6 +318,7 @@ void laio_attach_aio_context(void *s_, AioContext
> *new_context)
> struct qemu_laio_state *s = s_;
>
> aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
> + aio_set_event_notifier(new_context, &s->retry, ioq_submit_retry);
> }
>
> void *laio_init(void)
> @@ -295,9 +335,14 @@ void *laio_init(void)
> }
>
> ioq_init(&s->io_q);
> + if (event_notifier_init(&s->retry, false) < 0) {
> + goto out_notifer_init;
> + }
>
> return s;
>
> +out_notifer_init:
> + io_destroy(s->ctx);
> out_close_efd:
> event_notifier_cleanup(&s->e);
> out_free_state:
> @@ -310,6 +355,7 @@ void laio_cleanup(void *s_)
> struct qemu_laio_state *s = s_;
>
> event_notifier_cleanup(&s->e);
> + event_notifier_cleanup(&s->retry);
>
> if (io_destroy(s->ctx) != 0) {
> fprintf(stderr, "%s: destroy AIO context %p failed\n",
>
[Qemu-devel] [PATCH 10/15] linux-aio: increase max event to 256, Ming Lei, 2014/07/30
[Qemu-devel] [PATCH 11/15] linux-aio: remove 'node' from 'struct qemu_laiocb', Ming Lei, 2014/07/30
[Qemu-devel] [PATCH 12/15] hw/virtio-pci: introduce num_queues property, Ming Lei, 2014/07/30
[Qemu-devel] [PATCH 13/15] hw/virtio/virtio-blk.h: introduce VIRTIO_BLK_F_MQ, Ming Lei, 2014/07/30
[Qemu-devel] [PATCH 14/15] hw/block/virtio-blk: create num_queues vqs if dataplane is enabled, Ming Lei, 2014/07/30