qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm


From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH -v5] ceph/rbd block driver for qemu-kvm
Date: Sat, 9 Oct 2010 10:21:02 +0100

On Fri, Oct 8, 2010 at 8:00 PM, Yehuda Sadeh <address@hidden> wrote:
No flush operation is supported.  Can the guest be sure written data
is on stable storage when it receives completion?

> +/*
> + * This aio completion is being called from rbd_aio_event_reader() and
> + * runs in qemu context. It schedules a bh, but just in case the aio
> + * was not cancelled before.

Cancellation looks unsafe to me because acb is freed for cancel but
then accessed here!  Also see my comment on aio_cancel() below.

> +/*
> + * Cancel aio. Since we don't reference acb in a non qemu threads,
> + * it is safe to access it here.
> + */
> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +    qemu_aio_release(acb);

Any pending librados completions are still running here and will then
cause acb to be accessed after they complete.  If there is no safe way
to cancel then wait for the request to complete.

> +}
> +
> +static AIOPool rbd_aio_pool = {
> +    .aiocb_size = sizeof(RBDAIOCB),
> +    .cancel = rbd_aio_cancel,
> +};
> +
> +/*
> + * This is the callback function for rados_aio_read and _write
> + *
> + * Note: this function is being called from a non qemu thread so
> + * we need to be careful about what we do here. Generally we only
> + * write to the block notification pipe, and do the rest of the
> + * io completion handling from rbd_aio_event_reader() which
> + * runs in a qemu context.

Do librados threads have all signals blocked?  QEMU uses signals so it
is important that this signal not get sent to a librados thread and
discarded.  I have seen this issue in the past when using threaded
libraries in QEMU.

> + */
> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
> +{
> +    rcb->ret = rados_aio_get_return_value(c);
> +    rados_aio_release(c);
> +    if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) {

You are writing RADOSCB* so sizeof(rcb) should be used.

> +        error_report("failed writing to acb->s->fds\n");
> +        qemu_free(rcb);
> +    }
> +}
> +
> +/* Callback when all queued rados_aio requests are complete */
> +
> +static void rbd_aio_bh_cb(void *opaque)
> +{
> +    RBDAIOCB *acb = opaque;
> +
> +    if (!acb->write) {
> +        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
> +    }
> +    qemu_vfree(acb->bounce);
> +    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> +    qemu_bh_delete(acb->bh);
> +    acb->bh = NULL;
> +
> +    qemu_aio_release(acb);
> +}
> +
> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
> +                                           int64_t sector_num,
> +                                           QEMUIOVector *qiov,
> +                                           int nb_sectors,
> +                                           BlockDriverCompletionFunc *cb,
> +                                           void *opaque, int write)
> +{
> +    RBDAIOCB *acb;
> +    RADOSCB *rcb;
> +    rados_completion_t c;
> +    char n[RBD_MAX_SEG_NAME_SIZE];
> +    int64_t segnr, segoffs, segsize, last_segnr;
> +    int64_t off, size;
> +    char *buf;
> +
> +    BDRVRBDState *s = bs->opaque;
> +
> +    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
> +    acb->write = write;
> +    acb->qiov = qiov;
> +    acb->bounce = qemu_blockalign(bs, qiov->size);
> +    acb->aiocnt = 0;
> +    acb->ret = 0;
> +    acb->error = 0;
> +    acb->s = s;
> +
> +    if (!acb->bh) {
> +        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
> +    }

When do you expect acb->bh to be non-NULL?

> +
> +    if (write) {
> +        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
> +    }
> +
> +    buf = acb->bounce;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    segnr = off / s->objsize;
> +    segoffs = off % s->objsize;
> +    segsize = s->objsize - segoffs;
> +
> +    last_segnr = ((off + size - 1) / s->objsize);
> +    acb->aiocnt = (last_segnr - segnr) + 1;
> +
> +    s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
> +
> +    if (write && s->read_only) {
> +        acb->ret = -EROFS;
> +        return NULL;
> +    }

block.c:bdrv_aio_writev() will reject writes to read-only block
devices.  This check can be eliminated and it also prevents leaking
acb here.

Stefan



reply via email to

[Prev in Thread] Current Thread [Next in Thread]