>From e9eac2c7ed7b98ff102ab7da4573f081ebca32fa Mon Sep 17 00:00:00 2001 From: Stefan Priebe Date: Mon, 19 Nov 2012 15:01:16 +0100 Subject: [PATCH 2/2] rbd: fix races between io completition and abort Signed-off-by: Stefan Priebe --- block/rbd.c | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/block/rbd.c b/block/rbd.c index 583bcc3..ae1d03b 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -77,6 +77,7 @@ typedef struct RBDAIOCB { int error; struct BDRVRBDState *s; int cancelled; + int status; } RBDAIOCB; typedef struct RADOSCB { @@ -376,6 +377,10 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb) RBDAIOCB *acb = rcb->acb; int64_t r; + if (acb->bh) { + return; + } + r = rcb->ret; if (acb->cmd == RBD_AIO_WRITE || @@ -560,6 +565,20 @@ static void qemu_rbd_close(BlockDriverState *bs) rados_shutdown(s->cluster); } +static void qemu_rbd_aio_abort(void *private_data) +{ + RBDAIOCB *acb = (RBDAIOCB *) private_data; + + acb->status = -ECANCELED; + + if (acb->bh) { + return; + } + + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); + qemu_bh_schedule(acb->bh); +} + /* * Cancel aio. Since we don't reference acb in a non qemu threads, * it is safe to access it here. @@ -567,7 +586,22 @@ static void qemu_rbd_close(BlockDriverState *bs) static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) { RBDAIOCB *acb = (RBDAIOCB *) blockacb; + + if (acb->status != -EINPROGRESS) { + return; + } + acb->cancelled = 1; + + // TODO / FIXME: send an abort command to rbd + // Normally we should call abort librbd and + // librbd gets qemu_rbd_aio_abort as a callback function + // i wasn't able to find an abort function in librbd at all + qemu_rbd_aio_abort(acb); + + while (acb->status == -EINPROGRESS) { + qemu_aio_wait(); + } } static AIOPool rbd_aio_pool = { @@ -636,10 +670,13 @@ static void rbd_aio_bh_cb(void *opaque) qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size); } qemu_vfree(acb->bounce); - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); qemu_bh_delete(acb->bh); acb->bh = NULL; + if (acb->cancelled == 0) { + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); + } + qemu_aio_release(acb); } @@ -685,6 +722,7 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs, acb->s = s; acb->cancelled = 0; acb->bh = NULL; + acb->status = -EINPROGRESS; if (cmd == RBD_AIO_WRITE) { qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size); -- 1.7.10.4