From: Jeff Cody
Subject: Re: [Qemu-devel] [PATCH v6 1/5] block: Support Archipelago as a QEMU block backend
Date: Wed, 9 Jul 2014 20:23:22 -0400
User-agent: Mutt/1.5.21 (2010-09-15)

On Fri, Jun 27, 2014 at 11:24:08AM +0300, Chrysostomos Nanakos wrote:
> VM Image on Archipelago volume is specified like this:
>
> file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
> file.vport=<vlmcd_port>][,file.segment=<segment_name>]]
>
> 'archipelago' is the protocol.
>
> 'mport' is the port number on which mapperd is listening. This is optional
> and if not specified, QEMU will make Archipelago to use the default port.
>
> 'vport' is the port number on which vlmcd is listening. This is optional
> and if not specified, QEMU will make Archipelago to use the default port.
>
> 'segment' is the name of the shared memory segment Archipelago stack is using.
> This is optional and if not specified, QEMU will make Archipelago to use the
> default value, 'archipelago'.
>
> Examples:
>
> file.driver=archipelago,file.volume=my_vm_volume
> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> file.vport=1234
> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> file.vport=1234,file.segment=my_segment
>
> Signed-off-by: Chrysostomos Nanakos <address@hidden>
This is just a superficial review, because I don't have a good idea of
what archipelago or libxseg really does (I didn't even compile it or
these patches). But I scanned through this patch, and found a few
things, and had a few questions.
> ---
> MAINTAINERS | 6 +
> block/Makefile.objs | 2 +
> block/archipelago.c | 819 +++++++++++++++++++++++++++++++++++++++++++++++++++
> configure | 40 +++
> 4 files changed, 867 insertions(+)
> create mode 100644 block/archipelago.c
>
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 9b93edd..58ef1e3 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -999,3 +999,9 @@ SSH
> M: Richard W.M. Jones <address@hidden>
> S: Supported
> F: block/ssh.c
> +
> +ARCHIPELAGO
> +M: Chrysostomos Nanakos <address@hidden>
> +M: Chrysostomos Nanakos <address@hidden>
> +S: Maintained
> +F: block/archipelago.c
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index fd88c03..858d2b3 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -17,6 +17,7 @@ block-obj-$(CONFIG_LIBNFS) += nfs.o
> block-obj-$(CONFIG_CURL) += curl.o
> block-obj-$(CONFIG_RBD) += rbd.o
> block-obj-$(CONFIG_GLUSTERFS) += gluster.o
> +block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
> block-obj-$(CONFIG_LIBSSH2) += ssh.o
> endif
>
> @@ -35,5 +36,6 @@ gluster.o-cflags := $(GLUSTERFS_CFLAGS)
> gluster.o-libs := $(GLUSTERFS_LIBS)
> ssh.o-cflags := $(LIBSSH2_CFLAGS)
> ssh.o-libs := $(LIBSSH2_LIBS)
> +archipelago.o-libs := $(ARCHIPELAGO_LIBS)
> qcow.o-libs := -lz
> linux-aio.o-libs := -laio
> diff --git a/block/archipelago.c b/block/archipelago.c
> new file mode 100644
> index 0000000..c56826a
> --- /dev/null
> +++ b/block/archipelago.c
> @@ -0,0 +1,819 @@
> +/*
> + * QEMU Block driver for Archipelago
> + *
> + * Copyright 2014 GRNET S.A. All rights reserved.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer.
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
> + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
> + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
> + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
> + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
> + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
> + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
> + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
> + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
> + * POSSIBILITY OF SUCH DAMAGE.
> + *
> + * The views and conclusions contained in the software and
> + * documentation are those of the authors and should not be
> + * interpreted as representing official policies, either expressed
> + * or implied, of GRNET S.A.
> + */
> +
> +/*
> +* VM Image on Archipelago volume is specified like this:
> +*
> +* file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
> +* file.vport=<vlmcd_port>][,file.segment=<segment_name>]]
> +*
> +* 'archipelago' is the protocol.
> +*
> +* 'mport' is the port number on which mapperd is listening. This is optional
> +* and if not specified, QEMU will make Archipelago to use the default port.
> +*
> +* 'vport' is the port number on which vlmcd is listening. This is optional
> +* and if not specified, QEMU will make Archipelago to use the default port.
> +*
> +* 'segment' is the name of the shared memory segment Archipelago stack is using.
> +* This is optional and if not specified, QEMU will make Archipelago to use the
> +* default value, 'archipelago'.
> +*
> +* Examples:
> +*
> +* file.driver=archipelago,file.volume=my_vm_volume
> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> +* file.vport=1234
> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
> +* file.vport=1234,file.segment=my_segment
> +*/
> +
> +#include "block/block_int.h"
> +#include "qemu/error-report.h"
> +#include "qemu/thread.h"
> +#include "qapi/qmp/qint.h"
> +#include "qapi/qmp/qstring.h"
> +#include "qapi/qmp/qjson.h"
> +
> +#include <inttypes.h>
> +#include <xseg/xseg.h>
> +#include <xseg/protocol.h>
> +
> +#define ARCHIP_FD_READ 0
> +#define ARCHIP_FD_WRITE 1
> +#define MAX_REQUEST_SIZE 524288
> +
> +#define ARCHIPELAGO_OPT_VOLUME "volume"
> +#define ARCHIPELAGO_OPT_SEGMENT "segment"
> +#define ARCHIPELAGO_OPT_MPORT "mport"
> +#define ARCHIPELAGO_OPT_VPORT "vport"
> +
> +#define archipelagolog(fmt, ...) \
> + do { \
> + fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
> + } while (0)
> +
> +typedef enum {
> + ARCHIP_OP_READ,
> + ARCHIP_OP_WRITE,
> + ARCHIP_OP_FLUSH,
> + ARCHIP_OP_VOLINFO,
> +} ARCHIPCmd;
> +
> +typedef struct ArchipelagoAIOCB {
> + BlockDriverAIOCB common;
> + QEMUBH *bh;
> + struct BDRVArchipelagoState *s;
> + QEMUIOVector *qiov;
> + void *buffer;
> + ARCHIPCmd cmd;
> + bool cancelled;
> + int status;
> + int64_t size;
> + int64_t ret;
> +} ArchipelagoAIOCB;
> +
> +typedef struct BDRVArchipelagoState {
> + ArchipelagoAIOCB *event_acb;
> + char *volname;
> + char *segment_name;
> + uint64_t size;
> + /* Archipelago specific */
> + struct xseg *xseg;
I assume s->xseg is allocated in xseg_join() - is it ever freed? In
_close(), there is a final call to xseg_leave(s->xseg), but from what
I found in libxseg, it does not appear to be freed:
https://github.com/cnanakos/libxseg/blob/develop/src/xseg.c#L975
Is it up to libxseg to free xseg, or the caller?
> + struct xseg_port *port;
> + xport srcport;
> + xport sport;
> + xport mportno;
> + xport vportno;
> + QemuMutex archip_mutex;
> + QemuCond archip_cond;
> + bool is_signaled;
> + /* Request handler specific */
> + QemuThread request_th;
> + QemuCond request_cond;
> + QemuMutex request_mutex;
> + bool th_is_signaled;
> + bool stopping;
> +} BDRVArchipelagoState;
> +
> +typedef struct ArchipelagoSegmentedRequest {
> + size_t count;
> + size_t total;
> + int ref;
> + int failed;
> +} ArchipelagoSegmentedRequest;
> +
> +typedef struct AIORequestData {
> + const char *volname;
> + off_t offset;
> + size_t size;
> + uint64_t bufidx;
> + int ret;
> + int op;
> + ArchipelagoAIOCB *aio_cb;
> + ArchipelagoSegmentedRequest *segreq;
> +} AIORequestData;
> +
> +static void qemu_archipelago_complete_aio(void *opaque);
> +
> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
> +{
> + if (xseg && (sport != srcport)) {
> + xseg_init_local_signal(xseg, srcport);
> + sport = srcport;
> + }
> +}
> +
> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
> +{
> + if (reqdata->aio_cb->ret != reqdata->segreq->total) {
> + reqdata->aio_cb->ret = -EIO;
> + } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
> + reqdata->aio_cb->ret = 0;
> + }
> + reqdata->aio_cb->bh = aio_bh_new(
> + bdrv_get_aio_context(reqdata->aio_cb->common.bs),
> + qemu_archipelago_complete_aio, reqdata
> + );
> + qemu_bh_schedule(reqdata->aio_cb->bh);
> +}
> +
> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
> + struct xseg_request *expected_req)
> +{
> + struct xseg_request *req;
> + xseg_prepare_wait(xseg, srcport);
> + void *psd = xseg_get_signal_desc(xseg, port);
> + while (1) {
> + req = xseg_receive(xseg, srcport, 0);
> + if (req) {
> + if (req != expected_req) {
> + archipelagolog("Unknown received request\n");
> + xseg_put_request(xseg, req, srcport);
> + } else if (!(req->state & XS_SERVED)) {
> + return -1;
> + } else {
> + break;
> + }
> + }
> + xseg_wait_signal(xseg, psd, 100000UL);
> + }
> + xseg_cancel_wait(xseg, srcport);
> + return 0;
> +}
> +
> +static void xseg_request_handler(void *state)
> +{
> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
> + void *psd = xseg_get_signal_desc(s->xseg, s->port);
> + qemu_mutex_lock(&s->request_mutex);
> +
> + while (!s->stopping) {
> + struct xseg_request *req;
> + void *data;
> + xseg_prepare_wait(s->xseg, s->srcport);
> + req = xseg_receive(s->xseg, s->srcport, 0);
Is this a blocking call? If so, is there a timeout, and if not, what
scenarios (if any) could cause us to wait here indefinitely?
> + if (req) {
> + AIORequestData *reqdata;
> + ArchipelagoSegmentedRequest *segreq;
> + xseg_get_req_data(s->xseg, req, (void **)&reqdata);
> +
> + switch (reqdata->op) {
> + case ARCHIP_OP_READ:
> + data = xseg_get_data(s->xseg, req);
> + segreq = reqdata->segreq;
> + segreq->count += req->serviced;
> +
> + qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
> + data,
> + req->serviced);
> +
> + xseg_put_request(s->xseg, req, s->srcport);
> +
> + if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
> + if (!segreq->failed) {
> + reqdata->aio_cb->ret = segreq->count;
> + archipelago_finish_aiocb(reqdata);
> + g_free(segreq);
> + } else {
> + g_free(segreq);
> + g_free(reqdata);
> + }
> + } else {
> + g_free(reqdata);
> + }
> + break;
> + case ARCHIP_OP_WRITE:
> + segreq = reqdata->segreq;
> + segreq->count += req->serviced;
> + xseg_put_request(s->xseg, req, s->srcport);
> +
> + if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
> + if (!segreq->failed) {
> + reqdata->aio_cb->ret = segreq->count;
> + archipelago_finish_aiocb(reqdata);
> + g_free(segreq);
> + } else {
> + g_free(segreq);
> + g_free(reqdata);
> + }
> + } else {
> + g_free(reqdata);
> +
This (OP_WRITE / OP_READ) is where I am worried that we leak in error
cases, and a _close() won't clean it up (see later comments).
> + break;
> + case ARCHIP_OP_VOLINFO:
> + s->is_signaled = true;
> + qemu_cond_signal(&s->archip_cond);
> + break;
> + }
> + } else {
> + xseg_wait_signal(s->xseg, psd, 100000UL);
> + }
> + xseg_cancel_wait(s->xseg, s->srcport);
> + }
> +
> + s->th_is_signaled = true;
> + qemu_cond_signal(&s->request_cond);
> + qemu_mutex_unlock(&s->request_mutex);
> + qemu_thread_exit(NULL);
> +}
> +
> +static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
> +{
> + if (xseg_initialize()) {
> + archipelagolog("Cannot initialize XSEG\n");
> + goto err_exit;
> + }
> +
> + s->xseg = xseg_join((char *)"posix", s->segment_name,
> + (char *)"posixfd", NULL);
> + if (!s->xseg) {
> + archipelagolog("Cannot join XSEG shared memory segment\n");
> + goto err_exit;
> + }
> + s->port = xseg_bind_dynport(s->xseg);
> + s->srcport = s->port->portno;
> + init_local_signal(s->xseg, s->sport, s->srcport);
> + return 0;
> +
> +err_exit:
> + return -1;
> +}
> +
> +static int qemu_archipelago_init(BDRVArchipelagoState *s)
> +{
> + int ret;
> +
> + ret = qemu_archipelago_xseg_init(s);
> + if (ret < 0) {
> + error_report("Cannot initialize XSEG. Aborting...\n");
> + goto err_exit;
> + }
> +
> + qemu_cond_init(&s->archip_cond);
> + qemu_mutex_init(&s->archip_mutex);
> + qemu_cond_init(&s->request_cond);
> + qemu_mutex_init(&s->request_mutex);
> + s->th_is_signaled = false;
> + qemu_thread_create(&s->request_th, "xseg_io_th",
> + (void *) xseg_request_handler,
> + (void *) s, QEMU_THREAD_JOINABLE);
> +
> +err_exit:
> + return ret;
> +}
> +
> +static void qemu_archipelago_complete_aio(void *opaque)
> +{
> + AIORequestData *reqdata = (AIORequestData *) opaque;
> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
> +
> + qemu_bh_delete(aio_cb->bh);
> + qemu_vfree(aio_cb->buffer);
> + aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
> + aio_cb->status = 0;
> +
> + if (!aio_cb->cancelled) {
> + qemu_aio_release(aio_cb);
> + }
> + g_free(reqdata);
> +}
> +
> +static QemuOptsList archipelago_runtime_opts = {
> + .name = "archipelago",
> + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
> + .desc = {
> + {
> + .name = ARCHIPELAGO_OPT_VOLUME,
> + .type = QEMU_OPT_STRING,
> + .help = "Name of the volume image",
> + },
> + {
> + .name = ARCHIPELAGO_OPT_SEGMENT,
> + .type = QEMU_OPT_STRING,
> + .help = "Name of the Archipelago shared memory segment",
> + },
> + {
> + .name = ARCHIPELAGO_OPT_MPORT,
> + .type = QEMU_OPT_NUMBER,
> + .help = "Archipelago mapperd port number"
> + },
> + {
> + .name = ARCHIPELAGO_OPT_VPORT,
> + .type = QEMU_OPT_NUMBER,
> + .help = "Archipelago vlmcd port number"
> +
> + },
> + { /* end of list */ }
> + },
> +};
> +
> +static int qemu_archipelago_open(BlockDriverState *bs,
> + QDict *options,
> + int bdrv_flags,
> + Error **errp)
> +{
> + int ret = 0;
> + const char *volume, *segment_name;
> + QemuOpts *opts;
> + Error *local_err = NULL;
> + BDRVArchipelagoState *s = bs->opaque;
> +
> + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
> + qemu_opts_absorb_qdict(opts, options, &local_err);
> + if (local_err) {
> + error_propagate(errp, local_err);
> + qemu_opts_del(opts);
> + return -EINVAL;
> + }
> +
> + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
> + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
> +
> + segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
> + if (segment_name == NULL) {
> + s->segment_name = g_strdup("archipelago");
> + } else {
> + s->segment_name = g_strdup(segment_name);
> + }
> +
> + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
> + if (volume == NULL) {
> + error_setg(errp, "archipelago block driver requires the 'volume'"
> + " option");
> + qemu_opts_del(opts);
> + return -EINVAL;
s->segment_name is leaked here.
You already have an exit label (err_exit) that cleans everything up,
and g_free() is NULL safe (and bs->opaque is zero-initialized).
You should be able to just set ret, and 'goto err_exit' in each error
instance in qemu_archipelago_open() - this also gets rid of the extra
qemu_opts_del() calls.
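To illustrate the single-exit suggestion, here is a minimal, self-contained
sketch. It is not the driver code: DemoState, demo_strdup() and demo_open()
are hypothetical stand-ins, and plain malloc()/free() replace g_strdup() and
g_free() (free(), like g_free(), accepts NULL). In the real function the
qemu_opts_del(opts) call would sit under the same label.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for BDRVArchipelagoState; only the two strings. */
typedef struct DemoState {
    char *volname;
    char *segment_name;
} DemoState;

/* malloc-based string copy, standing in for g_strdup(). */
static char *demo_strdup(const char *src)
{
    size_t len = strlen(src) + 1;
    char *dst = malloc(len);

    if (dst) {
        memcpy(dst, src, len);
    }
    return dst;
}

/* 's' must arrive zero-initialized, as bs->opaque does. */
static int demo_open(DemoState *s, const char *volume, const char *segment)
{
    int ret = 0;

    assert(s->volname == NULL && s->segment_name == NULL);

    s->segment_name = demo_strdup(segment ? segment : "archipelago");
    if (!s->segment_name) {
        ret = -ENOMEM;
        goto err_exit;
    }

    if (volume == NULL) {
        ret = -EINVAL;          /* missing mandatory option */
        goto err_exit;
    }

    s->volname = demo_strdup(volume);
    if (!s->volname) {
        ret = -ENOMEM;
        goto err_exit;
    }
    return 0;

err_exit:
    /* One cleanup path for every failure; freeing NULL is a no-op. */
    free(s->volname);
    free(s->segment_name);
    s->volname = NULL;
    s->segment_name = NULL;
    return ret;
}
```

With this shape, the missing-volume case releases segment_name through the
same label as every other failure, so no error branch needs its own cleanup.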
> + }
> + s->volname = g_strdup(volume);
> +
> + /* Initialize XSEG, join shared memory segment */
> + ret = qemu_archipelago_init(s);
> + if (ret < 0) {
> + error_setg(errp, "cannot initialize XSEG and join shared "
> + "memory segment");
> + goto err_exit;
> + }
> +
> + qemu_opts_del(opts);
> + return 0;
> +
> +err_exit:
> + g_free(s->volname);
> + g_free(s->segment_name);
> + qemu_opts_del(opts);
> + return ret;
> +}
> +
> +static void qemu_archipelago_close(BlockDriverState *bs)
> +{
> + int r, targetlen;
> + char *target;
> + struct xseg_request *req;
> + BDRVArchipelagoState *s = bs->opaque;
> +
> + s->stopping = true;
> +
> + qemu_mutex_lock(&s->request_mutex);
> + while (!s->th_is_signaled) {
> + qemu_cond_wait(&s->request_cond,
> + &s->request_mutex);
> + }
> + qemu_mutex_unlock(&s->request_mutex);
> + qemu_thread_join(&s->request_th);
> + qemu_cond_destroy(&s->request_cond);
> + qemu_mutex_destroy(&s->request_mutex);
> +
> + qemu_cond_destroy(&s->archip_cond);
> + qemu_mutex_destroy(&s->archip_mutex);
> +
> + targetlen = strlen(s->volname);
Should this be strlen(s->volname) + 1, to account for the '\0'? Or
does xseg_prep_request() just need the length of the string without
its NUL terminator?
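To make the question concrete, here is a small sketch of what copying exactly
strlen() bytes does. Which convention libxseg expects is not established in
this thread; demo_fill_target() is a hypothetical helper, not a libxseg call.

```c
#include <assert.h>
#include <string.h>

/*
 * Copy 'name' into a length-delimited field of capacity 'cap' without
 * writing a NUL terminator. Returns the number of bytes copied, or 0 if
 * the name does not fit.
 */
static size_t demo_fill_target(char *target, size_t cap, const char *name)
{
    size_t len;

    assert(target != NULL && name != NULL);

    len = strlen(name);         /* excludes the trailing '\0' */
    if (len > cap) {
        return 0;
    }
    memcpy(target, name, len);  /* target[len] is left untouched */
    return len;
}
```

If the consumer reads the target using the recorded length, this is fine. If
it treats the target as a C string, the copy would need strlen() + 1 bytes.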
> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> + if (!req) {
> + archipelagolog("Cannot get XSEG request\n");
> + goto err_exit;
> + }
> + r = xseg_prep_request(s->xseg, req, targetlen, 0);
> + if (r < 0) {
> + xseg_put_request(s->xseg, req, s->srcport);
What does this do here, if xseg_prep_request() failed? Is it
essentially a cleanup function?
> + archipelagolog("Cannot prepare XSEG close request\n");
> + goto err_exit;
> + }
> +
> + target = xseg_get_target(s->xseg, req);
> + memcpy(target, s->volname, targetlen);
> + req->size = req->datalen;
> + req->offset = 0;
> + req->op = X_CLOSE;
> +
> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> + if (p == NoPort) {
> + xseg_put_request(s->xseg, req, s->srcport);
> + archipelagolog("Cannot submit XSEG close request\n");
> + goto err_exit;
> + }
> +
> + xseg_signal(s->xseg, p);
> + wait_reply(s->xseg, s->srcport, s->port, req);
This is another spot I am wondering if we could get stuck on a
blocking call that could potentially wait forever... is there a
timeout here?
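For reference, a bounded wait could look roughly like the sketch below. It is
a generic illustration under stated assumptions: demo_wait_reply() and the
poll callbacks are hypothetical, nanosleep() stands in for xseg_wait_signal(),
and none of it is taken from libxseg.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical poll callback: returns true once the reply has arrived. */
typedef bool (*demo_poll_fn)(void *opaque);

static int64_t demo_now_ms(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Poll until 'poll' reports success or 'timeout_ms' has elapsed. */
static int demo_wait_reply(demo_poll_fn poll, void *opaque, int64_t timeout_ms)
{
    const int64_t deadline = demo_now_ms() + timeout_ms;
    const struct timespec nap = { .tv_sec = 0, .tv_nsec = 1000000 }; /* 1 ms */

    assert(poll != NULL);

    for (;;) {
        if (poll(opaque)) {
            return 0;
        }
        if (demo_now_ms() >= deadline) {
            return -ETIMEDOUT;  /* caller decides how to recover */
        }
        nanosleep(&nap, NULL);  /* stand-in for the per-iteration wait */
    }
}

/* Example callbacks: one becomes ready after N polls, one never does. */
static bool demo_countdown_poll(void *opaque)
{
    int *left = opaque;

    if (*left > 0) {
        (*left)--;
        return false;
    }
    return true;
}

static bool demo_never_poll(void *opaque)
{
    (void)opaque;
    return false;
}
```

The per-iteration wait stays short, but the overall deadline guarantees that
the caller regains control even if the peer never answers.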
> +
> + xseg_put_request(s->xseg, req, s->srcport);
> +
> +err_exit:
> + g_free(s->volname);
> + g_free(s->segment_name);
> + xseg_quit_local_signal(s->xseg, s->srcport);
> + xseg_leave_dynport(s->xseg, s->port);
> + xseg_leave(s->xseg);
> +}
> +
> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
> + aio_cb->cancelled = true;
> + while (aio_cb->status == -EINPROGRESS) {
> + qemu_aio_wait();
> + }
> + qemu_aio_release(aio_cb);
> +}
> +
> +static const AIOCBInfo archipelago_aiocb_info = {
> + .aiocb_size = sizeof(ArchipelagoAIOCB),
> + .cancel = qemu_archipelago_aio_cancel,
> +};
> +
> +static int __archipelago_submit_request(BDRVArchipelagoState *s,
> + uint64_t bufidx,
> + size_t count,
> + off_t offset,
> + ArchipelagoAIOCB *aio_cb,
> + ArchipelagoSegmentedRequest *segreq,
> + int op)
> +{
> + int ret, targetlen;
> + char *target;
> + void *data = NULL;
> + struct xseg_request *req;
> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> + targetlen = strlen(s->volname);
> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> + if (!req) {
> + archipelagolog("Cannot get XSEG request\n");
> + goto err_exit2;
> + }
> + ret = xseg_prep_request(s->xseg, req, targetlen, count);
> + if (ret < 0) {
> + archipelagolog("Cannot prepare XSEG request\n");
> + goto err_exit;
> + }
> + target = xseg_get_target(s->xseg, req);
> + if (!target) {
> + archipelagolog("Cannot get XSEG target\n");
> + goto err_exit;
> + }
> + memcpy(target, s->volname, targetlen);
> + req->size = count;
> + req->offset = offset;
> +
> + switch (op) {
> + case ARCHIP_OP_READ:
> + req->op = X_READ;
> + break;
> + case ARCHIP_OP_WRITE:
> + req->op = X_WRITE;
> + break;
> + }
> + reqdata->volname = s->volname;
> + reqdata->offset = offset;
> + reqdata->size = count;
> + reqdata->bufidx = bufidx;
> + reqdata->aio_cb = aio_cb;
> + reqdata->segreq = segreq;
> + reqdata->op = op;
> +
> + xseg_set_req_data(s->xseg, req, reqdata);
> + if (op == ARCHIP_OP_WRITE) {
> + data = xseg_get_data(s->xseg, req);
> + if (!data) {
> + archipelagolog("Cannot get XSEG data\n");
> + goto err_exit;
> + }
> + memcpy(data, aio_cb->buffer + bufidx, count);
> + }
> +
> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> + if (p == NoPort) {
> + archipelagolog("Could not submit XSEG request\n");
> + goto err_exit;
> + }
> + xseg_signal(s->xseg, p);
> + return 0;
> +
> +err_exit:
> + g_free(reqdata);
> + xseg_put_request(s->xseg, req, s->srcport);
> + return -EIO;
> +err_exit2:
> + g_free(reqdata);
> + return -EIO;
> +}
> +
> +static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
> + size_t count,
> + off_t offset,
> + ArchipelagoAIOCB *aio_cb,
> + int op)
> +{
> + int i, ret, segments_nr, last_segment_size;
> + ArchipelagoSegmentedRequest *segreq;
> +
> + segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest));
> +
> + if (op == ARCHIP_OP_FLUSH) {
> + segments_nr = 1;
> + segreq->ref = segments_nr;
> + segreq->total = count;
> + segreq->count = 0;
> + segreq->failed = 0;
> + ret = __archipelago_submit_request(s, 0, count, offset, aio_cb,
> + segreq, ARCHIP_OP_WRITE);
> + if (ret < 0) {
> + goto err_exit;
> + }
> + return 0;
> + }
> +
> + segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
> + ((count % MAX_REQUEST_SIZE) ? 1 : 0);
> + last_segment_size = (int)(count % MAX_REQUEST_SIZE);
> +
> + segreq->ref = segments_nr;
> + segreq->total = count;
> + segreq->count = 0;
> + segreq->failed = 0;
> +
> + for (i = 0; i < segments_nr - 1; i++) {
> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
> + MAX_REQUEST_SIZE,
> + offset + i * MAX_REQUEST_SIZE,
> + aio_cb, segreq, op);
> +
> + if (ret < 0) {
> + goto err_exit;
> + }
> + }
> +
> + if ((segments_nr > 1) && last_segment_size) {
> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
> + last_segment_size,
> + offset + i * MAX_REQUEST_SIZE,
> + aio_cb, segreq, op);
> + } else if ((segments_nr > 1) && !last_segment_size) {
> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
> + MAX_REQUEST_SIZE,
> + offset + i * MAX_REQUEST_SIZE,
> + aio_cb, segreq, op);
> + } else if (segments_nr == 1) {
> + ret = __archipelago_submit_request(s, 0, count, offset, aio_cb,
> + segreq, op);
> + }
> +
> + if (ret < 0) {
> + goto err_exit;
> + }
> +
> + return 0;
> +
> +err_exit:
> + __sync_add_and_fetch(&segreq->failed, 1);
> + if (segments_nr == 1) {
> + if (__sync_add_and_fetch(&segreq->ref, -1) == 0) {
> + g_free(segreq);
> + }
> + } else {
> + if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) {
> + g_free(segreq);
> + }
> + }
Don't we run the risk of leaking segreq here? The other place this is
freed is in xseg_request_handler(), but could we run into a race
condition where 's->stopping' is true, or even xseg_receive() just does not
return a request?
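One way to reason about the ownership is to route every reference drop
through a single helper, so that whichever side drops the last reference does
the free. The sketch below is hypothetical (DemoSegReq, demo_segreq_new() and
demo_segreq_put() are made-up names) and uses the same GCC __sync builtins
family as the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for ArchipelagoSegmentedRequest. */
typedef struct DemoSegReq {
    int ref;     /* sub-requests that still hold a reference */
    int failed;  /* non-zero once any sub-request failed */
} DemoSegReq;

static DemoSegReq *demo_segreq_new(int segments)
{
    DemoSegReq *r = calloc(1, sizeof(*r));

    if (r) {
        r->ref = segments;
    }
    return r;
}

/*
 * Drop 'n' references atomically. Returns true if this call released the
 * last reference and freed the object, in which case the caller must not
 * touch it again.
 */
static bool demo_segreq_put(DemoSegReq *r, int n)
{
    int left = __sync_sub_and_fetch(&r->ref, n);

    assert(left >= 0);
    if (left == 0) {
        free(r);
        return true;
    }
    return false;
}
```

This only helps when every outstanding sub-request eventually drops its
reference. If the handler thread stops first, or a reply never arrives, the
references held by in-flight requests would still need to be drained
explicitly, which is the case I am asking about.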
> +
> + return ret;
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
> + int64_t sector_num,
> + QEMUIOVector *qiov,
> + int nb_sectors,
> + BlockDriverCompletionFunc *cb,
> + void *opaque,
> + int op)
> +{
> + ArchipelagoAIOCB *aio_cb;
> + BDRVArchipelagoState *s = bs->opaque;
> + int64_t size, off;
> + int ret;
> +
> + aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
> + aio_cb->cmd = op;
> + aio_cb->qiov = qiov;
> +
> + if (op != ARCHIP_OP_FLUSH) {
> + aio_cb->buffer = qemu_blockalign(bs, qiov->size);
> + } else {
> + aio_cb->buffer = NULL;
> + }
> +
> + aio_cb->ret = 0;
> + aio_cb->s = s;
> + aio_cb->cancelled = false;
> + aio_cb->status = -EINPROGRESS;
> +
> + if (op == ARCHIP_OP_WRITE) {
> + qemu_iovec_to_buf(aio_cb->qiov, 0, aio_cb->buffer, qiov->size);
> + }
> +
> + off = sector_num * BDRV_SECTOR_SIZE;
> + size = nb_sectors * BDRV_SECTOR_SIZE;
> + aio_cb->size = size;
> +
> + ret = archipelago_aio_segmented_rw(s, size, off,
> + aio_cb, op);
> + if (ret < 0) {
> + goto err_exit;
> + }
> + return &aio_cb->common;
> +
> +err_exit:
> + error_report("qemu_archipelago_aio_rw(): I/O Error\n");
> + qemu_vfree(aio_cb->buffer);
> + qemu_aio_release(aio_cb);
> + return NULL;
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
> + opaque, ARCHIP_OP_READ);
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
> + opaque, ARCHIP_OP_WRITE);
> +}
> +
> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
> +{
> + uint64_t size;
> + int ret, targetlen;
> + struct xseg_request *req;
> + struct xseg_reply_info *xinfo;
> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> + const char *volname = s->volname;
> + targetlen = strlen(volname);
> + req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
> + if (!req) {
> + archipelagolog("Cannot get XSEG request\n");
> + goto err_exit2;
> + }
> + ret = xseg_prep_request(s->xseg, req, targetlen,
> + sizeof(struct xseg_reply_info));
> + if (ret < 0) {
> + archipelagolog("Cannot prepare XSEG request\n");
> + goto err_exit;
> + }
> + char *target = xseg_get_target(s->xseg, req);
> + if (!target) {
> + archipelagolog("Cannot get XSEG target\n");
> + goto err_exit;
> + }
> + memcpy(target, volname, targetlen);
> + req->size = req->datalen;
> + req->offset = 0;
> + req->op = X_INFO;
> +
> + reqdata->op = ARCHIP_OP_VOLINFO;
> + reqdata->volname = volname;
> + xseg_set_req_data(s->xseg, req, reqdata);
> +
> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> + if (p == NoPort) {
> + archipelagolog("Cannot submit XSEG request\n");
> + goto err_exit;
> + }
> + xseg_signal(s->xseg, p);
> + qemu_mutex_lock(&s->archip_mutex);
> + while (!s->is_signaled) {
> + qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
> + }
> + s->is_signaled = false;
> + qemu_mutex_unlock(&s->archip_mutex);
> +
> + xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
> + size = xinfo->size;
> + xseg_put_request(s->xseg, req, s->srcport);
> + g_free(reqdata);
> + s->size = size;
> + return size;
> +
> +err_exit:
> + g_free(reqdata);
> + xseg_put_request(s->xseg, req, s->srcport);
> + return -1;
> +err_exit2:
> + g_free(reqdata);
> + return -1;
> +}
This could be simplified to just:

    err_exit:
        xseg_put_request(s->xseg, req, s->srcport);
    err_exit2:
        g_free(reqdata);
        return -1;
    }

Maybe it'd also be best to return -EIO (or other meaningful error
value) instead of just -1, as this value gets passed along to
.bdrv_getlength().
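As a standalone illustration of fall-through labels combined with a negative
errno return, consider the sketch below. demo_volume_info() and its fail_at
parameter are hypothetical; the malloc()ed buffers merely stand in for the
request data and the XSEG request.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * 'fail_at' picks which step fails: 0 = none, 1 = acquiring the request,
 * 2 = preparing it. Returns a size on success or a negative errno.
 */
static int64_t demo_volume_info(int fail_at)
{
    int64_t ret = -EIO;
    char *reqdata;
    char *req;

    assert(fail_at >= 0 && fail_at <= 2);

    reqdata = malloc(16);                      /* first resource */
    if (!reqdata) {
        return -ENOMEM;
    }

    req = (fail_at == 1) ? NULL : malloc(16);  /* second resource */
    if (!req) {
        goto err_exit2;                        /* only reqdata to release */
    }
    if (fail_at == 2) {
        goto err_exit;                         /* release both */
    }

    ret = 4096;                                /* pretend volume size */

err_exit:
    free(req);
err_exit2:
    free(reqdata);
    return ret;
}
```

The labels are ordered in reverse of acquisition, so each one releases its
own resource and falls through to the next. In this sketch the success path
shares the same cleanup, and the caller sees either a size or an errno value
it can pass along unchanged.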
> +
> +static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
> +{
> + int64_t ret;
> + BDRVArchipelagoState *s = bs->opaque;
> +
> + ret = archipelago_volume_info(s);
(This is where I think an error value such as -EIO would be better.)
> + return ret;
> +}
> +
> +static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
> + ARCHIP_OP_FLUSH);
> +}
> +
> +static BlockDriver bdrv_archipelago = {
> + .format_name = "archipelago",
> + .protocol_name = "archipelago",
> + .instance_size = sizeof(BDRVArchipelagoState),
> + .bdrv_file_open = qemu_archipelago_open,
> + .bdrv_close = qemu_archipelago_close,
> + .bdrv_getlength = qemu_archipelago_getlength,
> + .bdrv_aio_readv = qemu_archipelago_aio_readv,
> + .bdrv_aio_writev = qemu_archipelago_aio_writev,
> + .bdrv_aio_flush = qemu_archipelago_aio_flush,
> + .bdrv_has_zero_init = bdrv_has_zero_init_1,
> +};
> +
> +static void bdrv_archipelago_init(void)
> +{
> + bdrv_register(&bdrv_archipelago);
> +}
> +
> +block_init(bdrv_archipelago_init);
> diff --git a/configure b/configure
> index 7102964..e4acd9c 100755
> --- a/configure
> +++ b/configure
> @@ -326,6 +326,7 @@ seccomp=""
> glusterfs=""
> glusterfs_discard="no"
> glusterfs_zerofill="no"
> +archipelago=""
> virtio_blk_data_plane=""
> gtk=""
> gtkabi=""
> @@ -1087,6 +1088,10 @@ for opt do
> ;;
> --enable-glusterfs) glusterfs="yes"
> ;;
> + --disable-archipelago) archipelago="no"
> + ;;
> + --enable-archipelago) archipelago="yes"
> + ;;
> --disable-virtio-blk-data-plane) virtio_blk_data_plane="no"
> ;;
> --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes"
> @@ -1382,6 +1387,8 @@ Advanced options (experts only):
> --enable-coroutine-pool enable coroutine freelist (better performance)
> --enable-glusterfs enable GlusterFS backend
> --disable-glusterfs disable GlusterFS backend
> + --enable-archipelago enable Archipelago backend
> + --disable-archipelago disable Archipelago backend
> --enable-gcov enable test coverage analysis with gcov
> --gcov=GCOV use specified gcov [$gcov_tool]
> --disable-tpm disable TPM support
> @@ -3051,6 +3058,33 @@ EOF
> fi
> fi
>
> +
> +##########################################
> +# archipelago probe
> +if test "$archipelago" != "no" ; then
> + cat > $TMPC <<EOF
> +#include <stdio.h>
> +#include <xseg/xseg.h>
> +#include <xseg/protocol.h>
> +int main(void) {
> + xseg_initialize();
> + return 0;
> +}
> +EOF
> + archipelago_libs=-lxseg
> + if compile_prog "" "$archipelago_libs"; then
> + archipelago="yes"
> + libs_tools="$archipelago_libs $libs_tools"
> + libs_softmmu="$archipelago_libs $libs_softmmu"
> + else
> + if test "$archipelago" = "yes" ; then
> + feature_not_found "Archipelago backend support" "Install libxseg devel"
> + fi
> + archipelago="no"
> + fi
> +fi
> +
> +
> ##########################################
> # glusterfs probe
> if test "$glusterfs" != "no" ; then
> @@ -4230,6 +4264,7 @@ echo "seccomp support $seccomp"
> echo "coroutine backend $coroutine"
> echo "coroutine pool $coroutine_pool"
> echo "GlusterFS support $glusterfs"
> +echo "Archipelago support $archipelago"
> echo "virtio-blk-data-plane $virtio_blk_data_plane"
> echo "gcov $gcov_tool"
> echo "gcov enabled $gcov"
> @@ -4665,6 +4700,11 @@ if test "$glusterfs_zerofill" = "yes" ; then
> echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak
> fi
>
> +if test "$archipelago" = "yes" ; then
> + echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak
> + echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak
> +fi
> +
> if test "$libssh2" = "yes" ; then
> echo "CONFIG_LIBSSH2=m" >> $config_host_mak
> echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak
> --
> 1.7.10.4
>
>