[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines
From: |
Peter Lieven |
Subject: |
Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines |
Date: |
Fri, 18 Jun 2021 11:07:18 +0200 |
User-agent: |
Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Thunderbird/68.10.0 |
Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
>> Signed-off-by: Peter Lieven <pl@kamp.de>
>> ---
>> block/rbd.c | 255 ++++++++++++++++++----------------------------------
>> 1 file changed, 87 insertions(+), 168 deletions(-)
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> index 97a2ae4c84..0d8612a988 100644
>> --- a/block/rbd.c
>> +++ b/block/rbd.c
>> @@ -66,22 +66,6 @@ typedef enum {
>> RBD_AIO_FLUSH
>> } RBDAIOCmd;
>>
>> -typedef struct RBDAIOCB {
>> - BlockAIOCB common;
>> - int64_t ret;
>> - QEMUIOVector *qiov;
>> - RBDAIOCmd cmd;
>> - int error;
>> - struct BDRVRBDState *s;
>> -} RBDAIOCB;
>> -
>> -typedef struct RADOSCB {
>> - RBDAIOCB *acb;
>> - struct BDRVRBDState *s;
>> - int64_t size;
>> - int64_t ret;
>> -} RADOSCB;
>> -
>> typedef struct BDRVRBDState {
>> rados_t cluster;
>> rados_ioctx_t io_ctx;
>> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
>> uint64_t object_size;
>> } BDRVRBDState;
>>
>> +typedef struct RBDTask {
>> + BlockDriverState *bs;
>> + Coroutine *co;
>> + bool complete;
>> + int64_t ret;
>> +} RBDTask;
>> +
>> static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>> BlockdevOptionsRbd *opts, bool cache,
>> const char *keypairs, const char *secretid,
>> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const
>> char *keypairs_json,
>> return ret;
>> }
>>
>> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
>> -{
>> - RBDAIOCB *acb = rcb->acb;
>> - iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
>> - acb->qiov->size - offs);
>> -}
>> -
>> /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>> static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>> const char *keypairs, const char
>> *password_secret,
>> @@ -450,46 +434,6 @@ exit:
>> return ret;
>> }
>>
>> -/*
>> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
>> - * BH context.
>> - */
>> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
>> -{
>> - RBDAIOCB *acb = rcb->acb;
>> - int64_t r;
>> -
>> - r = rcb->ret;
>> -
>> - if (acb->cmd != RBD_AIO_READ) {
>> - if (r < 0) {
>> - acb->ret = r;
>> - acb->error = 1;
>> - } else if (!acb->error) {
>> - acb->ret = rcb->size;
>> - }
>> - } else {
>> - if (r < 0) {
>> - qemu_rbd_memset(rcb, 0);
>> - acb->ret = r;
>> - acb->error = 1;
>> - } else if (r < rcb->size) {
>> - qemu_rbd_memset(rcb, r);
>> - if (!acb->error) {
>> - acb->ret = rcb->size;
>> - }
>> - } else if (!acb->error) {
>> - acb->ret = r;
>> - }
>> - }
>> -
>> - g_free(rcb);
>> -
>> - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> -
>> - qemu_aio_unref(acb);
>> -}
>> -
>> static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>> {
>> const char **vals;
>> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs,
>> uint64_t size)
>> return 0;
>> }
>>
>> -static const AIOCBInfo rbd_aiocb_info = {
>> - .aiocb_size = sizeof(RBDAIOCB),
>> -};
>> -
>> -static void rbd_finish_bh(void *opaque)
>> +static void qemu_rbd_finish_bh(void *opaque)
>> {
>> - RADOSCB *rcb = opaque;
>> - qemu_rbd_complete_aio(rcb);
>> + RBDTask *task = opaque;
>> + task->complete = 1;
>> + aio_co_wake(task->co);
>> }
>>
>> -/*
>> - * This is the callback function for rbd_aio_read and _write
>> - *
>> - * Note: this function is being called from a non qemu thread so
>> - * we need to be careful about what we do here. Generally we only
>> - * schedule a BH, and do the rest of the io completion handling
>> - * from rbd_finish_bh() which runs in a qemu context.
>> - */
> I would adapt this comment instead of removing it. I mean, it is
> still true and the reason for going through aio_bh_schedule_oneshot()
> instead of calling aio_co_wake() directly, right?
Sure, it's still right, but I think rbd is the only driver with this comment.
I can leave it in, no problem.
>
>> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
>> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>> {
>> - RBDAIOCB *acb = rcb->acb;
>> -
>> - rcb->ret = rbd_aio_get_return_value(c);
>> + task->ret = rbd_aio_get_return_value(c);
>> rbd_aio_release(c);
>> -
>> - replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
>> - rbd_finish_bh, rcb);
>> + aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
>> + qemu_rbd_finish_bh, task);
>> }
>>
>> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>> - int64_t off,
>> - QEMUIOVector *qiov,
>> - int64_t size,
>> - BlockCompletionFunc *cb,
>> - void *opaque,
>> - RBDAIOCmd cmd)
>> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
>> + uint64_t offset,
>> + uint64_t bytes,
>> + QEMUIOVector *qiov,
>> + int flags,
>> + RBDAIOCmd cmd)
>> {
>> - RBDAIOCB *acb;
>> - RADOSCB *rcb = NULL;
>> + BDRVRBDState *s = bs->opaque;
>> + RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
>> rbd_completion_t c;
>> int r;
>>
>> - BDRVRBDState *s = bs->opaque;
>> + assert(!qiov || qiov->size == bytes);
>>
>> - acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
>> - acb->cmd = cmd;
>> - acb->qiov = qiov;
>> - assert(!qiov || qiov->size == size);
>> -
>> - rcb = g_new(RADOSCB, 1);
>> -
>> - acb->ret = 0;
>> - acb->error = 0;
>> - acb->s = s;
>> -
>> - rcb->acb = acb;
>> - rcb->s = acb->s;
>> - rcb->size = size;
>> - r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb,
>> &c);
>> + r = rbd_aio_create_completion(&task,
>> + (rbd_callback_t) qemu_rbd_completion_cb,
>> &c);
>> if (r < 0) {
>> - goto failed;
>> + return r;
>> }
>>
>> switch (cmd) {
>> - case RBD_AIO_WRITE:
>> - /*
>> - * RBD APIs don't allow us to write more than actual size, so in
>> order
>> - * to support growing images, we resize the image before write
>> - * operations that exceed the current size.
>> - */
>> - if (off + size > s->image_size) {
>> - r = qemu_rbd_resize(bs, off + size);
>> - if (r < 0) {
>> - goto failed_completion;
>> - }
>> - }
>> - r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
>> - break;
>> case RBD_AIO_READ:
>> - r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
>> + r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
>> + break;
>> + case RBD_AIO_WRITE:
>> + r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
>> break;
>> case RBD_AIO_DISCARD:
>> - r = rbd_aio_discard(s->image, off, size, c);
>> + r = rbd_aio_discard(s->image, offset, bytes, c);
>> break;
>> case RBD_AIO_FLUSH:
>> r = rbd_aio_flush(s->image, c);
>> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>> }
>>
>> if (r < 0) {
>> - goto failed_completion;
>> + error_report("rbd request failed early: cmd %d offset %" PRIu64
>> + " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
>> + bytes, flags, r, strerror(-r));
>> + rbd_aio_release(c);
>> + return r;
>> }
>> - return &acb->common;
>>
>> -failed_completion:
>> - rbd_aio_release(c);
>> -failed:
>> - g_free(rcb);
>> + while (!task.complete) {
>> + qemu_coroutine_yield();
>> + }
>>
>> - qemu_aio_unref(acb);
>> - return NULL;
>> + if (task.ret < 0) {
>> + error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
>> + PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd,
>> offset,
>> + bytes, flags, task.ret, strerror(-task.ret));
>> + return task.ret;
>> + }
>> +
>> + /* zero pad short reads */
>> + if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
>> + qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> I would use bytes instead of qiov->size here and on the previous
> line. In many systems the iovec can be larger than the op and it
> took me a couple of minutes to spot the "qiov->size == bytes" assert.
qemu_iovec_memset is an operation on the qiov. If one missed
the assertion one could argue that we memset beyond the size of the qiov
if we pass bytes - task.ret as length.
Is it true that there are systems which pass a bigger iovec than the actual op?
In this case the assertion would trigger there.
>
> Also, previously we zeroed the entire op on read errors. I don't think
> it matters but want to make sure this was dropped on purpose.
I dropped it because it's not done in other drivers. If you feel that we could
reveal sensitive information I can put the wiping back in.
>
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +static int
>> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
>> + uint64_t bytes, QEMUIOVector *qiov,
>> + int flags)
>> +{
>> + return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
>> }
>>
>> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
>> - uint64_t offset, uint64_t bytes,
>> - QEMUIOVector *qiov, int flags,
>> - BlockCompletionFunc *cb,
>> - void *opaque)
>> +static int
>> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
>> + uint64_t bytes, QEMUIOVector *qiov,
>> + int flags)
>> {
>> - return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
>> - RBD_AIO_READ);
>> + BDRVRBDState *s = bs->opaque;
>> + /*
>> + * RBD APIs don't allow us to write more than actual size, so in order
>> + * to support growing images, we resize the image before write
>> + * operations that exceed the current size.
>> + */
>> + if (offset + bytes > s->image_size) {
>> + int r = qemu_rbd_resize(bs, offset + bytes);
>> + if (r < 0) {
>> + return r;
>> + }
>> + }
> How can this be triggered today? qemu-io used to be able to, but that
> support was removed a long time ago:
>
> https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
>
> Just checking that this piece of code is not vestigial at this point.
We had this discussion before. The way to hit this is to create e.g. a
qcow2 image on rbd. It does not make that much sense, but it was supported in
the past.
I would vote for removing it, but it might break something somewhere.
Thanks,
Peter