qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines


From: Ilya Dryomov
Subject: Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
Date: Fri, 18 Jun 2021 13:26:31 +0200

On Fri, Jun 18, 2021 at 11:07 AM Peter Lieven <pl@kamp.de> wrote:
>
> Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> > On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
> >> Signed-off-by: Peter Lieven <pl@kamp.de>
> >> ---
> >>  block/rbd.c | 255 ++++++++++++++++++----------------------------------
> >>  1 file changed, 87 insertions(+), 168 deletions(-)
> >>
> >> diff --git a/block/rbd.c b/block/rbd.c
> >> index 97a2ae4c84..0d8612a988 100644
> >> --- a/block/rbd.c
> >> +++ b/block/rbd.c
> >> @@ -66,22 +66,6 @@ typedef enum {
> >>      RBD_AIO_FLUSH
> >>  } RBDAIOCmd;
> >>
> >> -typedef struct RBDAIOCB {
> >> -    BlockAIOCB common;
> >> -    int64_t ret;
> >> -    QEMUIOVector *qiov;
> >> -    RBDAIOCmd cmd;
> >> -    int error;
> >> -    struct BDRVRBDState *s;
> >> -} RBDAIOCB;
> >> -
> >> -typedef struct RADOSCB {
> >> -    RBDAIOCB *acb;
> >> -    struct BDRVRBDState *s;
> >> -    int64_t size;
> >> -    int64_t ret;
> >> -} RADOSCB;
> >> -
> >>  typedef struct BDRVRBDState {
> >>      rados_t cluster;
> >>      rados_ioctx_t io_ctx;
> >> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
> >>      uint64_t object_size;
> >>  } BDRVRBDState;
> >>
> >> +typedef struct RBDTask {
> >> +    BlockDriverState *bs;
> >> +    Coroutine *co;
> >> +    bool complete;
> >> +    int64_t ret;
> >> +} RBDTask;
> >> +
> >>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
> >>                              BlockdevOptionsRbd *opts, bool cache,
> >>                              const char *keypairs, const char *secretid,
> >> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, 
> >> const char *keypairs_json,
> >>      return ret;
> >>  }
> >>
> >> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> >> -{
> >> -    RBDAIOCB *acb = rcb->acb;
> >> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> >> -               acb->qiov->size - offs);
> >> -}
> >> -
> >>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
> >>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
> >>                                const char *keypairs, const char 
> >> *password_secret,
> >> @@ -450,46 +434,6 @@ exit:
> >>      return ret;
> >>  }
> >>
> >> -/*
> >> - * This aio completion is being called from rbd_finish_bh() and runs in 
> >> qemu
> >> - * BH context.
> >> - */
> >> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> >> -{
> >> -    RBDAIOCB *acb = rcb->acb;
> >> -    int64_t r;
> >> -
> >> -    r = rcb->ret;
> >> -
> >> -    if (acb->cmd != RBD_AIO_READ) {
> >> -        if (r < 0) {
> >> -            acb->ret = r;
> >> -            acb->error = 1;
> >> -        } else if (!acb->error) {
> >> -            acb->ret = rcb->size;
> >> -        }
> >> -    } else {
> >> -        if (r < 0) {
> >> -            qemu_rbd_memset(rcb, 0);
> >> -            acb->ret = r;
> >> -            acb->error = 1;
> >> -        } else if (r < rcb->size) {
> >> -            qemu_rbd_memset(rcb, r);
> >> -            if (!acb->error) {
> >> -                acb->ret = rcb->size;
> >> -            }
> >> -        } else if (!acb->error) {
> >> -            acb->ret = r;
> >> -        }
> >> -    }
> >> -
> >> -    g_free(rcb);
> >> -
> >> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> >> -
> >> -    qemu_aio_unref(acb);
> >> -}
> >> -
> >>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
> >>  {
> >>      const char **vals;
> >> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, 
> >> uint64_t size)
> >>      return 0;
> >>  }
> >>
> >> -static const AIOCBInfo rbd_aiocb_info = {
> >> -    .aiocb_size = sizeof(RBDAIOCB),
> >> -};
> >> -
> >> -static void rbd_finish_bh(void *opaque)
> >> +static void qemu_rbd_finish_bh(void *opaque)
> >>  {
> >> -    RADOSCB *rcb = opaque;
> >> -    qemu_rbd_complete_aio(rcb);
> >> +    RBDTask *task = opaque;
> >> +    task->complete = 1;
> >> +    aio_co_wake(task->co);
> >>  }
> >>
> >> -/*
> >> - * This is the callback function for rbd_aio_read and _write
> >> - *
> >> - * Note: this function is being called from a non qemu thread so
> >> - * we need to be careful about what we do here. Generally we only
> >> - * schedule a BH, and do the rest of the io completion handling
> >> - * from rbd_finish_bh() which runs in a qemu context.
> >> - */
> > I would adapt this comment instead of removing it.  I mean, it is
> > still true and the reason for going through aio_bh_schedule_oneshot()
> > instead of calling aio_co_wake() directly, right?
>
>
> Sure, it's still right, but I think rbd is the only driver with this comment.
>
> I can leave it in, no problem.

Yeah, just massage it a bit to reflect the reality (at least the
function name).

>
>
> >
> >> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> >> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
> >>  {
> >> -    RBDAIOCB *acb = rcb->acb;
> >> -
> >> -    rcb->ret = rbd_aio_get_return_value(c);
> >> +    task->ret = rbd_aio_get_return_value(c);
> >>      rbd_aio_release(c);
> >> -
> >> -    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
> >> -                                     rbd_finish_bh, rcb);
> >> +    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
> >> +                            qemu_rbd_finish_bh, task);
> >>  }
> >>
> >> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> >> -                                 int64_t off,
> >> -                                 QEMUIOVector *qiov,
> >> -                                 int64_t size,
> >> -                                 BlockCompletionFunc *cb,
> >> -                                 void *opaque,
> >> -                                 RBDAIOCmd cmd)
> >> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
> >> +                                          uint64_t offset,
> >> +                                          uint64_t bytes,
> >> +                                          QEMUIOVector *qiov,
> >> +                                          int flags,
> >> +                                          RBDAIOCmd cmd)
> >>  {
> >> -    RBDAIOCB *acb;
> >> -    RADOSCB *rcb = NULL;
> >> +    BDRVRBDState *s = bs->opaque;
> >> +    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
> >>      rbd_completion_t c;
> >>      int r;
> >>
> >> -    BDRVRBDState *s = bs->opaque;
> >> +    assert(!qiov || qiov->size == bytes);
> >>
> >> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
> >> -    acb->cmd = cmd;
> >> -    acb->qiov = qiov;
> >> -    assert(!qiov || qiov->size == size);
> >> -
> >> -    rcb = g_new(RADOSCB, 1);
> >> -
> >> -    acb->ret = 0;
> >> -    acb->error = 0;
> >> -    acb->s = s;
> >> -
> >> -    rcb->acb = acb;
> >> -    rcb->s = acb->s;
> >> -    rcb->size = size;
> >> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, 
> >> &c);
> >> +    r = rbd_aio_create_completion(&task,
> >> +                                  (rbd_callback_t) 
> >> qemu_rbd_completion_cb, &c);
> >>      if (r < 0) {
> >> -        goto failed;
> >> +        return r;
> >>      }
> >>
> >>      switch (cmd) {
> >> -    case RBD_AIO_WRITE:
> >> -        /*
> >> -         * RBD APIs don't allow us to write more than actual size, so in 
> >> order
> >> -         * to support growing images, we resize the image before write
> >> -         * operations that exceed the current size.
> >> -         */
> >> -        if (off + size > s->image_size) {
> >> -            r = qemu_rbd_resize(bs, off + size);
> >> -            if (r < 0) {
> >> -                goto failed_completion;
> >> -            }
> >> -        }
> >> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
> >> -        break;
> >>      case RBD_AIO_READ:
> >> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
> >> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
> >> +        break;
> >> +    case RBD_AIO_WRITE:
> >> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
> >>          break;
> >>      case RBD_AIO_DISCARD:
> >> -        r = rbd_aio_discard(s->image, off, size, c);
> >> +        r = rbd_aio_discard(s->image, offset, bytes, c);
> >>          break;
> >>      case RBD_AIO_FLUSH:
> >>          r = rbd_aio_flush(s->image, c);
> >> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState 
> >> *bs,
> >>      }
> >>
> >>      if (r < 0) {
> >> -        goto failed_completion;
> >> +        error_report("rbd request failed early: cmd %d offset %" PRIu64
> >> +                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
> >> +                     bytes, flags, r, strerror(-r));
> >> +        rbd_aio_release(c);
> >> +        return r;
> >>      }
> >> -    return &acb->common;
> >>
> >> -failed_completion:
> >> -    rbd_aio_release(c);
> >> -failed:
> >> -    g_free(rcb);
> >> +    while (!task.complete) {
> >> +        qemu_coroutine_yield();
> >> +    }
> >>
> >> -    qemu_aio_unref(acb);
> >> -    return NULL;
> >> +    if (task.ret < 0) {
> >> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes 
> >> %"
> >> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, 
> >> offset,
> >> +                     bytes, flags, task.ret, strerror(-task.ret));
> >> +        return task.ret;
> >> +    }
> >> +
> >> +    /* zero pad short reads */
> >> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
> >> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> > I would use bytes instead of qiov->size here and on the previous
> > line.  In many systems the iovec can be larger than the op and it
> > took me a couple of minutes to spot the "qiov->size == bytes" assert.
>
>
> qemu_iovec_memset is an operation on the qiov. If one missed
>
> the assertion one could argue that we memset beyond the size of the qiov
>
> if we pass bytes - task.ret as length.

Sorry, disregard -- I missed that rbd_aio_readv/writev() are specified
in terms of iovecs (i.e. iovec base + iovec count instead of iovec base +
number of bytes).

>
>
> Is it true that there are systems which pass a bigger iovec than the actual op?

Perhaps not in QEMU but it comes up in many other places.  For example
in the kernel when the block device driver gets a "bio" which it doesn't
own and should act only on a small piece of it, etc.

>
> In this case the assertion would trigger there.
>
>
> >
> > Also, previously we zeroed the entire op on read errors.  I don't think
> > it matters but want to make sure this was dropped on purpose.
>
>
> I dropped it because it's not done in other drivers. If you feel that we could 
> reveal sensitive information I can put the wiping back in.

I don't think so and we don't do this in kernel rbd either.

>
>
> >
> >> +    }
> >> +
> >> +    return 0;
> >> +}
> >> +
> >> +static int
> >> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
> >> +                               uint64_t bytes, QEMUIOVector *qiov,
> >> +                               int flags)
> >> +{
> >> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, 
> >> RBD_AIO_READ);
> >>  }
> >>
> >> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
> >> -                                       uint64_t offset, uint64_t bytes,
> >> -                                       QEMUIOVector *qiov, int flags,
> >> -                                       BlockCompletionFunc *cb,
> >> -                                       void *opaque)
> >> +static int
> >> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
> >> +                                 uint64_t bytes, QEMUIOVector *qiov,
> >> +                                 int flags)
> >>  {
> >> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> >> -                         RBD_AIO_READ);
> >> +    BDRVRBDState *s = bs->opaque;
> >> +    /*
> >> +     * RBD APIs don't allow us to write more than actual size, so in order
> >> +     * to support growing images, we resize the image before write
> >> +     * operations that exceed the current size.
> >> +     */
> >> +    if (offset + bytes > s->image_size) {
> >> +        int r = qemu_rbd_resize(bs, offset + bytes);
> >> +        if (r < 0) {
> >> +            return r;
> >> +        }
> >> +    }
> > How can this be triggered today?  qemu-io used to be able to, but that
> > support was removed a long time ago:
> >
> >     https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
> >
> > Just checking that this piece of code is not vestigial at this point.
>
>
> We had this discussion before. The way to hit this is if you want to create e.g. 
> a qcow2 image on rbd. It does not make that much sense, but it was supported 
> in the past.

Ah, so something like

    qemu-img create -f qcow2 rbd:foo/bar 10G

>
> I would vote for removing it, but it might break something somewhere.

Yeah, it's weird but definitely not up for removal in a patch that
migrates from one internal API to another.

Thanks,

                Ilya



reply via email to

[Prev in Thread] Current Thread [Next in Thread]