From: Stefan Hajnoczi
Subject: Re: [Qemu-devel] [PATCH RFC] block/vxhs: Initial commit to add Veritas HyperScale VxHS block device support
Date: Wed, 3 Aug 2016 17:00:37 +0100
User-agent: Mutt/1.6.2 (2016-07-01)

On Sun, Jul 31, 2016 at 06:06:30PM -0700, Ashish Mittal wrote:

A quick first pass review...

> +/* insure init once */
> +static pthread_mutex_t  of_global_ctx_lock;

There is no need for this since .bdrv_open() and .bdrv_create() run
under the QEMU global mutex.

> +
> +/* HyperScale Driver Version */
> +int vxhs_drv_version = 8895;

This is fairly useless.  QEMU itself has a version and the best
information is the git commit SHA1 so you know the actual code that was
compiled.  I would drop it.

If you want to keep it, what's the policy?  Will you force every patch
to block/vxhs.c to increment the number?

> +int vxhs_open_device(const char *vxhs_uri, int *cfd, int *rfd,
> +                       BDRVVXHSState *s)
> +{
> +    char *file_name;
> +    char *of_vsa_addr;
> +    char **split_filename = (char **)malloc(1024);
> +    char *scratch = (char *)malloc(1024);

Please don't roll your own string splitting/parsing, and don't use arbitrary
string length limits.
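
Untested sketch of what I mean, assuming the input really is a "%7D"-separated
list of URIs as the vxhs_tokenize() call below suggests:

    gchar **tokens = g_strsplit(vxhs_uri, "%7D", 0);
    int resiliency_count = g_strv_length(tokens);

    /* ... use tokens[0 .. resiliency_count - 1] ... */

    g_strfreev(tokens);

No fixed 1024-byte buffers and no manual free() bookkeeping.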

> +    int resilency_count = 0;
> +    int ret = 0;
> +
> +    pthread_mutex_lock(&of_global_ctx_lock);
> +    if (global_qnio_ctx == NULL) {
> +        global_qnio_ctx = vxhs_setup_qnio();
> +        if (global_qnio_ctx == NULL) {
> +            vxhsDbg("Error while opening the device. QNIO IO library "
> +                      "couldn't be initialized.\n");
> +            pthread_mutex_unlock(&of_global_ctx_lock);
> +            free(split_filename);
> +            free(scratch);
> +            return -1;

Other error returns use -errno instead of a generic -1 error.  Please be
consistent since -1 means -EPERM, which is probably not what you wanted.
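
i.e. something like:

    if (global_qnio_ctx == NULL) {
        ...
        return -ENODEV;    /* pick whichever errno describes the failure */
    }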

> +        }
> +    }
> +    pthread_mutex_unlock(&of_global_ctx_lock);
> +
> +    *cfd = -1;

If you're going to initialize fds to -1, please do it before the first
return.  That way callers can rely on it.  Doing it after a return defeats
the purpose.

> +
> +    of_vsa_addr = (char *) malloc(sizeof(char) * OF_MAX_SERVER_ADDR);

Please no arbitrary string limits and manual memory allocation.  Use
glib functions to allocate new strings while duping or formatting them:

https://developer.gnome.org/glib/stable/glib-String-Utility-Functions.html
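
For example (untested sketch):

    char *of_vsa_addr = g_strdup_printf("of://%s:%d",
            s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
            s->vdisk_hostinfo[s->vdisk_cur_host_idx].port);
    char *file_name = g_strdup_printf("%s%s", vdisk_prefix, s->vdisk_guid);

    /* ... */

    g_free(file_name);
    g_free(of_vsa_addr);

That gets rid of OF_MAX_SERVER_ADDR/OF_MAX_FILE_LEN, the memset() calls, and
the NULL checks (g_strdup_printf() aborts on allocation failure instead of
returning NULL).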

> +    if (!of_vsa_addr) {
> +        vxhsDbg("Could not allocate memory for file parsing. Bailing out\n");
> +        free(split_filename);
> +        free(scratch);
> +        return -ENOMEM;
> +    }
> +    memset(of_vsa_addr, 0, OF_MAX_SERVER_ADDR);
> +    file_name = (char *) malloc(sizeof(char) * OF_MAX_FILE_LEN);
> +    if (!file_name) {
> +        vxhsDbg("Could not allocate memory for file parsing. Bailing out\n");
> +        free(of_vsa_addr);
> +        free(split_filename);
> +        free(scratch);
> +        return -ENOMEM;
> +    }
> +    memset(file_name, 0, OF_MAX_FILE_LEN);
> +
> +    /*
> +     * Below steps need to done by all the block driver in QEMU which
> +     * support AIO. Need to create pipe for communicating b/w two threads
> +     * in different context. And set handler for read event when IO completion
> +     * done by non-QEMU context.

No, this isn't done by all block drivers that support AIO.  QEMU has
multiple primitives that are better than a pipe including QEMUBH,
EventNotifier, and a thread pool.

I suggest getting rid of the locking and pipe code.  Instead use QEMUBH
to run code in the BlockDriverState's AioContext.  That way there are no
race conditions and you don't need to take locks explicitly.
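
Rough sketch of the QEMUBH approach (this assumes a bh field is added to
VXHSAIOCB; the failover/retry bookkeeping is omitted):

    static void vxhs_complete_bh(void *opaque)
    {
        /* Runs in the BlockDriverState's AioContext */
        VXHSAIOCB *acb = opaque;

        qemu_bh_delete(acb->bh);
        acb->common.cb(acb->common.opaque, acb->ret);
        qemu_aio_unref(acb);
    }

    void vxhs_finish_aiocb(ssize_t ret, void *arg)
    {
        /* Called from the QNIO thread */
        VXHSAIOCB *acb = arg;

        acb->ret = ret;
        acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
                             vxhs_complete_bh, acb);
        qemu_bh_schedule(acb->bh);
    }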

It's possible to go a step further and make the library asynchronous
(i.e. it exposes file descriptors and QEMU monitors them for the
library).  That way all callbacks happen in the AioContext of the
BlockDriverState and no locking is necessary at all.

By the way, this comment shouldn't be here since the code that sets up the
pipe is elsewhere.

> +     */
> +    vxhsDbg("Original command line : %s\n", vxhs_uri);
> +    resilency_count = vxhs_tokenize(split_filename, scratch, vxhs_uri, "%7D");
> +    vxhs_build_io_target_list(s, resilency_count, split_filename);
> +
> +    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
> +    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
> +             s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
> +             s->vdisk_hostinfo[s->vdisk_cur_host_idx].port);
> +
> +    vxhsDbg("Driver state = %p\n", s);
> +    vxhsDbg("Connecting to : %s\n", of_vsa_addr);
> +
> +    *cfd = (*qnioops.qemu_open_iio_conn)(global_qnio_ctx, of_vsa_addr, 0);
> +    if (*cfd < 0) {
> +        vxhsDbg("Could not open an QNIO connection to: %s\n", of_vsa_addr);
> +        ret = -EIO;
> +        goto out;
> +    }
> +    *rfd = (*qnioops.qemu_iio_devopen)(global_qnio_ctx, *cfd, file_name, 0);
> +    s->aio_context = qemu_get_aio_context();

Don't stash AioContext yourself.  Use bdrv_get_aio_context() each time
you need it.

The AioContext can change so you need to implement
bdrv_attach/detach_aio_context() to add/remove resources from the
AioContext.
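
Roughly (sketch, assuming the pipe is kept for now):

    static void vxhs_detach_aio_context(BlockDriverState *bs)
    {
        BDRVVXHSState *s = bs->opaque;

        /* Called while the old AioContext is still in place */
        aio_set_fd_handler(bdrv_get_aio_context(bs), s->fds[VDISK_FD_READ],
                           false, NULL, NULL, NULL);
    }

    static void vxhs_attach_aio_context(BlockDriverState *bs,
                                        AioContext *new_context)
    {
        BDRVVXHSState *s = bs->opaque;

        aio_set_fd_handler(new_context, s->fds[VDISK_FD_READ],
                           false, vxhs_aio_event_reader, NULL, s);
    }

and wire them up via .bdrv_detach_aio_context/.bdrv_attach_aio_context in the
BlockDriver definition.  Then bdrv_get_aio_context(bs) always gives you the
right context for completions.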

> +
> +out:
> +    /* uri is still in use, cleaned up in close */
> +    if (file_name != NULL) {
> +        free(file_name);
> +    }
> +    if (of_vsa_addr != NULL) {
> +        free(of_vsa_addr);
> +    }
> +    free(split_filename);
> +    free(scratch);
> +    return ret;
> +}
> +
> +int vxhs_create(const char *filename,
> +        QemuOpts *options, Error **errp)
> +{
> +        int ret = 0;
> +        int qemu_cfd = 0;
> +        int qemu_rfd = 0;
> +        BDRVVXHSState s;
> +
> +        vxhsDbg("vxhs_create: came in to open file = %s\n", filename);
> +        ret = vxhs_open_device(filename, &qemu_cfd, &qemu_rfd, &s);
> +        vxhsDbg("vxhs_create: s->qnio_cfd = %d , s->rfd = %d\n", qemu_cfd,
> +                 qemu_rfd);

Are cfd and rfd leaked?

> +
> +        return ret;
> +}
> +
> +static QemuOptsList runtime_opts = {
> +    .name = "vxhs",
> +    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = "filename",
> +            .type = QEMU_OPT_STRING,
> +            .help = "URI to the Veritas HyperScale image",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +int vxhs_open(BlockDriverState *bs, QDict *options,
> +              int bdrv_flags, Error **errp)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    int ret = 0;
> +    int qemu_qnio_cfd = 0;
> +    int qemu_rfd = 0;
> +    QemuOpts *opts;
> +    Error *local_err = NULL;
> +    const char *vxhs_uri;
> +
> +    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +
> +    vxhs_uri = qemu_opt_get(opts, "filename");
> +
> +    memset(s, 0, sizeof(*s));

No need for this, bs->opaque is already zeroed in bdrv_open_common() and
you can rely on that.

> +    vxhsDbg("came in to open file = %s\n", vxhs_uri);
> +    ret = vxhs_open_device(vxhs_uri, &qemu_qnio_cfd, &qemu_rfd, s);
> +    if (ret != 0) {
> +        vxhsDbg("Could not open the device. Error = %d\n", ret);
> +        return ret;
> +    }
> +    s->qnio_ctx = global_qnio_ctx;
> +    s->vdisk_hostinfo[0].qnio_cfd = qemu_qnio_cfd;
> +    s->vdisk_hostinfo[0].vdisk_rfd = qemu_rfd;
> +    s->vdisk_size = 0;
> +    QSIMPLEQ_INIT(&s->vdisk_aio_retryq);
> +
> +    vxhsDbg("s->qnio_cfd = %d, s->rfd = %d\n",
> +              s->vdisk_hostinfo[0].qnio_cfd,
> +              s->vdisk_hostinfo[0].vdisk_rfd);
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        vxhsDbg("Could not create a pipe for device. bailing out\n");
> +        ret = -errno;
> +        goto out;
> +    }
> +    fcntl(s->fds[VDISK_FD_READ], F_SETFL, O_NONBLOCK);
> +
> +    aio_set_fd_handler(s->aio_context, s->fds[VDISK_FD_READ],
> +                       false, vxhs_aio_event_reader, NULL, s);
> +
> +    /*
> +     * Allocate/Initialize the spin-locks.
> +     *
> +     * NOTE:
> +     *      Since spin lock is being allocated
> +     *      dynamically hence moving acb struct
> +     *      specific lock to BDRVVXHSState
> +     *      struct. The reason is very simple,
> +     *      we don't want an overhead of spin
> +     *      lock dynamic allocation and free
> +     *      for every AIO.
> +     */
> +    s->vdisk_lock = VXHS_SPIN_LOCK_ALLOC;
> +    s->vdisk_acb_lock = VXHS_SPIN_LOCK_ALLOC;
> +
> +    return 0;
> +
> +out:
> +    if (s->vdisk_hostinfo[0].vdisk_rfd >= 0) {
> +        qnioops.qemu_iio_devclose(s->qnio_ctx,
> +                                 s->vdisk_hostinfo[0].vdisk_rfd);
> +    }
> +    /* never close qnio_cfd */
> +    vxhsDbg("vxhs_open failed (%d)\n", ret);
> +    return ret;

opts is leaked.  Use qemu_opts_del().
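
e.g. (sketch) let the success path fall through to the cleanup label too:

        ret = 0;
    out:
        qemu_opts_del(opts);
        ...
        return ret;

Note that qemu_opt_get() returns a pointer owned by opts, so only delete opts
once you are done with vxhs_uri.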

> +}
> +
> +static const AIOCBInfo vxhs_aiocb_info = {
> +    .aiocb_size = sizeof(VXHSAIOCB)
> +};
> +
> +/*
> + * This is called in QNIO thread context when IO done
> + * on IO Manager and QNIO client received the data or
> + * ACK. It notify another event handler thread running in QEMU context
> + * by writing on the pipe
> + */
> +void vxhs_finish_aiocb(ssize_t ret, void *arg)
> +{
> +    VXHSAIOCB *acb = (VXHSAIOCB *) arg;
> +    BlockDriverState *bs = acb->common.bs;
> +    BDRVVXHSState *s = bs->opaque;
> +    int rv;
> +
> +    vxhsDbg("finish callback in non-QEMU context... writing on pipe\n");
> +    acb->ret = ret;
> +    rv = qemu_write_full(s->fds[VDISK_FD_WRITE], &acb, sizeof(acb));
> +    if (rv != sizeof(acb)) {
> +        error_report("VXHS AIO completion failed: %s",
> +                     strerror(errno));
> +        abort();
> +    }
> +}
> +
> +/*
> + * This allocates QEMU-VXHS callback for each IO
> + * and passed to QNIO. When QNIO completed the work,
> + * it will be passed back through the callback
> + */
> +BlockAIOCB *vxhs_aio_rw(BlockDriverState *bs,
> +                                int64_t sector_num, QEMUIOVector *qiov,
> +                                int nb_sectors,
> +                                BlockCompletionFunc *cb,
> +                                void *opaque, int iodir)
> +{
> +    VXHSAIOCB         *acb = NULL;
> +    BDRVVXHSState     *s = bs->opaque;
> +    size_t              size;

Weird indentation.  Please use 4 spaces per level of indentation.

> +    uint64_t            offset;
> +    int                 iio_flags = 0;
> +    int                 ret = 0;
> +
> +    offset = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +
> +    acb = qemu_aio_get(&vxhs_aiocb_info, bs, cb, opaque);
> +    /*
> +     * Setup or initialize VXHSAIOCB.
> +     * Every single field should be initialized since
> +     * acb will be picked up from the slab without
> +     * initializing with zero.
> +     */
> +    acb->io_offset = offset;
> +    acb->size = size;
> +    acb->ret = 0;
> +    acb->flags = 0;
> +    acb->aio_done = VXHS_IO_INPROGRESS;
> +    acb->segments = 0;
> +    acb->buffer = 0;
> +    acb->qiov = qiov;
> +    acb->direction = iodir;
> +
> +    VXHS_SPIN_LOCK(s->vdisk_lock);
> +    if (OF_VDISK_FAILED(s)) {
> +        vxhsDbg("vDisk %s, vDisk device is in failed state "
> +                  "iodir = %d size = %lu offset = %lu\n",
> +                  s->vdisk_guid, iodir, size, offset);
> +        VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +        goto errout;
> +    }
> +    if (OF_VDISK_IOFAILOVER_IN_PROGRESS(s)) {
> +        QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq, acb, retry_entry);
> +        s->vdisk_aio_retry_qd++;
> +        OF_AIOCB_FLAGS_SET_QUEUED(acb);
> +        VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +        vxhsDbg("vDisk %s, added acb %p to retry queue(1)\n",
> +                  s->vdisk_guid, acb);
> +        goto out;
> +    }
> +    s->vdisk_aio_count++;
> +    VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +
> +    iio_flags = (IIO_FLAG_DONE | IIO_FLAG_ASYNC);
> +
> +    switch (iodir) {
> +    case VDISK_AIO_WRITE:
> +            vxhs_inc_acb_segment_count(acb, 1);
> +            /*
> +             * vxhsDbg("WRITING: opaque = %p size = %lu offset = %lu  "
> +             *           "Segments = %d\n", opaque, size,
> +             *           sector_num * BDRV_SECTOR_SIZE, acb->segments);
> +             */
> +            ret = (*qnioops.qemu_iio_writev)(s->qnio_ctx,
> +                    s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
> +                    qiov->iov, qiov->niov, offset, (void *)acb, iio_flags);
> +            /*
> +             * vxhsDbg("qemu_iio_writev acb %p ret %d\n", acb, ret);
> +             */
> +            break;
> +    case VDISK_AIO_READ:
> +            vxhs_inc_acb_segment_count(acb, 1);
> +            /*
> +             * vxhsDbg("READING : buf = %p size = %lu offset = %lu "
> +             *           "Segments = %d\n", buf, size,
> +             *           sector_num * BDRV_SECTOR_SIZE, acb->segments);
> +             */
> +            ret = (*qnioops.qemu_iio_readv)(s->qnio_ctx,
> +                    s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
> +                    qiov->iov, qiov->niov, offset, (void *)acb, iio_flags);
> +            /*
> +             * vxhsDbg("qemu_iio_readv returned %d\n", ret);
> +             */
> +            break;
> +    default:
> +            vxhsDbg("Invalid I/O request iodir %d\n", iodir);
> +            goto errout;
> +    }
> +
> +    if (ret != 0) {
> +        vxhsDbg("IO ERROR (vDisk %s) FOR : Read/Write = %d size = %lu "
> +                  "offset = %lu ACB = %p Segments = %d. Error = %d, errno = 
> %d\n",
> +                  s->vdisk_guid, iodir, size, offset,
> +                  acb, acb->segments, ret, errno);
> +        /*
> +         * Don't retry I/Os against vDisk having no
> +         * redundency or statefull storage on compute

s/redundency/redundancy/

s/statefull/stateful/

> +         *
> +         * TODO: Revisit this code path to see if any
> +         *       particular error need to be handled.
> +         *       At this moment failing the I/O.
> +         */
> +        VXHS_SPIN_LOCK(s->vdisk_lock);
> +        if (s->vdisk_nhosts == 1) {
> +            vxhsDbg("vDisk %s, I/O operation failed.\n",
> +                      s->vdisk_guid);
> +            s->vdisk_aio_count--;
> +            vxhs_dec_acb_segment_count(acb, 1);
> +            VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +            goto errout;
> +        }
> +        if (OF_VDISK_FAILED(s)) {
> +            vxhsDbg("vDisk %s, vDisk device failed "
> +                      "iodir = %d size = %lu offset = %lu\n",
> +                      s->vdisk_guid, iodir, size, offset);
> +            s->vdisk_aio_count--;
> +            vxhs_dec_acb_segment_count(acb, 1);
> +            VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +            goto errout;
> +        }
> +        if (OF_VDISK_IOFAILOVER_IN_PROGRESS(s)) {
> +            /*
> +             * Queue all incoming io requests after failover starts.
> +             * Number of requests that can arrive is limited by io queue depth
> +             * so an app blasting independent ios will not exhaust memory.
> +             */
> +            QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq, acb, retry_entry);
> +            s->vdisk_aio_retry_qd++;
> +            OF_AIOCB_FLAGS_SET_QUEUED(acb);
> +            s->vdisk_aio_count--;
> +            vxhs_dec_acb_segment_count(acb, 1);
> +            VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +            vxhsDbg("vDisk %s, added acb %p to retry queue(2)\n",
> +                      s->vdisk_guid, acb);
> +            goto out;
> +        }
> +        OF_VDISK_SET_IOFAILOVER_IN_PROGRESS(s);
> +        QSIMPLEQ_INSERT_TAIL(&s->vdisk_aio_retryq, acb, retry_entry);
> +        s->vdisk_aio_retry_qd++;
> +        OF_AIOCB_FLAGS_SET_QUEUED(acb);
> +        vxhs_dec_acb_segment_count(acb, 1);
> +        vxhsDbg("vDisk %s, added acb %p to retry queue(3)\n",
> +                  s->vdisk_guid, acb);
> +        /*
> +         * Start I/O failover if there is no active
> +         * AIO within vxhs block driver.
> +         */
> +        if (--s->vdisk_aio_count == 0) {
> +            VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +            /*
> +             * Start IO failover
> +             */
> +            vxhs_failover_io(s);
> +            goto out;
> +        }
> +        VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +    }
> +
> +out:
> +    return &acb->common;
> +
> +errout:
> +    qemu_aio_unref(acb);
> +    return NULL;
> +}
> +
> +/*
> + * This is called from qemu-img utility when user want to resize
> + * the disk. Currently it's noop within VXHS block driver.
> + */
> +int vxhs_truncate(BlockDriverState *bs, int64_t offset)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    int ret = 0;
> +
> +    vxhsErr("Truncating disk (%s): BlockDriverState = %p, "
> +              "BDRVVXHSState = %p offset = %lu\n",
> +              s->vdisk_guid, bs, s, offset);
> +
> +    return ret;
> +}
> +
> +BlockAIOCB *vxhs_aio_readv(BlockDriverState *bs,
> +                                   int64_t sector_num, QEMUIOVector *qiov,
> +                                   int nb_sectors,
> +                                   BlockCompletionFunc *cb, void *opaque)
> +{
> +    /*
> +     * vxhsDbg("READING: opaque = %p size = %d offset = %lu\n",
> +     *           opaque, nb_sectors * BDRV_SECTOR_SIZE, sector_num);
> +     */
> +    return vxhs_aio_rw(bs, sector_num, qiov, nb_sectors,
> +                         cb, opaque, VDISK_AIO_READ);
> +}
> +
> +BlockAIOCB *vxhs_aio_writev(BlockDriverState *bs,
> +                                    int64_t sector_num, QEMUIOVector *qiov,
> +                                    int nb_sectors,
> +                                    BlockCompletionFunc *cb, void *opaque)
> +{
> +    /*
> +     * vxhsDbg("WRITING: opaque = %p size = %d offset = %lu\n",
> +     *           opaque, nb_sectors * BDRV_SECTOR_SIZE, sector_num);
> +     */
> +    return vxhs_aio_rw(bs, sector_num, qiov, nb_sectors,
> +                         cb, opaque, VDISK_AIO_WRITE);
> +}
> +
> +/*
> + * This is called by QEMU when flush inside guest triggered
> + * at block layer either for IDE or SCSI disks.
> + */
> +int vxhs_co_flush(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    uint64_t size = 0;
> +    int ret = 0;
> +    uint32_t iocount = 0;
> +
> +    ret = (*qnioops.qemu_iio_ioctl)(s->qnio_ctx,
> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
> +            VDISK_AIO_FLUSH, &size, NULL, IIO_FLAG_SYNC);
> +
> +    if (ret < 0) {
> +        /*
> +         * Currently not handling the flush ioctl
> +         * failure because of network connection
> +         * disconnect. Since all the writes are
> +         * commited into persistent storage hence
> +         * this flush call is noop and we can safely
> +         * return success status to the caller.
> +         *
> +         * If any write failure occurs for inflight
> +         * write AIO because of network disconnect
> +         * then anyway IO failover will be triggered.
> +         */
> +        vxhsDbg("vDisk (%s) Flush ioctl failed ret = %d errno = %d\n",
> +                  s->vdisk_guid, ret, errno);
> +        ret = 0;
> +    }
> +
> +    iocount = vxhs_get_vdisk_iocount(s);
> +    if (iocount > 0) {
> +        vxhsDbg("In the flush the IO count = %d\n", iocount);
> +    }
> +
> +    return ret;
> +}
> +
> +/*
> + * This is called by guest or QEMU to free blocks.
> + * When block freed when files deleted in the guest, fstrim utility
> + * can be used to pass the hints to the block layer if the disk supports
> + * TRIM. It send WRITE_SAME SCSI command to QEMU virtio-scsi layer, which
> + * call bdrv_aio_discard interface.
> + */
> +coroutine_fn int vxhs_co_pdiscard(BlockDriverState *bs,
> +                                   int64_t sector_num, int nb_sectors)
> +{
> +    int64_t off, size;
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +
> +    vxhsErr("We are faking the discard for range off = %lu "
> +              "for %lu bytes\n", off, size);
> +    vxhsErr("returning from discard\n");
> +
> +    return 0;
> +}
> +
> +unsigned long vxhs_get_vdisk_stat(BDRVVXHSState *s)
> +{
> +    void *ctx = NULL;
> +    int flags = 0;
> +    unsigned long vdisk_size = 0;
> +    int ret = 0;
> +
> +    ret = (*qnioops.qemu_iio_ioctl)(s->qnio_ctx,
> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd,
> +            VDISK_STAT, &vdisk_size, ctx, flags);
> +
> +    if (ret < 0) {
> +        vxhsDbg("vDisk (%s) stat ioctl failed, ret = %d, errno = %d\n",
> +                  s->vdisk_guid, ret, errno);
> +    }
> +    vxhsDbg("vDisk size = %lu\n", vdisk_size);
> +
> +    return vdisk_size;
> +}
> +
> +/*
> + * Returns the size of vDisk in bytes. This is required
> + * by QEMU block upper block layer so that it is visible
> + * to guest.
> + */
> +int64_t vxhs_getlength(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    unsigned long vdisk_size = 0;
> +
> +    if (s->vdisk_size > 0) {
> +        vdisk_size = s->vdisk_size;
> +    } else {
> +        /*
> +         * Fetch the vDisk size using stat ioctl
> +         */
> +        vdisk_size = vxhs_get_vdisk_stat(s);
> +        if (vdisk_size > 0) {
> +            s->vdisk_size = vdisk_size;
> +        }
> +    }
> +
> +    if (vdisk_size > 0) {
> +        return (int64_t)vdisk_size; /* return size in bytes */
> +    } else {
> +        return -EIO;
> +    }
> +}
> +
> +/*
> + * Returns actual blocks allocated for the vDisk.
> + * This is required by qemu-img utility.
> + */
> +int64_t vxhs_get_allocated_blocks(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +    unsigned long vdisk_size = 0;
> +
> +    if (s->vdisk_size > 0) {
> +        vdisk_size = s->vdisk_size;
> +    } else {
> +        /*
> +         * TODO:
> +         * Once HyperScale storage-virtualizer provides
> +         * actual physical allocation of blocks then
> +         * fetch that information and return back to the
> +         * caller but for now just get the full size.
> +         */
> +        vdisk_size = vxhs_get_vdisk_stat(s);
> +        if (vdisk_size > 0) {
> +            s->vdisk_size = vdisk_size;
> +            vxhsDbg("Allocated blocks = %lu\n", vdisk_size);
> +        }
> +    }
> +
> +    if (vdisk_size > 0) {
> +        return (int64_t)vdisk_size; /* return size in bytes */
> +    } else {
> +        return -EIO;
> +    }
> +}
> +
> +void vxhs_close(BlockDriverState *bs)
> +{
> +    BDRVVXHSState *s = bs->opaque;
> +
> +    vxhsDbg("closing vDisk\n");
> +
> +    close(s->fds[VDISK_FD_READ]);
> +    close(s->fds[VDISK_FD_WRITE]);
> +
> +    if (s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd >= 0) {
> +        qnioops.qemu_iio_devclose(s->qnio_ctx,
> +            s->vdisk_hostinfo[s->vdisk_cur_host_idx].vdisk_rfd);
> +    }
> +    if (s->vdisk_lock) {
> +        VXHS_SPIN_LOCK_DESTROY(s->vdisk_lock);
> +        s->vdisk_lock = NULL;
> +    }
> +    if (s->vdisk_acb_lock) {
> +        VXHS_SPIN_LOCK_DESTROY(s->vdisk_acb_lock);
> +        s->vdisk_acb_lock = NULL;
> +    }
> +
> +    /*
> +     * never close channel - not ref counted, will
> +     * close for all vdisks
> +     */
> +
> +    vxhsDbg("vxhs_close: done\n");
> +
> +    /*
> +     * TODO: Verify that all the resources were relinguished.
> +     */
> +}
> +
> +int vxhs_has_zero_init(BlockDriverState *bs)
> +{
> +    vxhsDbg("Returning without doing anything (noop).\n");
> +
> +    return 0;
> +}
> +
> +/*
> + * If errors are consistent with storage agent failure:
> + *  - Try to reconnect in case error is transient or storage agent restarted.
> + *  - Currently failover is being triggered on per vDisk basis. There is
> + *    a scope of further optimization where failover can be global (per VM).
> + *  - In case of network (storage agent) failure, for all the vDisks, having
> + *    no redundency, I/Os will be failed without attempting for I/O failover
> + *    because of stateless nature of vDisk.
> + *  - If local or source storage agent is down then send an ioctl to remote
> + *    storage agent to check if remote storage agent in a state to accept
> + *    application I/Os.
> + *  - Once remote storage agent is ready to accept I/O, start I/O shipping.
> + *  - If I/Os cannot be serviced then vDisk will be marked failed so that
> + *    new incoming I/Os are returned with failure immediately.
> + *  - If vDisk I/O failover is in progress then all new/inflight I/Os will
> + *    queued and will be restarted or failed based on failover operation
> + *    is successful or not.
> + *  - I/O failover can be started either in I/O forward or I/O backward
> + *    path.
> + *  - I/O failover will be started as soon as all the pending acb(s)
> + *    are queued and there is no pending I/O count.
> + *  - If I/O failover couldn't be completed within QNIO_CONNECT_TIMOUT_SECS
> + *    then vDisk will be marked failed and all I/Os will be completed with
> + *    error.
> + */
> +
> +int vxhs_switch_storage_agent(BDRVVXHSState *s)
> +{
> +    int res = 0;
> +    int flags = (IIO_FLAG_ASYNC | IIO_FLAG_DONE);
> +
> +    vxhsDbg("Query host %s for vdisk %s\n",
> +              s->vdisk_hostinfo[s->vdisk_ask_failover_idx].hostip,
> +              s->vdisk_guid);
> +
> +    res = vxhs_reopen_vdisk(s, s->vdisk_ask_failover_idx);
> +    if (res == 0) {
> +        res = (*qnioops.qemu_iio_ioctl)(s->qnio_ctx,
> +                   s->vdisk_hostinfo[s->vdisk_ask_failover_idx].vdisk_rfd,
> +                   VDISK_CHECK_IO_FAILOVER_READY, NULL, s, flags);
> +    }
> +    if (res != 0) {
> +        vxhsDbg("Query to host %s for vdisk %s failed, res = %d, errno = 
> %d\n",
> +                  s->vdisk_hostinfo[s->vdisk_ask_failover_idx].hostip,
> +                  s->vdisk_guid, res, errno);
> +        /*
> +         * TODO: calling failover_ioctl_cb from here ties up the qnio epoll
> +         * loop if qemu_iio_ioctl fails synchronously (-1) for all hosts in io
> +         * target list.
> +         */
> +
> +        /* try next host */
> +        failover_ioctl_cb(res, s);
> +    }
> +    return res;
> +}
> +
> +void failover_ioctl_cb(int res, void *ctx)
> +{
> +    BDRVVXHSState *s = ctx;
> +
> +    if (res == 0) {
> +        /* found failover target */
> +        s->vdisk_cur_host_idx = s->vdisk_ask_failover_idx;
> +        s->vdisk_ask_failover_idx = 0;
> +        vxhsDbg("Switched to storage server host-IP %s for vdisk %s\n",
> +                   s->vdisk_hostinfo[s->vdisk_cur_host_idx].hostip,
> +                   s->vdisk_guid);
> +        VXHS_SPIN_LOCK(s->vdisk_lock);
> +        OF_VDISK_RESET_IOFAILOVER_IN_PROGRESS(s);
> +        VXHS_SPIN_UNLOCK(s->vdisk_lock);
> +        vxhs_handle_queued_ios(s);
> +    } else {
> +        /* keep looking */
> +        vxhsDbg("failover_ioctl_cb: keep looking for io target for vdisk 
> %s\n",
> +                  s->vdisk_guid);
> +        s->vdisk_ask_failover_idx++;
> +        if (s->vdisk_ask_failover_idx == s->vdisk_nhosts) {
> +            /* pause and cycle through list again */
> +            sleep(QNIO_CONNECT_RETRY_SECS);
> +            s->vdisk_ask_failover_idx = 0;
> +        }
> +        res = vxhs_switch_storage_agent(s);
> +    }
> +}
> +
> +int vxhs_failover_io(BDRVVXHSState *s)
> +{
> +    int res = 0;
> +
> +    vxhsDbg("I/O Failover starting for vDisk %s", s->vdisk_guid);
> +
> +    s->vdisk_ask_failover_idx = 0;
> +    res = vxhs_switch_storage_agent(s);
> +
> +    return res;
> +}
> +
> +/*
> + * Try to reopen the vDisk on one of the available hosts
> + * If vDisk reopen is successful on any of the host then
> + * check if that node is ready to accept I/O.
> + */
> +int vxhs_reopen_vdisk(BDRVVXHSState *s, int index)
> +{
> +    char *of_vsa_addr = NULL;
> +    char *file_name = NULL;
> +    int  res = 0;
> +
> +    /*
> +     * Don't close the channel if it was opened
> +     * before successfully. It will be handled
> +     * within iio* api if the same channel open
> +     * fd is reused.
> +     *
> +     * close stale vdisk device remote fd since
> +     * it is invalid after channel disconnect.
> +     */
> +    if (s->vdisk_hostinfo[index].vdisk_rfd >= 0) {
> +        qnioops.qemu_iio_devclose(s->qnio_ctx,
> +                                 s->vdisk_hostinfo[index].vdisk_rfd);
> +        s->vdisk_hostinfo[index].vdisk_rfd = -1;
> +    }
> +    /*
> +     * build storage agent address and vdisk device name strings
> +     */
> +    of_vsa_addr = (char *) malloc(sizeof(char) * OF_MAX_SERVER_ADDR);
> +    if (!of_vsa_addr) {
> +        res = ENOMEM;
> +        goto out;
> +    }
> +    file_name = (char *) malloc(sizeof(char) * OF_MAX_FILE_LEN);
> +    if (!file_name) {
> +        res = ENOMEM;
> +        goto out;
> +    }
> +    snprintf(file_name, OF_MAX_FILE_LEN, "%s%s", vdisk_prefix, s->vdisk_guid);
> +    snprintf(of_vsa_addr, OF_MAX_SERVER_ADDR, "of://%s:%d",
> +             s->vdisk_hostinfo[index].hostip, s->vdisk_hostinfo[index].port);
> +    /*
> +     * open iridum channel to storage agent if not opened before.
> +     */
> +    if (s->vdisk_hostinfo[index].qnio_cfd < 0) {
> +        vxhsDbg("Trying to connect "
> +                  "host-IP %s\n", s->vdisk_hostinfo[index].hostip);
> +        s->vdisk_hostinfo[index].qnio_cfd =
> +                (*qnioops.qemu_open_iio_conn)(global_qnio_ctx, of_vsa_addr, 0);
> +        if (s->vdisk_hostinfo[index].qnio_cfd < 0) {
> +            vxhsDbg("Failed to connect "
> +                      "to storage agent on host-ip %s\n",
> +                      s->vdisk_hostinfo[index].hostip);
> +            res = ENODEV;
> +            goto out;
> +        }
> +    }
> +    /*
> +     * open vdisk device
> +     */
> +    vxhsDbg("Trying to open vdisk device: %s\n", file_name);
> +    s->vdisk_hostinfo[index].vdisk_rfd =
> +            (*qnioops.qemu_iio_devopen)(global_qnio_ctx,
> +             s->vdisk_hostinfo[index].qnio_cfd, file_name, 0);
> +    if (s->vdisk_hostinfo[index].vdisk_rfd < 0) {
> +        vxhsDbg("Failed to open vdisk device: %s\n", file_name);
> +        res = EIO;
> +        goto out;
> +    }
> +out:
> +    if (of_vsa_addr) {
> +        free(of_vsa_addr);
> +    }
> +    if (file_name) {
> +        free(file_name);
> +    }
> +    return res;
> +}
> +
> +/*
> + * vxhs_build_io_target_list: Initialize io target list with ip addresses of
> + * local storage agent and reflection target storage agents. The local storage
> + * agent ip is the efficient internal address in the uri, e.g. 192.168.0.2.
> + * The local storage agent address is stored at index 0. The reflection target
> + * ips, are the E-W data network addresses of the reflection node agents, also
> + * extracted from the uri.
> + *
> + */
> +int vxhs_build_io_target_list(BDRVVXHSState *s, int resilency_count,

s/resilency/resiliency/

> +                              char **filenames)
> +{
> +    URI *uri = NULL;
> +    int i = 0;
> +
> +    /*
> +     * TODO : We need to move to dynamic allocation of number of hosts.
> +     * s->vdisk_hostinfo = (VXHSvDiskHostsInfo **) malloc (
> +     *                      sizeof(VXHSvDiskHostsInfo) * resilency_count);
> +     *  memset(&s->vdisk_hostinfo, 0, sizeof(VXHSvDiskHostsInfo) * MAX_HOSTS);
> +     */
> +    for (i = 0; i < resilency_count; i++) {
> +        uri = uri_parse(filenames[i]);

What if uri_parse fails?

> +        s->vdisk_hostinfo[i].hostip = (char *)malloc(strlen(uri->server));

Buffer overflow, you didn't check resilency_count so s->vdisk_hostinfo[]
could be exceeded.

> +        strncpy((s->vdisk_hostinfo[i].hostip), uri->server, IP_ADDR_LEN);

You forgot to allocate space for the NUL byte.  Please use g_strdup().

If you are trying to enforce a length limit on uri->server then please
check for strlen explicitly and raise an error.  Don't silently truncate
it.
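
i.e. simply:

    s->vdisk_hostinfo[i].hostip = g_strdup(uri->server);

and g_free() it when the state is torn down.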

> +        s->vdisk_hostinfo[i].port = uri->port;
> +        s->vdisk_hostinfo[i].qnio_cfd = -1;
> +        s->vdisk_hostinfo[i].vdisk_rfd = -1;
> +        if (strstr(uri->path, "vxhs") == NULL) {
> +            s->vdisk_guid = (char *)malloc(strlen(uri->path));
> +            strcpy((s->vdisk_guid), uri->path);

Buffer overflow again, you forgot the NULL byte.  Please use g_strdup().

> +typedef enum {
> +    VDISK_AIO_READ,
> +    VDISK_AIO_WRITE,
> +    VDISK_STAT,
> +    VDISK_TRUNC,
> +    VDISK_AIO_FLUSH,
> +    VDISK_AIO_RECLAIM,
> +    VDISK_GET_GEOMETRY,
> +    VDISK_CHECK_IO_FAILOVER_READY,
> +    VDISK_AIO_LAST_CMD
> +} VDISKAIOCmd;

Please don't copy-paste this stuff from your library.  Instead you need
to get libqnio upstream and then QEMU can build against it just like
libnfs, libiscsi, Gluster, Ceph, etc.

> +
> +typedef enum {
> +    VXHS_IO_INPROGRESS,
> +    VXHS_IO_COMPLETED,
> +    VXHS_IO_ERROR
> +} VXHSIOState;
> +
> +
> +typedef void *qemu_aio_ctx_t;

This name is confusing.  QEMU already has its own AioContext
terminology.  The prefix should be qnio.

> +typedef void (*qnio_callback_t)(ssize_t retval, void *arg);
> +
> +#define VDISK_FD_READ 0
> +#define VDISK_FD_WRITE 1
> +
> +#define QNIO_VDISK_NONE        0x00
> +#define QNIO_VDISK_CREATE      0x01
> +
> +/* max IO size supported by QEMU NIO lib */
> +#define QNIO_MAX_IO_SIZE       4194304
> +
> +#define IP_ADDR_LEN             20

Why is this generic-sounding limit necessary?  Can you pass a string
around and let the IP address parsing function decide whether it's valid
or not?

> +char vdisk_prefix[] = "/dev/of/vdisk";

You cannot define global variables in a header file.  If more than one
file includes it there will be a link error due to the duplicate symbol.

Does this even need to be in the header?  It's probably best to define
it close to where it's used in the .c file.
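
e.g. in block/vxhs.c:

    static const char vdisk_prefix[] = "/dev/of/vdisk";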
