qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH 5/7] block/rbd: migrate from aio to coroutines


From: Jason Dillaman
Subject: Re: [PATCH 5/7] block/rbd: migrate from aio to coroutines
Date: Thu, 14 Jan 2021 14:19:10 -0500

On Sun, Dec 27, 2020 at 11:42 AM Peter Lieven <pl@kamp.de> wrote:
>
> Signed-off-by: Peter Lieven <pl@kamp.de>
> ---
>  block/rbd.c | 247 ++++++++++++++++++----------------------------------
>  1 file changed, 84 insertions(+), 163 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 27b232f4d8..2d77d0007f 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -66,22 +66,6 @@ typedef enum {
>      RBD_AIO_FLUSH
>  } RBDAIOCmd;
>
> -typedef struct RBDAIOCB {
> -    BlockAIOCB common;
> -    int64_t ret;
> -    QEMUIOVector *qiov;
> -    RBDAIOCmd cmd;
> -    int error;
> -    struct BDRVRBDState *s;
> -} RBDAIOCB;
> -
> -typedef struct RADOSCB {
> -    RBDAIOCB *acb;
> -    struct BDRVRBDState *s;
> -    int64_t size;
> -    int64_t ret;
> -} RADOSCB;
> -
>  typedef struct BDRVRBDState {
>      rados_t cluster;
>      rados_ioctx_t io_ctx;
> @@ -94,6 +78,13 @@ typedef struct BDRVRBDState {
>      AioContext *aio_context;
>  } BDRVRBDState;
>
> +typedef struct RBDTask {
> +    BDRVRBDState *s;
> +    Coroutine *co;
> +    bool complete;
> +    int64_t ret;
> +} RBDTask;
> +
>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>                              BlockdevOptionsRbd *opts, bool cache,
>                              const char *keypairs, const char *secretid,
> @@ -316,13 +307,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const 
> char *keypairs_json,
>      return ret;
>  }
>
> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> -{
> -    RBDAIOCB *acb = rcb->acb;
> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> -               acb->qiov->size - offs);
> -}
> -
>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>                                const char *keypairs, const char 
> *password_secret,
> @@ -440,46 +424,6 @@ exit:
>      return ret;
>  }
>
> -/*
> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
> - * BH context.
> - */
> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> -{
> -    RBDAIOCB *acb = rcb->acb;
> -    int64_t r;
> -
> -    r = rcb->ret;
> -
> -    if (acb->cmd != RBD_AIO_READ) {
> -        if (r < 0) {
> -            acb->ret = r;
> -            acb->error = 1;
> -        } else if (!acb->error) {
> -            acb->ret = rcb->size;
> -        }
> -    } else {
> -        if (r < 0) {
> -            qemu_rbd_memset(rcb, 0);
> -            acb->ret = r;
> -            acb->error = 1;
> -        } else if (r < rcb->size) {
> -            qemu_rbd_memset(rcb, r);
> -            if (!acb->error) {
> -                acb->ret = rcb->size;
> -            }
> -        } else if (!acb->error) {
> -            acb->ret = r;
> -        }
> -    }
> -
> -    g_free(rcb);
> -
> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> -
> -    qemu_aio_unref(acb);
> -}
> -
>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>  {
>      const char **vals;
> @@ -817,88 +761,49 @@ static int qemu_rbd_resize(BlockDriverState *bs, 
> uint64_t size)
>      return 0;
>  }
>
> -static const AIOCBInfo rbd_aiocb_info = {
> -    .aiocb_size = sizeof(RBDAIOCB),
> -};
> -
> -static void rbd_finish_bh(void *opaque)
> +static void qemu_rbd_finish_bh(void *opaque)
>  {
> -    RADOSCB *rcb = opaque;
> -    qemu_rbd_complete_aio(rcb);
> +    RBDTask *task = opaque;
> +    task->complete = 1;
> +    aio_co_wake(task->co);
>  }
>
> -/*
> - * This is the callback function for rbd_aio_read and _write
> - *
> - * Note: this function is being called from a non qemu thread so
> - * we need to be careful about what we do here. Generally we only
> - * schedule a BH, and do the rest of the io completion handling
> - * from rbd_finish_bh() which runs in a qemu context.
> - */
> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>  {
> -    RBDAIOCB *acb = rcb->acb;
> -
> -    rcb->ret = rbd_aio_get_return_value(c);
> +    task->ret = rbd_aio_get_return_value(c);
>      rbd_aio_release(c);
> -
> -    replay_bh_schedule_oneshot_event(acb->s->aio_context, rbd_finish_bh, 
> rcb);
> +    aio_bh_schedule_oneshot(task->s->aio_context, qemu_rbd_finish_bh, task);
>  }
>
> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> -                                 int64_t off,
> -                                 QEMUIOVector *qiov,
> -                                 int64_t size,
> -                                 BlockCompletionFunc *cb,
> -                                 void *opaque,
> -                                 RBDAIOCmd cmd)
> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
> +                                          uint64_t offset,
> +                                          uint64_t bytes,
> +                                          QEMUIOVector *qiov,
> +                                          int flags,
> +                                          RBDAIOCmd cmd)
>  {
> -    RBDAIOCB *acb;
> -    RADOSCB *rcb = NULL;
> -    rbd_completion_t c;
> -    int r;
> -
>      BDRVRBDState *s = bs->opaque;
> +    RBDTask task = { .s = s, .co = qemu_coroutine_self() };
> +    rbd_completion_t c;
> +    int r, ret = -EIO;
>
> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
> -    acb->cmd = cmd;
> -    acb->qiov = qiov;
> -    assert(!qiov || qiov->size == size);
> -
> -    rcb = g_new(RADOSCB, 1);
> -
> -    acb->ret = 0;
> -    acb->error = 0;
> -    acb->s = s;
> +    assert(!qiov || qiov->size == bytes);
>
> -    rcb->acb = acb;
> -    rcb->s = acb->s;
> -    rcb->size = size;
> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, 
> &c);
> +    r = rbd_aio_create_completion(&task,
> +                                  (rbd_callback_t) qemu_rbd_completion_cb, 
> &c);
>      if (r < 0) {
>          goto failed;
>      }
>
>      switch (cmd) {
> -    case RBD_AIO_WRITE:
> -        /*
> -         * RBD APIs don't allow us to write more than actual size, so in 
> order
> -         * to support growing images, we resize the image before write
> -         * operations that exceed the current size.
> -         */
> -        if (off + size > s->image_size) {
> -            r = qemu_rbd_resize(bs, off + size);

I realize this is inherited code that is being refactored, but is it
really possible for QEMU to issue IOs outside the block extents? This
also has the same issue I raised in my comment on the previous commit:
"image_size" is only initialized at image open time, and therefore this
path will keep getting hit if IOs are issued beyond the original block
device extents.

> -            if (r < 0) {
> -                goto failed_completion;
> -            }
> -        }
> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
> -        break;
>      case RBD_AIO_READ:
> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
> +        break;
> +    case RBD_AIO_WRITE:
> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
>          break;
>      case RBD_AIO_DISCARD:
> -        r = rbd_aio_discard(s->image, off, size, c);
> +        r = rbd_aio_discard(s->image, offset, bytes, c);
>          break;
>      case RBD_AIO_FLUSH:
>          r = rbd_aio_flush(s->image, c);
> @@ -908,44 +813,71 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>      }
>
>      if (r < 0) {
> +        task.complete = 1;
> +        task.ret = r;
> +    }
> +
> +    while (!task.complete) {
> +        qemu_coroutine_yield();
> +    }
> +
> +    if (task.ret < 0) {
> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, 
> offset,
> +                     bytes, flags, task.ret, strerror(-task.ret));
>          goto failed_completion;
>      }
> -    return &acb->common;
> +
> +    /* zero pad short reads */
> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> +    }
> +
> +    return 0;
>
>  failed_completion:
>      rbd_aio_release(c);

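Wasn't this already released in "qemu_rbd_completion_cb"? If the request
was submitted successfully and later completes with task.ret < 0, the
callback has already called rbd_aio_release(c), so jumping to
"failed_completion" would release the completion a second time.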


>  failed:
> -    g_free(rcb);
> +    return ret;
> +}
>
> -    qemu_aio_unref(acb);
> -    return NULL;
> +static int
> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
> +                               uint64_t bytes, QEMUIOVector *qiov,
> +                               int flags)
> +{
> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
>  }
>
> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
> -                                       uint64_t offset, uint64_t bytes,
> -                                       QEMUIOVector *qiov, int flags,
> -                                       BlockCompletionFunc *cb,
> -                                       void *opaque)
> +static int
> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
> +                                 uint64_t bytes, QEMUIOVector *qiov,
> +                                 int flags)
>  {
> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> -                         RBD_AIO_READ);
> +    BDRVRBDState *s = bs->opaque;
> +    /*
> +     * RBD APIs don't allow us to write more than actual size, so in order
> +     * to support growing images, we resize the image before write
> +     * operations that exceed the current size.
> +     */
> +    if (offset + bytes > s->image_size) {
> +        int r = qemu_rbd_resize(bs, offset + bytes);
> +        if (r < 0) {
> +            return r;
> +        }
> +    }
> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
>  }
>
> -static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
> -                                        uint64_t offset, uint64_t bytes,
> -                                        QEMUIOVector *qiov, int flags,
> -                                        BlockCompletionFunc *cb,
> -                                        void *opaque)
> +static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
>  {
> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> -                         RBD_AIO_WRITE);
> +    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
>  }
>
> -static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
> -                                      BlockCompletionFunc *cb,
> -                                      void *opaque)
> +static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
> +                                             int64_t offset, int count)
>  {
> -    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
> +    return qemu_rbd_start_co(bs, offset, count, NULL, 0, RBD_AIO_DISCARD);
>  }
>
>  static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
> @@ -1097,16 +1029,6 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
>      return snap_count;
>  }
>
> -static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
> -                                         int64_t offset,
> -                                         int bytes,
> -                                         BlockCompletionFunc *cb,
> -                                         void *opaque)
> -{
> -    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
> -                         RBD_AIO_DISCARD);
> -}
> -
>  static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
>                                                        Error **errp)
>  {
> @@ -1182,11 +1104,10 @@ static BlockDriver bdrv_rbd = {
>      .bdrv_co_truncate       = qemu_rbd_co_truncate,
>      .protocol_name          = "rbd",
>
> -    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
> -    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
> -
> -    .bdrv_aio_flush         = qemu_rbd_aio_flush,
> -    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
> +    .bdrv_co_preadv         = qemu_rbd_co_preadv,
> +    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
> +    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
> +    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
>
>      .bdrv_snapshot_create   = qemu_rbd_snap_create,
>      .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
> --
> 2.17.1
>
>


--
Jason




reply via email to

[Prev in Thread] Current Thread [Next in Thread]