qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU blo


From: Chrysostomos Nanakos
Subject: Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend
Date: Mon, 23 Jun 2014 11:17:16 +0300
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20100101 Icedove/24.4.0

On 06/20/2014 05:33 PM, Stefan Hajnoczi wrote:
On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
+typedef struct BDRVArchipelagoState {
+    int fds[2];
+    int qemu_aio_count;
This field is never used.  It's increment and decremented but nothing
ever checks the value.  It can be dropped.

+    int event_reader_pos;
+    ArchipelagoAIOCB *event_acb;
+    const char *volname;
+    uint64_t size;
+    /* Archipelago specific */
+    struct xseg *xseg;
+    struct xseg_port *port;
+    xport srcport;
+    xport sport;
+    xport mportno;
+    xport vportno;
+    QemuMutex archip_mutex;
+    QemuCond archip_cond;
+    bool is_signaled;
+    /* Request handler specific */
+    QemuThread request_th;
+    QemuCond request_cond;
+    QemuMutex request_mutex;
+    bool th_is_signaled;
+    bool stopping;
+} BDRVArchipelagoState;
+
+typedef struct ArchipelagoSegmentedRequest {
+    size_t count;
+    size_t total;
+    int ref;
+    int failed;
+} ArchipelagoSegmentedRequest;
+
+typedef struct AIORequestData {
+    const char *volname;
+    off_t offset;
+    size_t size;
+    uint64_t bufidx;
+    int ret;
+    int op;
+    ArchipelagoAIOCB *aio_cb;
+    ArchipelagoSegmentedRequest *segreq;
+} AIORequestData;
+
+
+static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
+
+static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
+{
+    if (xseg && (sport != srcport)) {
+        xseg_init_local_signal(xseg, srcport);
+        sport = srcport;
+    }
+}
QEMU should clean up by calling xseg_quit_local_signal().

+
+static void archipelago_finish_aiocb(AIORequestData *reqdata)
+{
+    int ret;
+    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
+    if (ret < 0) {
+        error_report("archipelago_finish_aiocb(): failed writing"
+                     " aio_cb->s->fds");
+    }
+    g_free(reqdata);
+}
+
+static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
+                      struct xseg_request *expected_req)
+{
+    struct xseg_request *req;
+    xseg_prepare_wait(xseg, srcport);
+    void *psd = xseg_get_signal_desc(xseg, port);
+    while (1) {
+        req = xseg_receive(xseg, srcport, 0);
+        if (req) {
+            if (req != expected_req) {
+                archipelagolog("Unknown received request\n");
+                xseg_put_request(xseg, req, srcport);
+            } else if (!(req->state & XS_SERVED)) {
+                archipelagolog("Failed req\n");
+                return -1;
+            } else {
+                break;
+            }
+        }
+        xseg_wait_signal(xseg, psd, 100000UL);
+    }
+    xseg_cancel_wait(xseg, srcport);
+    return 0;
+}
+
+static void xseg_request_handler(void *state)
+{
This thread is only necessary because you're not integrating xseg into
the QEMU event loop.  If you got the pipe fds from xseg and used
aio_set_fd_handler() you could eliminate this thread.  The advantage is
that you can skip the archipelago_finish_aiocb() and get slightly better
performance due to one less context switch between threads.

+    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
+    void *psd = xseg_get_signal_desc(s->xseg, s->port);
+    qemu_mutex_lock(&s->request_mutex);
+
+    while (!s->stopping) {
+        struct xseg_request *req;
+        char *data;
+        xseg_prepare_wait(s->xseg, s->srcport);
+        req = xseg_receive(s->xseg, s->srcport, 0);
+        if (req) {
+            AIORequestData *reqdata;
+            ArchipelagoSegmentedRequest *segreq;
+            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
+
+            if (!(req->state & XS_SERVED)) {
+                    segreq = reqdata->segreq;
+                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
+            }
+
+            switch (reqdata->op) {
+            case ARCHIP_OP_READ:
+                    data = xseg_get_data(s->xseg, req);
+                    segreq = reqdata->segreq;
+                    segreq->count += req->serviced;
+
+                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
+                            data,
+                            req->serviced);
+
+                    xseg_put_request(s->xseg, req, s->srcport);
+
+                    __sync_add_and_fetch(&segreq->ref, -1);
+
+                    if (segreq->ref == 0) {
Not sure about the value of __sync_add_and_fetch() since the if
statement fetches segreq->ref again.  But I'm not reviewing the details
of the shared memory accesses.  I'm assuming this stuff is correct,
secure, etc.

Yes you are right, IMHO a better and safer approach is:

if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
...


+                        if (!segreq->failed) {
+                            reqdata->aio_cb->ret = segreq->count;
+                            archipelago_finish_aiocb(reqdata);
+                        }
What does segreq->failed mean?  We should always finish the I/O request,
otherwise the upper layers will run out of resources as we leak
failed requests.

Yes you are right.
If a request fails while submitting it to Archipelago archipelago_aio_segmented_rw() will return -EIO to qemu_archipelago_aio_rw() which will return NULL to .bdrv_aio_readv/_write(). Now if all requests to Archipelago have succeeded in submission and one or all of them haven't been serviced (partial read/write) from Archipelago, archipelago_finish_aiocb() will fail the request. The last one wasn't implemented in this patch, v5 series has the appropriate changes.

Is this a proper and accepted approach along with the removal of the pipe code and the introduction of the QEMU "bottom-half" scheduled in archipelago_finish_aiocb()?




+static void parse_filename_opts(const char *filename, Error **errp,
+                                char **volume, xport *mport, xport *vport)
+{
+    const char *start;
+    char *tokens[3], *ds;
+    int idx;
+    xport lmport = NoPort, lvport = NoPort;
+
+    strstart(filename, "archipelago:", &start);
+
+    ds = g_strdup(start);
+    tokens[0] = strtok(ds, "/");
+    tokens[1] = strtok(NULL, ":");
+    tokens[2] = strtok(NULL, "\0");
+
+    if (!strlen(tokens[0])) {
+        error_setg(errp, "volume name must be specified first");
+        return;
ds is leaked.

+    }
+
+    for (idx = 1; idx < 3; idx++) {
+        if (tokens[idx] != NULL) {
+            if (strstart(tokens[idx], "mport=", NULL)) {
+                xseg_find_port(tokens[idx], "mport=", &lmport);
+            }
+            if (strstart(tokens[idx], "vport=", NULL)) {
+                xseg_find_port(tokens[idx], "vport=", &lvport);
+            }
+        }
+    }
+
+    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
+        error_setg(errp, "Usage: file=archipelago:"
+                   "<volumename>[/mport=<mapperd_port>"
+                   "[:vport=<vlmcd_port>]]");
ds is leaked.

+        return;
+    }
+    *volume = g_strdup(tokens[0]);
+    *mport = lmport;
+    *vport = lvport;
+    g_free(ds);
+}
+
+static void archipelago_parse_filename(const char *filename, QDict *options,
+                                       Error **errp)
+{
+    const char *start;
+    char *volume = NULL;
+    xport mport = NoPort, vport = NoPort;
+
+    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
+        error_setg(errp, "volume/mport/vport and a file name may not be "
+                         "specified at the same time");
+        return;
+    }
+
+    if (!strstart(filename, "archipelago:", &start)) {
+        error_setg(errp, "File name must start with 'archipelago:'");
+        return;
+    }
+
+    if (!strlen(start) || strstart(start, "/", NULL)) {
+        error_setg(errp, "volume name must be specified");
+        return;
+    }
+
+    parse_filename_opts(filename, errp, &volume, &mport, &vport);
+
+    if (volume) {
+        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
+        g_free(volume);
+    }
+    if (mport != NoPort) {
+        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
+    }
+    if (vport != NoPort) {
+        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
+    }
+}
+
+static QemuOptsList archipelago_runtime_opts = {
+    .name = "archipelago",
+    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
+    .desc = {
+        {
+            .name = ARCHIPELAGO_OPT_VOLUME,
+            .type = QEMU_OPT_STRING,
+            .help = "Name of the volume image",
+        },
+        {
+            .name = ARCHIPELAGO_OPT_MPORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "Archipelago mapperd port number"
+        },
+        {
+            .name = ARCHIPELAGO_OPT_VPORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "Archipelago vlmcd port number"
+
+        },
+        { /* end of list */ }
+    },
+};
+
+static int qemu_archipelago_open(BlockDriverState *bs,
+                                 QDict *options,
+                                 int bdrv_flags,
+                                 Error **errp)
+{
+    int ret = 0;
+    const char *volume;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        qemu_opts_del(opts);
+        return -EINVAL;
+    }
+
+    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
+    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
+
+    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
+    if (volume == NULL) {
+        error_setg(errp, "archipelago block driver requires an 'volume'"
+                   " options");
"archipelago block driver requires the 'volume' option"

+        error_propagate(errp, local_err);
This line is unnecessary since the error message was already put into
errp.

+        qemu_opts_del(opts);
+        return -EINVAL;
+    }
+    s->volname = g_strdup(volume);
+
+    /* Initialize XSEG, join shared memory segment */
+    ret = qemu_archipelago_init(s);
+    if (ret < 0) {
+        error_setg(errp, "cannot initialize XSEG and join shared "
+                   "memory segment");
+        goto err_exit;
+    }
+
+    s->event_reader_pos = 0;
+    ret = qemu_pipe(s->fds);
+    if (ret < 0) {
+        error_setg(errp, "cannot create pipe");
+        goto err_exit;
Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
memory mapped, and memory leaks?

Removed qemu_pipe() call so we do not need to call xseg_leave() anymore.


+    }
+
+    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
+    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
+    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
+                            qemu_archipelago_aio_event_reader, NULL,
+                            s);
+
+    qemu_opts_del(opts);
+    return 0;
+
+err_exit:
+    qemu_opts_del(opts);
+    return ret;
s->volname is leaked

+}
+
+static void qemu_archipelago_close(BlockDriverState *bs)
+{
+    int r, targetlen;
+    char *target;
+    struct xseg_request *req;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
+    close(s->fds[0]);
+    close(s->fds[1]);
+
+    s->stopping = true;
+
+    qemu_mutex_lock(&s->request_mutex);
+    while (!s->th_is_signaled) {
+        qemu_cond_wait(&s->request_cond,
+                       &s->request_mutex);
+    }
+    qemu_mutex_unlock(&s->request_mutex);
+    qemu_cond_destroy(&s->request_cond);
+    qemu_mutex_destroy(&s->request_mutex);
It's not safe to qemu_mutex_destroy() because the other thread may still
be inside qemu_mutex_unlock(&s->request_mutex) and may still access
s->request_mutex memory.

Use qemu_thread_join() before destroying request_cond and request_mutex.
That way you can be sure there is no race condition.

(I recently did the same thing and Paolo Bonzini pointed out the bug.
After checking the glibc implementation I was convinced that it's not
safe.)

Got it! Paolo was absolutely right! Thanks for sharing!

+
+    qemu_cond_destroy(&s->archip_cond);
+    qemu_mutex_destroy(&s->archip_mutex);
+
+    targetlen = strlen(s->volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit;
+    }
+    r = xseg_prep_request(s->xseg, req, targetlen, 0);
+    if (r < 0) {
+        xseg_put_request(s->xseg, req, s->srcport);
+        archipelagolog("Cannot prepare XSEG close request\n");
+        goto err_exit;
+    }
+
+    target = xseg_get_target(s->xseg, req);
+    strncpy(target, s->volname, targetlen);
Using strncpy() hints that target is a string when in fact it's not.  I
think memcpy() would be clearer here since you don't want a '\0' byte at
the end of the string.

Or maybe I'm wrong and there is some guarantee that there will be a '\0'
byte after target?

No you are not wrong, memcpy() is clearer, fixed for v5 series.

+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_CLOSE;
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        xseg_put_request(s->xseg, req, s->srcport);
+        archipelagolog("Cannot submit XSEG close request\n");
+        goto err_exit;
+    }
+
+    xseg_signal(s->xseg, p);
+    r = wait_reply(s->xseg, s->srcport, s->port, req);
+    if (r < 0) {
+        archipelagolog("wait_reply() error\n");
+    }
+    if (!(req->state & XS_SERVED)) {
+        archipelagolog("Could no close map for volume '%s'\n", s->volname);
+    }
+
+    xseg_put_request(s->xseg, req, s->srcport);
+
+err_exit:
+    xseg_leave_dynport(s->xseg, s->port);
+    xseg_leave(s->xseg);
s->volname is leaked.

+}
+
+static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
+    aio_cb->cancelled = true;
+    while (aio_cb->status == -EINPROGRESS) {
+        qemu_aio_wait();
+    }
+    qemu_aio_release(aio_cb);
+}
+
+static const AIOCBInfo archipelago_aiocb_info = {
+    .aiocb_size = sizeof(ArchipelagoAIOCB),
+    .cancel = qemu_archipelago_aio_cancel,
+};
+
+static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
+{
+    int ret = 0;
+    while (1) {
+        fd_set wfd;
+        int fd = aio_cb->s->fds[1];
+
+        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
+        if (ret > 0) {
+            break;
+        }
+        if (errno == EINTR) {
+            continue;
+        }
+        if (errno != EAGAIN) {
+            break;
+        }
+        FD_ZERO(&wfd);
+        FD_SET(fd, &wfd);
+        do {
+            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+        } while (ret < 0 && errno == EINTR);
+    }
+    return ret;
+}
A newer signalling approach is available and will let you drop the pipe
code.  QEMUBH is a "bottom half" or deferred function call that can be
scheduled in an event loop.  Scheduling the the QEMUBH is thread-safe so
you can perform it from any thread.


See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.

+static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
+{
+    uint64_t size;
+    int ret, targetlen;
+    struct xseg_request *req;
+    struct xseg_reply_info *xinfo;
+    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
+
+    if (!reqdata) {
+        archipelagolog("Cannot allocate reqdata\n");
+        return -1;
g_malloc() never returns NULL, this if statement can be dropped.




reply via email to

[Prev in Thread] Current Thread [Next in Thread]