qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines


From: Peter Lieven
Subject: Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
Date: Fri, 18 Jun 2021 11:07:18 +0200
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Thunderbird/68.10.0

Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
>> Signed-off-by: Peter Lieven <pl@kamp.de>
>> ---
>>  block/rbd.c | 255 ++++++++++++++++++----------------------------------
>>  1 file changed, 87 insertions(+), 168 deletions(-)
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> index 97a2ae4c84..0d8612a988 100644
>> --- a/block/rbd.c
>> +++ b/block/rbd.c
>> @@ -66,22 +66,6 @@ typedef enum {
>>      RBD_AIO_FLUSH
>>  } RBDAIOCmd;
>>
>> -typedef struct RBDAIOCB {
>> -    BlockAIOCB common;
>> -    int64_t ret;
>> -    QEMUIOVector *qiov;
>> -    RBDAIOCmd cmd;
>> -    int error;
>> -    struct BDRVRBDState *s;
>> -} RBDAIOCB;
>> -
>> -typedef struct RADOSCB {
>> -    RBDAIOCB *acb;
>> -    struct BDRVRBDState *s;
>> -    int64_t size;
>> -    int64_t ret;
>> -} RADOSCB;
>> -
>>  typedef struct BDRVRBDState {
>>      rados_t cluster;
>>      rados_ioctx_t io_ctx;
>> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
>>      uint64_t object_size;
>>  } BDRVRBDState;
>>
>> +typedef struct RBDTask {
>> +    BlockDriverState *bs;
>> +    Coroutine *co;
>> +    bool complete;
>> +    int64_t ret;
>> +} RBDTask;
>> +
>>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>>                              BlockdevOptionsRbd *opts, bool cache,
>>                              const char *keypairs, const char *secretid,
>> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const 
>> char *keypairs_json,
>>      return ret;
>>  }
>>
>> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
>> -{
>> -    RBDAIOCB *acb = rcb->acb;
>> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
>> -               acb->qiov->size - offs);
>> -}
>> -
>>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>>                                const char *keypairs, const char 
>> *password_secret,
>> @@ -450,46 +434,6 @@ exit:
>>      return ret;
>>  }
>>
>> -/*
>> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
>> - * BH context.
>> - */
>> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
>> -{
>> -    RBDAIOCB *acb = rcb->acb;
>> -    int64_t r;
>> -
>> -    r = rcb->ret;
>> -
>> -    if (acb->cmd != RBD_AIO_READ) {
>> -        if (r < 0) {
>> -            acb->ret = r;
>> -            acb->error = 1;
>> -        } else if (!acb->error) {
>> -            acb->ret = rcb->size;
>> -        }
>> -    } else {
>> -        if (r < 0) {
>> -            qemu_rbd_memset(rcb, 0);
>> -            acb->ret = r;
>> -            acb->error = 1;
>> -        } else if (r < rcb->size) {
>> -            qemu_rbd_memset(rcb, r);
>> -            if (!acb->error) {
>> -                acb->ret = rcb->size;
>> -            }
>> -        } else if (!acb->error) {
>> -            acb->ret = r;
>> -        }
>> -    }
>> -
>> -    g_free(rcb);
>> -
>> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> -
>> -    qemu_aio_unref(acb);
>> -}
>> -
>>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>>  {
>>      const char **vals;
>> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, 
>> uint64_t size)
>>      return 0;
>>  }
>>
>> -static const AIOCBInfo rbd_aiocb_info = {
>> -    .aiocb_size = sizeof(RBDAIOCB),
>> -};
>> -
>> -static void rbd_finish_bh(void *opaque)
>> +static void qemu_rbd_finish_bh(void *opaque)
>>  {
>> -    RADOSCB *rcb = opaque;
>> -    qemu_rbd_complete_aio(rcb);
>> +    RBDTask *task = opaque;
>> +    task->complete = 1;
>> +    aio_co_wake(task->co);
>>  }
>>
>> -/*
>> - * This is the callback function for rbd_aio_read and _write
>> - *
>> - * Note: this function is being called from a non qemu thread so
>> - * we need to be careful about what we do here. Generally we only
>> - * schedule a BH, and do the rest of the io completion handling
>> - * from rbd_finish_bh() which runs in a qemu context.
>> - */
> I would adapt this comment instead of removing it.  I mean, it is
> still true and the reason for going through aio_bh_schedule_oneshot()
> instead of calling aio_co_wake() directly, right?


Sure, its still right, but I think rbd is the only driver with this comment.

I can leave it in, no problem.


>
>> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
>> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>>  {
>> -    RBDAIOCB *acb = rcb->acb;
>> -
>> -    rcb->ret = rbd_aio_get_return_value(c);
>> +    task->ret = rbd_aio_get_return_value(c);
>>      rbd_aio_release(c);
>> -
>> -    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
>> -                                     rbd_finish_bh, rcb);
>> +    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
>> +                            qemu_rbd_finish_bh, task);
>>  }
>>
>> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>> -                                 int64_t off,
>> -                                 QEMUIOVector *qiov,
>> -                                 int64_t size,
>> -                                 BlockCompletionFunc *cb,
>> -                                 void *opaque,
>> -                                 RBDAIOCmd cmd)
>> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
>> +                                          uint64_t offset,
>> +                                          uint64_t bytes,
>> +                                          QEMUIOVector *qiov,
>> +                                          int flags,
>> +                                          RBDAIOCmd cmd)
>>  {
>> -    RBDAIOCB *acb;
>> -    RADOSCB *rcb = NULL;
>> +    BDRVRBDState *s = bs->opaque;
>> +    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
>>      rbd_completion_t c;
>>      int r;
>>
>> -    BDRVRBDState *s = bs->opaque;
>> +    assert(!qiov || qiov->size == bytes);
>>
>> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
>> -    acb->cmd = cmd;
>> -    acb->qiov = qiov;
>> -    assert(!qiov || qiov->size == size);
>> -
>> -    rcb = g_new(RADOSCB, 1);
>> -
>> -    acb->ret = 0;
>> -    acb->error = 0;
>> -    acb->s = s;
>> -
>> -    rcb->acb = acb;
>> -    rcb->s = acb->s;
>> -    rcb->size = size;
>> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, 
>> &c);
>> +    r = rbd_aio_create_completion(&task,
>> +                                  (rbd_callback_t) qemu_rbd_completion_cb, 
>> &c);
>>      if (r < 0) {
>> -        goto failed;
>> +        return r;
>>      }
>>
>>      switch (cmd) {
>> -    case RBD_AIO_WRITE:
>> -        /*
>> -         * RBD APIs don't allow us to write more than actual size, so in 
>> order
>> -         * to support growing images, we resize the image before write
>> -         * operations that exceed the current size.
>> -         */
>> -        if (off + size > s->image_size) {
>> -            r = qemu_rbd_resize(bs, off + size);
>> -            if (r < 0) {
>> -                goto failed_completion;
>> -            }
>> -        }
>> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
>> -        break;
>>      case RBD_AIO_READ:
>> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
>> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
>> +        break;
>> +    case RBD_AIO_WRITE:
>> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
>>          break;
>>      case RBD_AIO_DISCARD:
>> -        r = rbd_aio_discard(s->image, off, size, c);
>> +        r = rbd_aio_discard(s->image, offset, bytes, c);
>>          break;
>>      case RBD_AIO_FLUSH:
>>          r = rbd_aio_flush(s->image, c);
>> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>>      }
>>
>>      if (r < 0) {
>> -        goto failed_completion;
>> +        error_report("rbd request failed early: cmd %d offset %" PRIu64
>> +                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
>> +                     bytes, flags, r, strerror(-r));
>> +        rbd_aio_release(c);
>> +        return r;
>>      }
>> -    return &acb->common;
>>
>> -failed_completion:
>> -    rbd_aio_release(c);
>> -failed:
>> -    g_free(rcb);
>> +    while (!task.complete) {
>> +        qemu_coroutine_yield();
>> +    }
>>
>> -    qemu_aio_unref(acb);
>> -    return NULL;
>> +    if (task.ret < 0) {
>> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
>> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, 
>> offset,
>> +                     bytes, flags, task.ret, strerror(-task.ret));
>> +        return task.ret;
>> +    }
>> +
>> +    /* zero pad short reads */
>> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
>> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> I would use bytes instead of qiov->size here and on the previous
> line.  In many systems the iovec can be larger than the op and it
> took me a couple of minutes to spot the "qiov->size == bytes" assert.


qemu_iovec_memset is an operation on the qiov. If one missed

the assertion one could argue that we memset beyond the size of the qiov

if we pass bytes - task.ret as length.


Is it true that there a systems which pass a bigger iovec than the actual op?

In this case the assertion would trigger there.


>
> Also, previously we zeroed the entire op on read errors.  I don't think
> it matters but want to make sure this was dropped on purpose.


I dropped it because its not done in other drivers. If you feel that we could 
reveal sensitive information I can put the wiping back in.


>
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static int
>> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
>> +                               uint64_t bytes, QEMUIOVector *qiov,
>> +                               int flags)
>> +{
>> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
>>  }
>>
>> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
>> -                                       uint64_t offset, uint64_t bytes,
>> -                                       QEMUIOVector *qiov, int flags,
>> -                                       BlockCompletionFunc *cb,
>> -                                       void *opaque)
>> +static int
>> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
>> +                                 uint64_t bytes, QEMUIOVector *qiov,
>> +                                 int flags)
>>  {
>> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
>> -                         RBD_AIO_READ);
>> +    BDRVRBDState *s = bs->opaque;
>> +    /*
>> +     * RBD APIs don't allow us to write more than actual size, so in order
>> +     * to support growing images, we resize the image before write
>> +     * operations that exceed the current size.
>> +     */
>> +    if (offset + bytes > s->image_size) {
>> +        int r = qemu_rbd_resize(bs, offset + bytes);
>> +        if (r < 0) {
>> +            return r;
>> +        }
>> +    }
> How can this be triggered today?  qemu-io used to be able to, but that
> support was removed a long time ago:
>
>     https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
>
> Just checking that this piece of code is not vestigial at this point.


We had this discussion before. The way to hit this if you want to create e.g. a 
qcow2 image on rbd. It does not make that much sense, but it was supported in 
the past.

I would vote for removing it, but it might break something somewhere.


Thanks,

Peter






reply via email to

[Prev in Thread] Current Thread [Next in Thread]