[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
From: Ilya Dryomov
Subject: Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
Date: Thu, 17 Jun 2021 16:43:47 +0200
On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
>
> Signed-off-by: Peter Lieven <pl@kamp.de>
> ---
> block/rbd.c | 255 ++++++++++++++++++----------------------------------
> 1 file changed, 87 insertions(+), 168 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 97a2ae4c84..0d8612a988 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -66,22 +66,6 @@ typedef enum {
> RBD_AIO_FLUSH
> } RBDAIOCmd;
>
> -typedef struct RBDAIOCB {
> - BlockAIOCB common;
> - int64_t ret;
> - QEMUIOVector *qiov;
> - RBDAIOCmd cmd;
> - int error;
> - struct BDRVRBDState *s;
> -} RBDAIOCB;
> -
> -typedef struct RADOSCB {
> - RBDAIOCB *acb;
> - struct BDRVRBDState *s;
> - int64_t size;
> - int64_t ret;
> -} RADOSCB;
> -
> typedef struct BDRVRBDState {
> rados_t cluster;
> rados_ioctx_t io_ctx;
> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
> uint64_t object_size;
> } BDRVRBDState;
>
> +typedef struct RBDTask {
> + BlockDriverState *bs;
> + Coroutine *co;
> + bool complete;
> + int64_t ret;
> +} RBDTask;
> +
> static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
> BlockdevOptionsRbd *opts, bool cache,
> const char *keypairs, const char *secretid,
> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const
> char *keypairs_json,
> return ret;
> }
>
> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> -{
> - RBDAIOCB *acb = rcb->acb;
> - iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> - acb->qiov->size - offs);
> -}
> -
> /* FIXME Deprecate and remove keypairs or make it available in QMP. */
> static int qemu_rbd_do_create(BlockdevCreateOptions *options,
> const char *keypairs, const char
> *password_secret,
> @@ -450,46 +434,6 @@ exit:
> return ret;
> }
>
> -/*
> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
> - * BH context.
> - */
> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> -{
> - RBDAIOCB *acb = rcb->acb;
> - int64_t r;
> -
> - r = rcb->ret;
> -
> - if (acb->cmd != RBD_AIO_READ) {
> - if (r < 0) {
> - acb->ret = r;
> - acb->error = 1;
> - } else if (!acb->error) {
> - acb->ret = rcb->size;
> - }
> - } else {
> - if (r < 0) {
> - qemu_rbd_memset(rcb, 0);
> - acb->ret = r;
> - acb->error = 1;
> - } else if (r < rcb->size) {
> - qemu_rbd_memset(rcb, r);
> - if (!acb->error) {
> - acb->ret = rcb->size;
> - }
> - } else if (!acb->error) {
> - acb->ret = r;
> - }
> - }
> -
> - g_free(rcb);
> -
> - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> -
> - qemu_aio_unref(acb);
> -}
> -
> static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
> {
> const char **vals;
> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs,
> uint64_t size)
> return 0;
> }
>
> -static const AIOCBInfo rbd_aiocb_info = {
> - .aiocb_size = sizeof(RBDAIOCB),
> -};
> -
> -static void rbd_finish_bh(void *opaque)
> +static void qemu_rbd_finish_bh(void *opaque)
> {
> - RADOSCB *rcb = opaque;
> - qemu_rbd_complete_aio(rcb);
> + RBDTask *task = opaque;
> + task->complete = 1;
> + aio_co_wake(task->co);
> }
>
> -/*
> - * This is the callback function for rbd_aio_read and _write
> - *
> - * Note: this function is being called from a non qemu thread so
> - * we need to be careful about what we do here. Generally we only
> - * schedule a BH, and do the rest of the io completion handling
> - * from rbd_finish_bh() which runs in a qemu context.
> - */
I would adapt this comment instead of removing it. It is still true
(the completion callback is still invoked from a non-QEMU thread), and
that is the reason for going through aio_bh_schedule_oneshot() instead
of calling aio_co_wake() directly, right?
> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
> {
> - RBDAIOCB *acb = rcb->acb;
> -
> - rcb->ret = rbd_aio_get_return_value(c);
> + task->ret = rbd_aio_get_return_value(c);
> rbd_aio_release(c);
> -
> - replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
> - rbd_finish_bh, rcb);
> + aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
> + qemu_rbd_finish_bh, task);
> }
>
> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> - int64_t off,
> - QEMUIOVector *qiov,
> - int64_t size,
> - BlockCompletionFunc *cb,
> - void *opaque,
> - RBDAIOCmd cmd)
> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
> + uint64_t offset,
> + uint64_t bytes,
> + QEMUIOVector *qiov,
> + int flags,
> + RBDAIOCmd cmd)
> {
> - RBDAIOCB *acb;
> - RADOSCB *rcb = NULL;
> + BDRVRBDState *s = bs->opaque;
> + RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
> rbd_completion_t c;
> int r;
>
> - BDRVRBDState *s = bs->opaque;
> + assert(!qiov || qiov->size == bytes);
>
> - acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
> - acb->cmd = cmd;
> - acb->qiov = qiov;
> - assert(!qiov || qiov->size == size);
> -
> - rcb = g_new(RADOSCB, 1);
> -
> - acb->ret = 0;
> - acb->error = 0;
> - acb->s = s;
> -
> - rcb->acb = acb;
> - rcb->s = acb->s;
> - rcb->size = size;
> - r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb,
> &c);
> + r = rbd_aio_create_completion(&task,
> + (rbd_callback_t) qemu_rbd_completion_cb,
> &c);
> if (r < 0) {
> - goto failed;
> + return r;
> }
>
> switch (cmd) {
> - case RBD_AIO_WRITE:
> - /*
> - * RBD APIs don't allow us to write more than actual size, so in
> order
> - * to support growing images, we resize the image before write
> - * operations that exceed the current size.
> - */
> - if (off + size > s->image_size) {
> - r = qemu_rbd_resize(bs, off + size);
> - if (r < 0) {
> - goto failed_completion;
> - }
> - }
> - r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
> - break;
> case RBD_AIO_READ:
> - r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
> + r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
> + break;
> + case RBD_AIO_WRITE:
> + r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
> break;
> case RBD_AIO_DISCARD:
> - r = rbd_aio_discard(s->image, off, size, c);
> + r = rbd_aio_discard(s->image, offset, bytes, c);
> break;
> case RBD_AIO_FLUSH:
> r = rbd_aio_flush(s->image, c);
> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> }
>
> if (r < 0) {
> - goto failed_completion;
> + error_report("rbd request failed early: cmd %d offset %" PRIu64
> + " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
> + bytes, flags, r, strerror(-r));
> + rbd_aio_release(c);
> + return r;
> }
> - return &acb->common;
>
> -failed_completion:
> - rbd_aio_release(c);
> -failed:
> - g_free(rcb);
> + while (!task.complete) {
> + qemu_coroutine_yield();
> + }
>
> - qemu_aio_unref(acb);
> - return NULL;
> + if (task.ret < 0) {
> + error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
> + PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd,
> offset,
> + bytes, flags, task.ret, strerror(-task.ret));
> + return task.ret;
> + }
> +
> + /* zero pad short reads */
> + if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
> + qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
I would use bytes instead of qiov->size here and on the previous
line. In many systems the iovec can be larger than the op, and it
took me a couple of minutes to spot the "qiov->size == bytes" assert.
Also, previously we zeroed the entire op on read errors (the
qemu_rbd_memset(rcb, 0) call in qemu_rbd_complete_aio()). I don't think
it matters, but I want to make sure this was dropped on purpose.
> + }
> +
> + return 0;
> +}
> +
> +static int
> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
> + uint64_t bytes, QEMUIOVector *qiov,
> + int flags)
> +{
> + return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
> }
>
> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
> - uint64_t offset, uint64_t bytes,
> - QEMUIOVector *qiov, int flags,
> - BlockCompletionFunc *cb,
> - void *opaque)
> +static int
> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
> + uint64_t bytes, QEMUIOVector *qiov,
> + int flags)
> {
> - return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> - RBD_AIO_READ);
> + BDRVRBDState *s = bs->opaque;
> + /*
> + * RBD APIs don't allow us to write more than actual size, so in order
> + * to support growing images, we resize the image before write
> + * operations that exceed the current size.
> + */
> + if (offset + bytes > s->image_size) {
> + int r = qemu_rbd_resize(bs, offset + bytes);
> + if (r < 0) {
> + return r;
> + }
> + }
How can this be triggered today? qemu-io used to be able to issue
writes beyond the current image size, but that support was removed a
long time ago:
https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
Just checking that this piece of code is not vestigial at this point.
Thanks,
Ilya
- Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines,
Ilya Dryomov <=