[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
From: Ilya Dryomov
Subject: Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
Date: Fri, 18 Jun 2021 13:26:31 +0200
On Fri, Jun 18, 2021 at 11:07 AM Peter Lieven <pl@kamp.de> wrote:
>
> Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> > On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
> >> Signed-off-by: Peter Lieven <pl@kamp.de>
> >> ---
> >> block/rbd.c | 255 ++++++++++++++++++----------------------------------
> >> 1 file changed, 87 insertions(+), 168 deletions(-)
> >>
> >> diff --git a/block/rbd.c b/block/rbd.c
> >> index 97a2ae4c84..0d8612a988 100644
> >> --- a/block/rbd.c
> >> +++ b/block/rbd.c
> >> @@ -66,22 +66,6 @@ typedef enum {
> >> RBD_AIO_FLUSH
> >> } RBDAIOCmd;
> >>
> >> -typedef struct RBDAIOCB {
> >> - BlockAIOCB common;
> >> - int64_t ret;
> >> - QEMUIOVector *qiov;
> >> - RBDAIOCmd cmd;
> >> - int error;
> >> - struct BDRVRBDState *s;
> >> -} RBDAIOCB;
> >> -
> >> -typedef struct RADOSCB {
> >> - RBDAIOCB *acb;
> >> - struct BDRVRBDState *s;
> >> - int64_t size;
> >> - int64_t ret;
> >> -} RADOSCB;
> >> -
> >> typedef struct BDRVRBDState {
> >> rados_t cluster;
> >> rados_ioctx_t io_ctx;
> >> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
> >> uint64_t object_size;
> >> } BDRVRBDState;
> >>
> >> +typedef struct RBDTask {
> >> + BlockDriverState *bs;
> >> + Coroutine *co;
> >> + bool complete;
> >> + int64_t ret;
> >> +} RBDTask;
> >> +
> >> static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
> >> BlockdevOptionsRbd *opts, bool cache,
> >> const char *keypairs, const char *secretid,
> >> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster,
> >> const char *keypairs_json,
> >> return ret;
> >> }
> >>
> >> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> >> -{
> >> - RBDAIOCB *acb = rcb->acb;
> >> - iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> >> - acb->qiov->size - offs);
> >> -}
> >> -
> >> /* FIXME Deprecate and remove keypairs or make it available in QMP. */
> >> static int qemu_rbd_do_create(BlockdevCreateOptions *options,
> >> const char *keypairs, const char
> >> *password_secret,
> >> @@ -450,46 +434,6 @@ exit:
> >> return ret;
> >> }
> >>
> >> -/*
> >> - * This aio completion is being called from rbd_finish_bh() and runs in
> >> qemu
> >> - * BH context.
> >> - */
> >> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> >> -{
> >> - RBDAIOCB *acb = rcb->acb;
> >> - int64_t r;
> >> -
> >> - r = rcb->ret;
> >> -
> >> - if (acb->cmd != RBD_AIO_READ) {
> >> - if (r < 0) {
> >> - acb->ret = r;
> >> - acb->error = 1;
> >> - } else if (!acb->error) {
> >> - acb->ret = rcb->size;
> >> - }
> >> - } else {
> >> - if (r < 0) {
> >> - qemu_rbd_memset(rcb, 0);
> >> - acb->ret = r;
> >> - acb->error = 1;
> >> - } else if (r < rcb->size) {
> >> - qemu_rbd_memset(rcb, r);
> >> - if (!acb->error) {
> >> - acb->ret = rcb->size;
> >> - }
> >> - } else if (!acb->error) {
> >> - acb->ret = r;
> >> - }
> >> - }
> >> -
> >> - g_free(rcb);
> >> -
> >> - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> >> -
> >> - qemu_aio_unref(acb);
> >> -}
> >> -
> >> static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
> >> {
> >> const char **vals;
> >> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs,
> >> uint64_t size)
> >> return 0;
> >> }
> >>
> >> -static const AIOCBInfo rbd_aiocb_info = {
> >> - .aiocb_size = sizeof(RBDAIOCB),
> >> -};
> >> -
> >> -static void rbd_finish_bh(void *opaque)
> >> +static void qemu_rbd_finish_bh(void *opaque)
> >> {
> >> - RADOSCB *rcb = opaque;
> >> - qemu_rbd_complete_aio(rcb);
> >> + RBDTask *task = opaque;
> >> + task->complete = 1;
> >> + aio_co_wake(task->co);
> >> }
> >>
> >> -/*
> >> - * This is the callback function for rbd_aio_read and _write
> >> - *
> >> - * Note: this function is being called from a non qemu thread so
> >> - * we need to be careful about what we do here. Generally we only
> >> - * schedule a BH, and do the rest of the io completion handling
> >> - * from rbd_finish_bh() which runs in a qemu context.
> >> - */
> > I would adapt this comment instead of removing it. I mean, it is
> > still true and the reason for going through aio_bh_schedule_oneshot()
> > instead of calling aio_co_wake() directly, right?
>
>
> Sure, it's still right, but I think rbd is the only driver with this comment.
>
> I can leave it in, no problem.
Yeah, just massage it a bit to reflect the reality (at least the
function name).
>
>
> >
> >> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> >> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
> >> {
> >> - RBDAIOCB *acb = rcb->acb;
> >> -
> >> - rcb->ret = rbd_aio_get_return_value(c);
> >> + task->ret = rbd_aio_get_return_value(c);
> >> rbd_aio_release(c);
> >> -
> >> - replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
> >> - rbd_finish_bh, rcb);
> >> + aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
> >> + qemu_rbd_finish_bh, task);
> >> }
> >>
> >> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> >> - int64_t off,
> >> - QEMUIOVector *qiov,
> >> - int64_t size,
> >> - BlockCompletionFunc *cb,
> >> - void *opaque,
> >> - RBDAIOCmd cmd)
> >> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
> >> + uint64_t offset,
> >> + uint64_t bytes,
> >> + QEMUIOVector *qiov,
> >> + int flags,
> >> + RBDAIOCmd cmd)
> >> {
> >> - RBDAIOCB *acb;
> >> - RADOSCB *rcb = NULL;
> >> + BDRVRBDState *s = bs->opaque;
> >> + RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
> >> rbd_completion_t c;
> >> int r;
> >>
> >> - BDRVRBDState *s = bs->opaque;
> >> + assert(!qiov || qiov->size == bytes);
> >>
> >> - acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
> >> - acb->cmd = cmd;
> >> - acb->qiov = qiov;
> >> - assert(!qiov || qiov->size == size);
> >> -
> >> - rcb = g_new(RADOSCB, 1);
> >> -
> >> - acb->ret = 0;
> >> - acb->error = 0;
> >> - acb->s = s;
> >> -
> >> - rcb->acb = acb;
> >> - rcb->s = acb->s;
> >> - rcb->size = size;
> >> - r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb,
> >> &c);
> >> + r = rbd_aio_create_completion(&task,
> >> + (rbd_callback_t)
> >> qemu_rbd_completion_cb, &c);
> >> if (r < 0) {
> >> - goto failed;
> >> + return r;
> >> }
> >>
> >> switch (cmd) {
> >> - case RBD_AIO_WRITE:
> >> - /*
> >> - * RBD APIs don't allow us to write more than actual size, so in
> >> order
> >> - * to support growing images, we resize the image before write
> >> - * operations that exceed the current size.
> >> - */
> >> - if (off + size > s->image_size) {
> >> - r = qemu_rbd_resize(bs, off + size);
> >> - if (r < 0) {
> >> - goto failed_completion;
> >> - }
> >> - }
> >> - r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
> >> - break;
> >> case RBD_AIO_READ:
> >> - r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
> >> + r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
> >> + break;
> >> + case RBD_AIO_WRITE:
> >> + r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
> >> break;
> >> case RBD_AIO_DISCARD:
> >> - r = rbd_aio_discard(s->image, off, size, c);
> >> + r = rbd_aio_discard(s->image, offset, bytes, c);
> >> break;
> >> case RBD_AIO_FLUSH:
> >> r = rbd_aio_flush(s->image, c);
> >> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState
> >> *bs,
> >> }
> >>
> >> if (r < 0) {
> >> - goto failed_completion;
> >> + error_report("rbd request failed early: cmd %d offset %" PRIu64
> >> + " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
> >> + bytes, flags, r, strerror(-r));
> >> + rbd_aio_release(c);
> >> + return r;
> >> }
> >> - return &acb->common;
> >>
> >> -failed_completion:
> >> - rbd_aio_release(c);
> >> -failed:
> >> - g_free(rcb);
> >> + while (!task.complete) {
> >> + qemu_coroutine_yield();
> >> + }
> >>
> >> - qemu_aio_unref(acb);
> >> - return NULL;
> >> + if (task.ret < 0) {
> >> + error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes
> >> %"
> >> + PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd,
> >> offset,
> >> + bytes, flags, task.ret, strerror(-task.ret));
> >> + return task.ret;
> >> + }
> >> +
> >> + /* zero pad short reads */
> >> + if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
> >> + qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> > I would use bytes instead of qiov->size here and on the previous
> > line. In many systems the iovec can be larger than the op and it
> > took me a couple of minutes to spot the "qiov->size == bytes" assert.
>
>
> qemu_iovec_memset is an operation on the qiov. If one missed
> the assertion, one could argue that we memset beyond the size of the qiov
> if we pass bytes - task.ret as length.
Sorry, disregard -- I missed that rbd_aio_readv/writev() are specified
in terms of iovecs (i.e. iovec base + iovec count instead of iovec base +
number of bytes).
>
>
> Is it true that there are systems which pass a bigger iovec than the actual op?
Perhaps not in QEMU, but it comes up in many other places. For example,
in the kernel, when the block device driver gets a "bio" which it doesn't
own and should act only on a small piece of it, etc.
>
> In this case the assertion would trigger there.
>
>
> >
> > Also, previously we zeroed the entire op on read errors. I don't think
> > it matters but want to make sure this was dropped on purpose.
>
>
> I dropped it because it's not done in other drivers. If you feel that we could
> reveal sensitive information, I can put the wiping back in.
I don't think so and we don't do this in kernel rbd either.
>
>
> >
> >> + }
> >> +
> >> + return 0;
> >> +}
> >> +
> >> +static int
> >> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
> >> + uint64_t bytes, QEMUIOVector *qiov,
> >> + int flags)
> >> +{
> >> + return qemu_rbd_start_co(bs, offset, bytes, qiov, flags,
> >> RBD_AIO_READ);
> >> }
> >>
> >> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
> >> - uint64_t offset, uint64_t bytes,
> >> - QEMUIOVector *qiov, int flags,
> >> - BlockCompletionFunc *cb,
> >> - void *opaque)
> >> +static int
> >> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
> >> + uint64_t bytes, QEMUIOVector *qiov,
> >> + int flags)
> >> {
> >> - return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> >> - RBD_AIO_READ);
> >> + BDRVRBDState *s = bs->opaque;
> >> + /*
> >> + * RBD APIs don't allow us to write more than actual size, so in order
> >> + * to support growing images, we resize the image before write
> >> + * operations that exceed the current size.
> >> + */
> >> + if (offset + bytes > s->image_size) {
> >> + int r = qemu_rbd_resize(bs, offset + bytes);
> >> + if (r < 0) {
> >> + return r;
> >> + }
> >> + }
> > How can this be triggered today? qemu-io used to be able to, but that
> > support was removed a long time ago:
> >
> > https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
> >
> > Just checking that this piece of code is not vestigial at this point.
>
>
> We had this discussion before. The way to hit this is if you want to create e.g.
> a qcow2 image on rbd. It does not make that much sense, but it was supported
> in the past.
Ah, so something like
qemu-img create -f qcow2 rbd:foo/bar 10G
>
> I would vote for removing it, but it might break something somewhere.
Yeah, it's weird but definitely not up for removal in a patch that
migrates from one internal API to another.
Thanks,
Ilya