From: Jeff Cody
Subject: Re: [Qemu-devel] [PATCH v6 1/5] block: Support Archipelago as a QEMU block backend
Date: Wed, 9 Jul 2014 20:23:22 -0400
User-agent: Mutt/1.5.21 (2010-09-15)

On Fri, Jun 27, 2014 at 11:24:08AM +0300, Chrysostomos Nanakos wrote:
> VM Image on Archipelago volume is specified like this:
> 
> file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
> file.vport=<vlmcd_port>][,file.segment=<segment_name>]]
> 
> 'archipelago' is the protocol.
> 
> 'mport' is the port number on which mapperd is listening. This is optional
> and if not specified, QEMU will make Archipelago to use the default port.
> 
> 'vport' is the port number on which vlmcd is listening. This is optional
> and if not specified, QEMU will make Archipelago to use the default port.
> 
> 'segment' is the name of the shared memory segment Archipelago stack is using.
> This is optional and if not specified, QEMU will make Archipelago to use the
> default value, 'archipelago'.
> 
> Examples:
> 
> file.driver=archipelago,file.volume=my_vm_volume
> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> file.vport=1234
> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> file.vport=1234,file.segment=my_segment
> 
> Signed-off-by: Chrysostomos Nanakos <address@hidden>

This is only a superficial review: I don't have a good idea of what
Archipelago or libxseg really does, and I have not compiled libxseg or
these patches.  I did scan through this patch, though, and have a few
comments and questions.

> ---
>  MAINTAINERS         |    6 +
>  block/Makefile.objs |    2 +
>  block/archipelago.c |  819 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  configure           |   40 +++
>  4 files changed, 867 insertions(+)
>  create mode 100644 block/archipelago.c
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 9b93edd..58ef1e3 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -999,3 +999,9 @@ SSH
>  M: Richard W.M. Jones <address@hidden>
>  S: Supported
>  F: block/ssh.c
> +
> +ARCHIPELAGO
> +M: Chrysostomos Nanakos <address@hidden>
> +M: Chrysostomos Nanakos <address@hidden>
> +S: Maintained
> +F: block/archipelago.c
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index fd88c03..858d2b3 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -17,6 +17,7 @@ block-obj-$(CONFIG_LIBNFS) += nfs.o
>  block-obj-$(CONFIG_CURL) += curl.o
>  block-obj-$(CONFIG_RBD) += rbd.o
>  block-obj-$(CONFIG_GLUSTERFS) += gluster.o
> +block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
>  block-obj-$(CONFIG_LIBSSH2) += ssh.o
>  endif
>  
> @@ -35,5 +36,6 @@ gluster.o-cflags   := $(GLUSTERFS_CFLAGS)
>  gluster.o-libs     := $(GLUSTERFS_LIBS)
>  ssh.o-cflags       := $(LIBSSH2_CFLAGS)
>  ssh.o-libs         := $(LIBSSH2_LIBS)
> +archipelago.o-libs := $(ARCHIPELAGO_LIBS)
>  qcow.o-libs        := -lz
>  linux-aio.o-libs   := -laio
> diff --git a/block/archipelago.c b/block/archipelago.c
> new file mode 100644
> index 0000000..c56826a
> --- /dev/null
> +++ b/block/archipelago.c
> @@ -0,0 +1,819 @@
> +/*
> + * QEMU Block driver for Archipelago
> + *
> + * Copyright 2014 GRNET S.A. All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + *   1. Redistributions of source code must retain the above
> + *      copyright notice, this list of conditions and the following
> + *      disclaimer.
> + *   2. Redistributions in binary form must reproduce the above
> + *      copyright notice, this list of conditions and the following
> + *      disclaimer in the documentation and/or other materials
> + *      provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
> + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
> + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
> + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
> + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
> + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
> + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
> + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
> + * POSSIBILITY OF SUCH DAMAGE.
> + *
> + * The views and conclusions contained in the software and
> + * documentation are those of the authors and should not be
> + * interpreted as representing official policies, either expressed
> + * or implied, of GRNET S.A.
> + */
> +
> +/*
> +* VM Image on Archipelago volume is specified like this:
> +*
> +* file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
> +* file.vport=<vlmcd_port>][,file.segment=<segment_name>]]
> +*
> +* 'archipelago' is the protocol.
> +*
> +* 'mport' is the port number on which mapperd is listening. This is optional
> +* and if not specified, QEMU will make Archipelago to use the default port.
> +*
> +* 'vport' is the port number on which vlmcd is listening. This is optional
> +* and if not specified, QEMU will make Archipelago to use the default port.
> +*
> +* 'segment' is the name of the shared memory segment Archipelago stack is using.
> +* This is optional and if not specified, QEMU will make Archipelago to use the
> +* default value, 'archipelago'.
> +*
> +* Examples:
> +*
> +* file.driver=archipelago,file.volume=my_vm_volume
> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> +* file.vport=1234
> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> +* file.vport=1234,file.segment=my_segment
> +*/
> +
> +#include "block/block_int.h"
> +#include "qemu/error-report.h"
> +#include "qemu/thread.h"
> +#include "qapi/qmp/qint.h"
> +#include "qapi/qmp/qstring.h"
> +#include "qapi/qmp/qjson.h"
> +
> +#include <inttypes.h>
> +#include <xseg/xseg.h>
> +#include <xseg/protocol.h>
> +
> +#define ARCHIP_FD_READ      0
> +#define ARCHIP_FD_WRITE     1
> +#define MAX_REQUEST_SIZE    524288
> +
> +#define ARCHIPELAGO_OPT_VOLUME      "volume"
> +#define ARCHIPELAGO_OPT_SEGMENT     "segment"
> +#define ARCHIPELAGO_OPT_MPORT       "mport"
> +#define ARCHIPELAGO_OPT_VPORT       "vport"
> +
> +#define archipelagolog(fmt, ...) \
> +    do {                         \
> +        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
> +    } while (0)
> +
> +typedef enum {
> +    ARCHIP_OP_READ,
> +    ARCHIP_OP_WRITE,
> +    ARCHIP_OP_FLUSH,
> +    ARCHIP_OP_VOLINFO,
> +} ARCHIPCmd;
> +
> +typedef struct ArchipelagoAIOCB {
> +    BlockDriverAIOCB common;
> +    QEMUBH *bh;
> +    struct BDRVArchipelagoState *s;
> +    QEMUIOVector *qiov;
> +    void *buffer;
> +    ARCHIPCmd cmd;
> +    bool cancelled;
> +    int status;
> +    int64_t size;
> +    int64_t ret;
> +} ArchipelagoAIOCB;
> +
> +typedef struct BDRVArchipelagoState {
> +    ArchipelagoAIOCB *event_acb;
> +    char *volname;
> +    char *segment_name;
> +    uint64_t size;
> +    /* Archipelago specific */
> +    struct xseg *xseg;

I assume s->xseg is allocated in xseg_join() - is it ever freed?  In
_close(), there is a final call to xseg_leave(s->xseg), but from what
I found in libxseg, it does not appear to be freed:
https://github.com/cnanakos/libxseg/blob/develop/src/xseg.c#L975

Is it up to libxseg to free xseg, or the caller?


> +    struct xseg_port *port;
> +    xport srcport;
> +    xport sport;
> +    xport mportno;
> +    xport vportno;
> +    QemuMutex archip_mutex;
> +    QemuCond archip_cond;
> +    bool is_signaled;
> +    /* Request handler specific */
> +    QemuThread request_th;
> +    QemuCond request_cond;
> +    QemuMutex request_mutex;
> +    bool th_is_signaled;
> +    bool stopping;
> +} BDRVArchipelagoState;
> +
> +typedef struct ArchipelagoSegmentedRequest {
> +    size_t count;
> +    size_t total;
> +    int ref;
> +    int failed;
> +} ArchipelagoSegmentedRequest;
> +
> +typedef struct AIORequestData {
> +    const char *volname;
> +    off_t offset;
> +    size_t size;
> +    uint64_t bufidx;
> +    int ret;
> +    int op;
> +    ArchipelagoAIOCB *aio_cb;
> +    ArchipelagoSegmentedRequest *segreq;
> +} AIORequestData;
> +
> +static void qemu_archipelago_complete_aio(void *opaque);
> +
> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
> +{
> +    if (xseg && (sport != srcport)) {
> +        xseg_init_local_signal(xseg, srcport);
> +        sport = srcport;
> +    }
> +}
> +
> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
> +{
> +    if (reqdata->aio_cb->ret != reqdata->segreq->total) {
> +        reqdata->aio_cb->ret = -EIO;
> +    } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
> +        reqdata->aio_cb->ret = 0;
> +    }
> +    reqdata->aio_cb->bh = aio_bh_new(
> +                        bdrv_get_aio_context(reqdata->aio_cb->common.bs),
> +                        qemu_archipelago_complete_aio, reqdata
> +                        );
> +    qemu_bh_schedule(reqdata->aio_cb->bh);
> +}
> +
> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
> +                      struct xseg_request *expected_req)
> +{
> +    struct xseg_request *req;
> +    xseg_prepare_wait(xseg, srcport);
> +    void *psd = xseg_get_signal_desc(xseg, port);
> +    while (1) {
> +        req = xseg_receive(xseg, srcport, 0);
> +        if (req) {
> +            if (req != expected_req) {
> +                archipelagolog("Unknown received request\n");
> +                xseg_put_request(xseg, req, srcport);
> +            } else if (!(req->state & XS_SERVED)) {
> +                return -1;
> +            } else {
> +                break;
> +            }
> +        }
> +        xseg_wait_signal(xseg, psd, 100000UL);
> +    }
> +    xseg_cancel_wait(xseg, srcport);
> +    return 0;
> +}
> +
> +static void xseg_request_handler(void *state)
> +{
> +    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
> +    void *psd = xseg_get_signal_desc(s->xseg, s->port);
> +    qemu_mutex_lock(&s->request_mutex);
> +
> +    while (!s->stopping) {
> +        struct xseg_request *req;
> +        void *data;
> +        xseg_prepare_wait(s->xseg, s->srcport);
> +        req = xseg_receive(s->xseg, s->srcport, 0);

Is this a blocking call?  If so, is there a timeout, and if not, what
scenarios (if any) could cause us to wait here indefinitely?

> +        if (req) {
> +            AIORequestData *reqdata;
> +            ArchipelagoSegmentedRequest *segreq;
> +            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
> +
> +            switch (reqdata->op) {
> +            case ARCHIP_OP_READ:
> +                    data = xseg_get_data(s->xseg, req);
> +                    segreq = reqdata->segreq;
> +                    segreq->count += req->serviced;
> +
> +                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
> +                                        data,
> +                                        req->serviced);
> +
> +                    xseg_put_request(s->xseg, req, s->srcport);
> +
> +                    if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
> +                        if (!segreq->failed) {
> +                            reqdata->aio_cb->ret = segreq->count;
> +                            archipelago_finish_aiocb(reqdata);
> +                            g_free(segreq);
> +                        } else {
> +                            g_free(segreq);
> +                            g_free(reqdata);
> +                        }
> +                    } else {
> +                        g_free(reqdata);
> +                    }
> +                    break;
> +            case ARCHIP_OP_WRITE:
> +                    segreq = reqdata->segreq;
> +                    segreq->count += req->serviced;
> +                    xseg_put_request(s->xseg, req, s->srcport);
> +
> +                    if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
> +                        if (!segreq->failed) {
> +                            reqdata->aio_cb->ret = segreq->count;
> +                            archipelago_finish_aiocb(reqdata);
> +                            g_free(segreq);
> +                        } else {
> +                            g_free(segreq);
> +                            g_free(reqdata);
> +                        }
> +                    } else {
> +                        g_free(reqdata);
> +                    }

This (OP_WRITE / OP_READ) is where I am worried that we leak in error
cases, and a _close() won't clean it up (see later comments).


> +                    break;
> +            case ARCHIP_OP_VOLINFO:
> +                    s->is_signaled = true;
> +                    qemu_cond_signal(&s->archip_cond);
> +                    break;
> +            }
> +        } else {
> +            xseg_wait_signal(s->xseg, psd, 100000UL);
> +        }
> +        xseg_cancel_wait(s->xseg, s->srcport);



> +    }
> +
> +    s->th_is_signaled = true;
> +    qemu_cond_signal(&s->request_cond);
> +    qemu_mutex_unlock(&s->request_mutex);
> +    qemu_thread_exit(NULL);
> +}
> +
> +static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
> +{
> +    if (xseg_initialize()) {
> +        archipelagolog("Cannot initialize XSEG\n");
> +        goto err_exit;
> +    }
> +
> +    s->xseg = xseg_join((char *)"posix", s->segment_name,
> +                        (char *)"posixfd", NULL);
> +    if (!s->xseg) {
> +        archipelagolog("Cannot join XSEG shared memory segment\n");
> +        goto err_exit;
> +    }
> +    s->port = xseg_bind_dynport(s->xseg);
> +    s->srcport = s->port->portno;
> +    init_local_signal(s->xseg, s->sport, s->srcport);
> +    return 0;
> +
> +err_exit:
> +    return -1;
> +}
> +
> +static int qemu_archipelago_init(BDRVArchipelagoState *s)
> +{
> +    int ret;
> +
> +    ret = qemu_archipelago_xseg_init(s);
> +    if (ret < 0) {
> +        error_report("Cannot initialize XSEG. Aborting...\n");
> +        goto err_exit;
> +    }
> +
> +    qemu_cond_init(&s->archip_cond);
> +    qemu_mutex_init(&s->archip_mutex);
> +    qemu_cond_init(&s->request_cond);
> +    qemu_mutex_init(&s->request_mutex);
> +    s->th_is_signaled = false;
> +    qemu_thread_create(&s->request_th, "xseg_io_th",
> +                       (void *) xseg_request_handler,
> +                       (void *) s, QEMU_THREAD_JOINABLE);
> +
> +err_exit:
> +    return ret;
> +}
> +
> +static void qemu_archipelago_complete_aio(void *opaque)
> +{
> +    AIORequestData *reqdata = (AIORequestData *) opaque;
> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
> +
> +    qemu_bh_delete(aio_cb->bh);
> +    qemu_vfree(aio_cb->buffer);
> +    aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
> +    aio_cb->status = 0;
> +
> +    if (!aio_cb->cancelled) {
> +        qemu_aio_release(aio_cb);
> +    }
> +    g_free(reqdata);
> +}
> +
> +static QemuOptsList archipelago_runtime_opts = {
> +    .name = "archipelago",
> +    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = ARCHIPELAGO_OPT_VOLUME,
> +            .type = QEMU_OPT_STRING,
> +            .help = "Name of the volume image",
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_SEGMENT,
> +            .type = QEMU_OPT_STRING,
> +            .help = "Name of the Archipelago shared memory segment",
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_MPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago mapperd port number"
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_VPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago vlmcd port number"
> +
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static int qemu_archipelago_open(BlockDriverState *bs,
> +                                 QDict *options,
> +                                 int bdrv_flags,
> +                                 Error **errp)
> +{
> +    int ret = 0;
> +    const char *volume, *segment_name;
> +    QemuOpts *opts;
> +    Error *local_err = NULL;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +
> +    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
> +    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
> +
> +    segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
> +    if (segment_name == NULL) {
> +        s->segment_name = g_strdup("archipelago");
> +    } else {
> +        s->segment_name = g_strdup(segment_name);
> +    }
> +
> +    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
> +    if (volume == NULL) {
> +        error_setg(errp, "archipelago block driver requires the 'volume'"
> +                   " option");
> +        qemu_opts_del(opts);
> +        return -EINVAL;

s->segment_name is leaked here.

You already have an exit label (err_exit) that cleans everything up,
and g_free() is NULL safe (and bs->opaque is zero-initialized).

You should be able to just set ret, and 'goto err_exit' in each error
instance in qemu_archipelago_open() - this also gets rid of the extra
qemu_opts_del() calls.
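
A minimal sketch of the single-exit pattern described above. It uses
plain malloc()/free() instead of the GLib and QEMU calls so that it
compiles on its own; demo_state, demo_open() and dup_string() are
hypothetical names for illustration, not code from the patch.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the driver state; zero-initialized by the caller. */
struct demo_state {
    char *volname;
    char *segment_name;
};

/* Small strdup replacement so the sketch only needs standard C. */
static char *dup_string(const char *src)
{
    size_t len = strlen(src) + 1;
    char *copy = malloc(len);

    if (copy) {
        memcpy(copy, src, len);
    }
    return copy;
}

/* Every failure sets ret and jumps to one label that releases everything. */
static int demo_open(struct demo_state *s, const char *volume,
                     const char *segment)
{
    int ret = 0;

    assert(s != NULL);

    s->segment_name = dup_string(segment ? segment : "archipelago");
    if (!s->segment_name) {
        ret = -ENOMEM;
        goto err_exit;
    }

    if (volume == NULL) {
        ret = -EINVAL;          /* required option is missing */
        goto err_exit;
    }

    s->volname = dup_string(volume);
    if (!s->volname) {
        ret = -ENOMEM;
        goto err_exit;
    }
    return 0;

err_exit:
    /* free(NULL) is a no-op, so fields that were never set are safe here. */
    free(s->volname);
    free(s->segment_name);
    s->volname = NULL;
    s->segment_name = NULL;
    return ret;
}
```

With this shape, adding a new failure point only needs "ret = ...;
goto err_exit;" and cannot forget a cleanup step.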

> +    }
> +    s->volname = g_strdup(volume);
> +
> +    /* Initialize XSEG, join shared memory segment */
> +    ret = qemu_archipelago_init(s);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot initialize XSEG and join shared "
> +                   "memory segment");
> +        goto err_exit;
> +    }
> +
> +    qemu_opts_del(opts);
> +    return 0;
> +
> +err_exit:
> +    g_free(s->volname);
> +    g_free(s->segment_name);
> +    qemu_opts_del(opts);
> +    return ret;
> +}
> +
> +static void qemu_archipelago_close(BlockDriverState *bs)
> +{
> +    int r, targetlen;
> +    char *target;
> +    struct xseg_request *req;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    s->stopping = true;
> +
> +    qemu_mutex_lock(&s->request_mutex);
> +    while (!s->th_is_signaled) {
> +        qemu_cond_wait(&s->request_cond,
> +                       &s->request_mutex);
> +    }
> +    qemu_mutex_unlock(&s->request_mutex);
> +    qemu_thread_join(&s->request_th);
> +    qemu_cond_destroy(&s->request_cond);
> +    qemu_mutex_destroy(&s->request_mutex);
> +
> +    qemu_cond_destroy(&s->archip_cond);
> +    qemu_mutex_destroy(&s->archip_mutex);
> +
> +    targetlen = strlen(s->volname);

Should this be strlen(s->volname) + 1, to account for the '\0'?  Or
does xseg_prep_request() just need the length of the non-null
terminated string?
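
To make the two options concrete, here is a small sketch of the
difference between a counted copy and a terminated copy. Which one
libxseg expects is exactly the open question; copy_counted() and
copy_terminated() are hypothetical helpers, not libxseg functions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Copy `name` into `target` as a counted (non-terminated) byte string and
 * return the number of bytes written. The length must travel separately,
 * because no '\0' is stored.
 */
static size_t copy_counted(char *target, size_t cap, const char *name)
{
    size_t len = strlen(name);      /* excludes the terminator */

    assert(len <= cap);
    memcpy(target, name, len);
    return len;
}

/* Same copy, but also stores the terminator, so one extra byte is needed. */
static size_t copy_terminated(char *target, size_t cap, const char *name)
{
    size_t len = strlen(name) + 1;  /* includes the terminator */

    assert(len <= cap);
    memcpy(target, name, len);
    return len;
}
```

If the receiving side ever treats the target as a C string, only the
second form is safe; if it always uses the explicit length, the first
form is sufficient.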

> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> +    if (!req) {
> +        archipelagolog("Cannot get XSEG request\n");
> +        goto err_exit;
> +    }
> +    r = xseg_prep_request(s->xseg, req, targetlen, 0);
> +    if (r < 0) {
> +        xseg_put_request(s->xseg, req, s->srcport);

What does this do here, if xseg_prep_request() failed?  Is it
essentially a cleanup function?

> +        archipelagolog("Cannot prepare XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    target = xseg_get_target(s->xseg, req);
> +    memcpy(target, s->volname, targetlen);
> +    req->size = req->datalen;
> +    req->offset = 0;
> +    req->op = X_CLOSE;
> +
> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> +    if (p == NoPort) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot submit XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    xseg_signal(s->xseg, p);
> +    wait_reply(s->xseg, s->srcport, s->port, req);

This is another spot where I wonder whether we could get stuck on a
blocking call that waits forever.  Is there a timeout here?
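
One possible shape for a bounded wait, as a sketch only. It does not
use libxseg at all: reply_ready_fn, wait_reply_bounded() and
countdown_ready() are hypothetical names, and the callback stands in
for "try to receive, then wait briefly for a signal".

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical callback: returns nonzero once the reply has arrived. */
typedef int (*reply_ready_fn)(void *opaque);

/*
 * Poll for a reply at most max_tries times instead of looping forever.
 * In a real driver each iteration would block on a timed wait.
 * Returns 0 on success, -ETIMEDOUT when the budget is exhausted.
 */
static int wait_reply_bounded(reply_ready_fn ready, void *opaque,
                              unsigned max_tries)
{
    unsigned i;

    assert(ready != NULL);
    for (i = 0; i < max_tries; i++) {
        if (ready(opaque)) {
            return 0;
        }
    }
    return -ETIMEDOUT;
}

/* Example callback for illustration: becomes ready after a countdown. */
static int countdown_ready(void *opaque)
{
    int *remaining = opaque;

    if (*remaining > 0) {
        (*remaining)--;
        return 0;
    }
    return 1;
}
```

The caller can then decide what to do on -ETIMEDOUT (log and continue
with the teardown, for instance) rather than hanging in _close().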

> +
> +    xseg_put_request(s->xseg, req, s->srcport);
> +
> +err_exit:
> +    g_free(s->volname);
> +    g_free(s->segment_name);
> +    xseg_quit_local_signal(s->xseg, s->srcport);
> +    xseg_leave_dynport(s->xseg, s->port);
> +    xseg_leave(s->xseg);
> +}
> +
> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
> +    aio_cb->cancelled = true;
> +    while (aio_cb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +    qemu_aio_release(aio_cb);
> +}
> +
> +static const AIOCBInfo archipelago_aiocb_info = {
> +    .aiocb_size = sizeof(ArchipelagoAIOCB),
> +    .cancel = qemu_archipelago_aio_cancel,
> +};
> +
> +static int __archipelago_submit_request(BDRVArchipelagoState *s,
> +                                        uint64_t bufidx,
> +                                        size_t count,
> +                                        off_t offset,
> +                                        ArchipelagoAIOCB *aio_cb,
> +                                        ArchipelagoSegmentedRequest *segreq,
> +                                        int op)
> +{
> +    int ret, targetlen;
> +    char *target;
> +    void *data = NULL;
> +    struct xseg_request *req;
> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> +    targetlen = strlen(s->volname);
> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> +    if (!req) {
> +        archipelagolog("Cannot get XSEG request\n");
> +        goto err_exit2;
> +    }
> +    ret = xseg_prep_request(s->xseg, req, targetlen, count);
> +    if (ret < 0) {
> +        archipelagolog("Cannot prepare XSEG request\n");
> +        goto err_exit;
> +    }
> +    target = xseg_get_target(s->xseg, req);
> +    if (!target) {
> +        archipelagolog("Cannot get XSEG target\n");
> +        goto err_exit;
> +    }
> +    memcpy(target, s->volname, targetlen);
> +    req->size = count;
> +    req->offset = offset;
> +
> +    switch (op) {
> +    case ARCHIP_OP_READ:
> +        req->op = X_READ;
> +        break;
> +    case ARCHIP_OP_WRITE:
> +        req->op = X_WRITE;
> +        break;
> +    }
> +    reqdata->volname = s->volname;
> +    reqdata->offset = offset;
> +    reqdata->size = count;
> +    reqdata->bufidx = bufidx;
> +    reqdata->aio_cb = aio_cb;
> +    reqdata->segreq = segreq;
> +    reqdata->op = op;
> +
> +    xseg_set_req_data(s->xseg, req, reqdata);
> +    if (op == ARCHIP_OP_WRITE) {
> +        data = xseg_get_data(s->xseg, req);
> +        if (!data) {
> +            archipelagolog("Cannot get XSEG data\n");
> +            goto err_exit;
> +        }
> +        memcpy(data, aio_cb->buffer + bufidx, count);
> +    }
> +
> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> +    if (p == NoPort) {
> +        archipelagolog("Could not submit XSEG request\n");
> +        goto err_exit;
> +    }
> +    xseg_signal(s->xseg, p);
> +    return 0;
> +
> +err_exit:
> +    g_free(reqdata);
> +    xseg_put_request(s->xseg, req, s->srcport);
> +    return -EIO;
> +err_exit2:
> +    g_free(reqdata);
> +    return -EIO;
> +}
> +
> +static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
> +                                        size_t count,
> +                                        off_t offset,
> +                                        ArchipelagoAIOCB *aio_cb,
> +                                        int op)
> +{
> +    int i, ret, segments_nr, last_segment_size;
> +    ArchipelagoSegmentedRequest *segreq;
> +
> +    segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest));
> +
> +    if (op == ARCHIP_OP_FLUSH) {
> +        segments_nr = 1;
> +        segreq->ref = segments_nr;
> +        segreq->total = count;
> +        segreq->count = 0;
> +        segreq->failed = 0;
> +        ret = __archipelago_submit_request(s, 0, count, offset, aio_cb,
> +                                           segreq, ARCHIP_OP_WRITE);
> +        if (ret < 0) {
> +            goto err_exit;
> +        }
> +        return 0;
> +    }
> +
> +    segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
> +                  ((count % MAX_REQUEST_SIZE) ? 1 : 0);
> +    last_segment_size = (int)(count % MAX_REQUEST_SIZE);
> +
> +    segreq->ref = segments_nr;
> +    segreq->total = count;
> +    segreq->count = 0;
> +    segreq->failed = 0;
> +
> +    for (i = 0; i < segments_nr - 1; i++) {
> +        ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
> +                                           MAX_REQUEST_SIZE,
> +                                           offset + i * MAX_REQUEST_SIZE,
> +                                           aio_cb, segreq, op);
> +
> +        if (ret < 0) {
> +            goto err_exit;
> +        }
> +    }
> +
> +    if ((segments_nr > 1) && last_segment_size) {
> +        ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
> +                                           last_segment_size,
> +                                           offset + i * MAX_REQUEST_SIZE,
> +                                           aio_cb, segreq, op);
> +    } else if ((segments_nr > 1) && !last_segment_size) {
> +        ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
> +                                           MAX_REQUEST_SIZE,
> +                                           offset + i * MAX_REQUEST_SIZE,
> +                                           aio_cb, segreq, op);
> +    } else if (segments_nr == 1) {
> +            ret = __archipelago_submit_request(s, 0, count, offset, aio_cb,
> +                                               segreq, op);
> +    }
> +
> +    if (ret < 0) {
> +        goto err_exit;
> +    }
> +
> +    return 0;
> +
> +err_exit:
> +    __sync_add_and_fetch(&segreq->failed, 1);
> +    if (segments_nr == 1) {
> +        if (__sync_add_and_fetch(&segreq->ref, -1) == 0) {
> +            g_free(segreq);
> +        }
> +    } else {
> +        if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) {
> +            g_free(segreq);
> +        }
> +    }

Don't we run the risk of leaking segreq here?  The other place this is
freed is in xseg_request_handler(), but could we run into a race
condition where 's->stopping' is true, or even xseg_receive() just does not
return a request? 
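
For reference, a sketch of the ownership rule I would expect here:
whoever drops the last reference frees the object, regardless of
whether that is the submitter or the completion handler. The names
seg_request, seg_request_new() and seg_request_put() are hypothetical;
__sync_sub_and_fetch() is the GCC/Clang builtin from the same family
the patch already uses.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for ArchipelagoSegmentedRequest. */
struct seg_request {
    int ref;        /* outstanding segments that still hold a reference */
    int failed;     /* set once any segment fails */
};

static struct seg_request *seg_request_new(int segments)
{
    struct seg_request *r = malloc(sizeof(*r));

    if (r) {
        r->ref = segments;
        r->failed = 0;
    }
    return r;
}

/*
 * Drop n references atomically. The caller that brings the count to zero
 * frees the object. Returns 1 if this call freed it, 0 otherwise.
 */
static int seg_request_put(struct seg_request *r, int n)
{
    assert(n > 0);
    if (__sync_sub_and_fetch(&r->ref, n) == 0) {
        free(r);
        return 1;
    }
    return 0;
}
```

If submission fails after i of segments_nr segments went out, the
submitter would drop segments_nr - i references. The object is then
only freed once the i in-flight segments complete, which is the case
the question above is about.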

> +
> +    return ret;
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
> +                                                 int64_t sector_num,
> +                                                 QEMUIOVector *qiov,
> +                                                 int nb_sectors,
> +                                                 BlockDriverCompletionFunc *cb,
> +                                                 void *opaque,
> +                                                 int op)
> +{
> +    ArchipelagoAIOCB *aio_cb;
> +    BDRVArchipelagoState *s = bs->opaque;
> +    int64_t size, off;
> +    int ret;
> +
> +    aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
> +    aio_cb->cmd = op;
> +    aio_cb->qiov = qiov;
> +
> +    if (op != ARCHIP_OP_FLUSH) {
> +        aio_cb->buffer = qemu_blockalign(bs, qiov->size);
> +    } else {
> +        aio_cb->buffer = NULL;
> +    }
> +
> +    aio_cb->ret = 0;
> +    aio_cb->s = s;
> +    aio_cb->cancelled = false;
> +    aio_cb->status = -EINPROGRESS;
> +
> +    if (op == ARCHIP_OP_WRITE) {
> +        qemu_iovec_to_buf(aio_cb->qiov, 0, aio_cb->buffer, qiov->size);
> +    }
> +
> +    off = sector_num * BDRV_SECTOR_SIZE;
> +    size = nb_sectors * BDRV_SECTOR_SIZE;
> +    aio_cb->size = size;
> +
> +    ret = archipelago_aio_segmented_rw(s, size, off,
> +                                       aio_cb, op);
> +    if (ret < 0) {
> +        goto err_exit;
> +    }
> +    return &aio_cb->common;
> +
> +err_exit:
> +    error_report("qemu_archipelago_aio_rw(): I/O Error\n");
> +    qemu_vfree(aio_cb->buffer);
> +    qemu_aio_release(aio_cb);
> +    return NULL;
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
> +                                   opaque, ARCHIP_OP_READ);
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
> +        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
> +                                   opaque, ARCHIP_OP_WRITE);
> +}
> +
> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
> +{
> +    uint64_t size;
> +    int ret, targetlen;
> +    struct xseg_request *req;
> +    struct xseg_reply_info *xinfo;
> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> +    const char *volname = s->volname;
> +    targetlen = strlen(volname);
> +    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
> +    if (!req) {
> +        archipelagolog("Cannot get XSEG request\n");
> +        goto err_exit2;
> +    }
> +    ret = xseg_prep_request(s->xseg, req, targetlen,
> +                            sizeof(struct xseg_reply_info));
> +    if (ret < 0) {
> +        archipelagolog("Cannot prepare XSEG request\n");
> +        goto err_exit;
> +    }
> +    char *target = xseg_get_target(s->xseg, req);
> +    if (!target) {
> +        archipelagolog("Cannot get XSEG target\n");
> +        goto err_exit;
> +    }
> +    memcpy(target, volname, targetlen);
> +    req->size = req->datalen;
> +    req->offset = 0;
> +    req->op = X_INFO;
> +
> +    reqdata->op = ARCHIP_OP_VOLINFO;
> +    reqdata->volname = volname;
> +    xseg_set_req_data(s->xseg, req, reqdata);
> +
> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> +    if (p == NoPort) {
> +        archipelagolog("Cannot submit XSEG request\n");
> +        goto err_exit;
> +    }
> +    xseg_signal(s->xseg, p);
> +    qemu_mutex_lock(&s->archip_mutex);
> +    while (!s->is_signaled) {
> +        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
> +    }
> +    s->is_signaled = false;
> +    qemu_mutex_unlock(&s->archip_mutex);
> +
> +    xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
> +    size = xinfo->size;
> +    xseg_put_request(s->xseg, req, s->srcport);
> +    g_free(reqdata);
> +    s->size = size;
> +    return size;
> +



> +err_exit:
> +    g_free(reqdata);
> +    xseg_put_request(s->xseg, req, s->srcport);
> +    return -1;
> +err_exit2:
> +    g_free(reqdata);
> +    return -1;
> +}

This could be simplified to just:

 err_exit:
     xseg_put_request(s->xseg, req, s->srcport);
 err_exit2:
     g_free(reqdata);
     return -1;
 }

Maybe it'd also be best to return -EIO (or other meaningful error
value) instead of just -1, as this value gets passed along to
.bdrv_getlength().
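
A self-contained sketch of the same fall-through cleanup with a
negative errno return. It uses malloc()/free() in place of the XSEG
calls; volume_info_sketch(), enum fail_at and the 4096 result are made
up purely to drive the example.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical failure points, used only to drive the sketch. */
enum fail_at { FAIL_NONE, FAIL_GET_REQUEST, FAIL_PREPARE };

/*
 * Acquire two resources in order and release them in reverse order through
 * fall-through labels. *released counts how many resources the error path
 * gave back, so the behaviour can be checked.
 */
static int64_t volume_info_sketch(enum fail_at fail, int *released)
{
    void *reqdata = malloc(16);     /* stands in for AIORequestData */
    void *req = NULL;               /* stands in for the XSEG request */

    assert(released != NULL);
    *released = 0;
    if (!reqdata) {
        return -ENOMEM;
    }
    if (fail == FAIL_GET_REQUEST) {
        goto err_exit2;
    }
    req = malloc(16);
    if (!req || fail == FAIL_PREPARE) {
        goto err_exit;
    }

    free(req);
    free(reqdata);
    return 4096;                    /* pretend volume size */

err_exit:
    free(req);                      /* free(NULL) is harmless */
    (*released)++;
    /* fall through */
err_exit2:
    free(reqdata);
    (*released)++;
    return -EIO;
}
```

The later label only undoes what was acquired first, so each error
path releases exactly what it holds and the caller gets a meaningful
negative errno.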

> +
> +static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
> +{
> +    int64_t ret;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    ret = archipelago_volume_info(s);

(This is the spot I meant above: an error value such as -EIO would be
better here than a bare -1.)

> +    return ret;
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
> +        BlockDriverCompletionFunc *cb, void *opaque)
> +{
> +    return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
> +                                   ARCHIP_OP_FLUSH);
> +}
> +
> +static BlockDriver bdrv_archipelago = {
> +    .format_name         = "archipelago",
> +    .protocol_name       = "archipelago",
> +    .instance_size       = sizeof(BDRVArchipelagoState),
> +    .bdrv_file_open      = qemu_archipelago_open,
> +    .bdrv_close          = qemu_archipelago_close,
> +    .bdrv_getlength      = qemu_archipelago_getlength,
> +    .bdrv_aio_readv      = qemu_archipelago_aio_readv,
> +    .bdrv_aio_writev     = qemu_archipelago_aio_writev,
> +    .bdrv_aio_flush      = qemu_archipelago_aio_flush,
> +    .bdrv_has_zero_init  = bdrv_has_zero_init_1,
> +};
> +
> +static void bdrv_archipelago_init(void)
> +{
> +    bdrv_register(&bdrv_archipelago);
> +}
> +
> +block_init(bdrv_archipelago_init);
> diff --git a/configure b/configure
> index 7102964..e4acd9c 100755
> --- a/configure
> +++ b/configure
> @@ -326,6 +326,7 @@ seccomp=""
>  glusterfs=""
>  glusterfs_discard="no"
>  glusterfs_zerofill="no"
> +archipelago=""
>  virtio_blk_data_plane=""
>  gtk=""
>  gtkabi=""
> @@ -1087,6 +1088,10 @@ for opt do
>    ;;
>    --enable-glusterfs) glusterfs="yes"
>    ;;
> +  --disable-archipelago) archipelago="no"
> +  ;;
> +  --enable-archipelago) archipelago="yes"
> +  ;;
>    --disable-virtio-blk-data-plane) virtio_blk_data_plane="no"
>    ;;
>    --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes"
> @@ -1382,6 +1387,8 @@ Advanced options (experts only):
>    --enable-coroutine-pool  enable coroutine freelist (better performance)
>    --enable-glusterfs       enable GlusterFS backend
>    --disable-glusterfs      disable GlusterFS backend
> +  --enable-archipelago     enable Archipelago backend
> +  --disable-archipelago    disable Archipelago backend
>    --enable-gcov            enable test coverage analysis with gcov
>    --gcov=GCOV              use specified gcov [$gcov_tool]
>    --disable-tpm            disable TPM support
> @@ -3051,6 +3058,33 @@ EOF
>    fi
>  fi
>  
> +
> +##########################################
> +# archipelago probe
> +if test "$archipelago" != "no" ; then
> +    cat > $TMPC <<EOF
> +#include <stdio.h>
> +#include <xseg/xseg.h>
> +#include <xseg/protocol.h>
> +int main(void) {
> +    xseg_initialize();
> +    return 0;
> +}
> +EOF
> +    archipelago_libs=-lxseg
> +    if compile_prog "" "$archipelago_libs"; then
> +        archipelago="yes"
> +        libs_tools="$archipelago_libs $libs_tools"
> +        libs_softmmu="$archipelago_libs $libs_softmmu"
> +    else
> +      if test "$archipelago" = "yes" ; then
> +        feature_not_found "Archipelago backend support" "Install libxseg devel"
> +      fi
> +      archipelago="no"
> +    fi
> +fi
> +
> +
>  ##########################################
>  # glusterfs probe
>  if test "$glusterfs" != "no" ; then
> @@ -4230,6 +4264,7 @@ echo "seccomp support   $seccomp"
>  echo "coroutine backend $coroutine"
>  echo "coroutine pool    $coroutine_pool"
>  echo "GlusterFS support $glusterfs"
> +echo "Archipelago support $archipelago"
>  echo "virtio-blk-data-plane $virtio_blk_data_plane"
>  echo "gcov              $gcov_tool"
>  echo "gcov enabled      $gcov"
> @@ -4665,6 +4700,11 @@ if test "$glusterfs_zerofill" = "yes" ; then
>    echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak
>  fi
>  
> +if test "$archipelago" = "yes" ; then
> +  echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak
> +  echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak
> +fi
> +
>  if test "$libssh2" = "yes" ; then
>    echo "CONFIG_LIBSSH2=m" >> $config_host_mak
>    echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak
> -- 
> 1.7.10.4
> 
> 


