From: Vladimir Sementsov-Ogievskiy
Subject: [Qemu-block] [PATCH 4/7] qcow2: async scheme for qcow2_co_preadv
Date: Tue, 7 Aug 2018 20:43:08 +0300

Start several asynchronous requests instead of reading chunk by chunk.

Signed-off-by: Vladimir Sementsov-Ogievskiy <address@hidden>
---
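[Editor's note: for readers unfamiliar with the pattern, the patch replaces the
serial per-cluster reads in qcow2_co_preadv() with a bounded pool of worker
coroutines, each running one Qcow2WorkerTask. Below is a minimal standalone
model of that dispatch scheme, using POSIX threads in place of QEMU coroutines;
the names NUM_WORKERS, Task and do_work() are illustrative only and are not
part of this patch or of QEMU.

/*
 * Standalone model of the bounded worker pool introduced by this patch,
 * with POSIX threads standing in for QEMU coroutines.
 * Build with: cc -pthread model.c
 */
#include <pthread.h>
#include <stdio.h>
#include <stdint.h>

#define NUM_WORKERS 4   /* the patch caps its pool at QCOW2_MAX_WORKERS (64) */
#define NUM_TASKS   16

typedef struct Task {
    uint64_t offset;    /* mirrors Qcow2WorkerTask.offset */
    uint64_t bytes;     /* mirrors Qcow2WorkerTask.bytes */
} Task;

static Task queue[NUM_TASKS];
static int next_task;   /* index of the next unclaimed task */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-in for one qcow2_co_preadv_normal() call on a single cluster. */
static int do_work(const Task *t)
{
    printf("read %llu bytes at offset %llu\n",
           (unsigned long long)t->bytes, (unsigned long long)t->offset);
    return 0;
}

/* Like qcow2_rw_worker(): loop, claiming tasks until none remain. */
static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        Task *t;

        pthread_mutex_lock(&lock);
        if (next_task == NUM_TASKS) {
            pthread_mutex_unlock(&lock);
            return NULL;    /* no more tasks: the worker retires */
        }
        t = &queue[next_task++];
        pthread_mutex_unlock(&lock);

        do_work(t);
    }
}

int main(void)
{
    pthread_t threads[NUM_WORKERS];
    int i;

    /* Split one large request into per-cluster tasks, as the patch does
     * inside the qcow2_co_preadv() loop. */
    for (i = 0; i < NUM_TASKS; i++) {
        queue[i].offset = (uint64_t)i * 65536;
        queue[i].bytes = 65536;
    }

    for (i = 0; i < NUM_WORKERS; i++) {
        pthread_create(&threads[i], NULL, worker, NULL);
    }

    /* Like qcow2_finalize_rws(): wait for every worker to finish. */
    for (i = 0; i < NUM_WORKERS; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

Note that the patch also has a fast path the model omits: in
qcow2_rws_add_task(), when the first task covers the whole request it is
executed inline in the calling coroutine, with no worker at all.]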
 block/qcow2.c | 208 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 204 insertions(+), 4 deletions(-)

diff --git a/block/qcow2.c b/block/qcow2.c
index 5e7f2ee318..a0df8d4e50 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -1869,6 +1869,197 @@ out:
     return ret;
 }
 
+typedef struct Qcow2WorkerTask {
+    uint64_t file_cluster_offset;
+    uint64_t offset;
+    uint64_t bytes;
+    uint64_t bytes_done;
+} Qcow2WorkerTask;
+
+typedef int (*Qcow2DoWorkFunc)(BlockDriverState *bs, QEMUIOVector *qiov,
+                               Qcow2WorkerTask *task);
+
+typedef struct Qcow2RWState {
+    BlockDriverState *bs;
+    QEMUIOVector *qiov;
+    uint64_t bytes;
+    int ret;
+    bool waiting_one;
+    bool waiting_all;
+    bool finalize;
+    Coroutine *co;
+    QSIMPLEQ_HEAD(, Qcow2Worker) free_workers;
+    QSIMPLEQ_HEAD(, Qcow2Worker) busy_workers;
+    int online_workers;
+    Qcow2DoWorkFunc do_work_func;
+} Qcow2RWState;
+
+typedef struct Qcow2Worker {
+    Qcow2RWState *rws;
+    Coroutine *co;
+    Qcow2WorkerTask task;
+    bool busy;
+    QSIMPLEQ_ENTRY(Qcow2Worker) entry;
+} Qcow2Worker;
+#define QCOW2_MAX_WORKERS 64
+
+static coroutine_fn void qcow2_rw_worker(void *opaque);
+static Qcow2Worker *qcow2_new_worker(Qcow2RWState *rws)
+{
+    Qcow2Worker *w = g_new0(Qcow2Worker, 1);
+    w->rws = rws;
+    w->co = qemu_coroutine_create(qcow2_rw_worker, w);
+
+    return w;
+}
+
+static void qcow2_free_worker(Qcow2Worker *w)
+{
+    g_free(w);
+}
+
+static coroutine_fn void qcow2_rw_worker(void *opaque)
+{
+    Qcow2Worker *w = opaque;
+    Qcow2RWState *rws = w->rws;
+
+    rws->online_workers++;
+
+    while (!rws->finalize) {
+        int ret = rws->do_work_func(rws->bs, rws->qiov, &w->task);
+        if (ret < 0 && rws->ret == 0) {
+            rws->ret = ret;
+        }
+
+        if (rws->waiting_all || rws->ret < 0) {
+            break;
+        }
+
+        w->busy = false;
+        QSIMPLEQ_REMOVE(&rws->busy_workers, w, Qcow2Worker, entry);
+        QSIMPLEQ_INSERT_TAIL(&rws->free_workers, w, entry);
+        if (rws->waiting_one) {
+            rws->waiting_one = false;
+            /* We must unset waiting_one here to prevent queuing rws->co in
+             * several workers (it may happen if another worker already
+             * waits for us on the mutex, so it would be entered after our
+             * yield and before rws->co is entered).
+             *
+             * TODO: rethink this comment: here (and elsewhere in this file)
+             * we moved from qemu_coroutine_add_next to aio_co_wake. */
+            aio_co_wake(rws->co);
+        }
+
+        qemu_coroutine_yield();
+    }
+
+    if (w->busy) {
+        w->busy = false;
+        QSIMPLEQ_REMOVE(&rws->busy_workers, w, Qcow2Worker, entry);
+    }
+    qcow2_free_worker(w);
+    rws->online_workers--;
+
+    if (rws->waiting_all && rws->online_workers == 0) {
+        aio_co_wake(rws->co);
+    }
+}
+
+static coroutine_fn void qcow2_rws_add_task(Qcow2RWState *rws,
+                                            uint64_t file_cluster_offset,
+                                            uint64_t offset,
+                                            uint64_t bytes,
+                                            uint64_t bytes_done)
+{
+    Qcow2Worker *w;
+
+    assert(rws->co == qemu_coroutine_self());
+
+    if (bytes_done == 0 && bytes == rws->bytes) {
+        Qcow2WorkerTask task = {
+            .file_cluster_offset = file_cluster_offset,
+            .offset = offset,
+            .bytes = bytes,
+            .bytes_done = bytes_done
+        };
+        rws->ret = rws->do_work_func(rws->bs, rws->qiov, &task);
+        return;
+    }
+
+    if (!QSIMPLEQ_EMPTY(&rws->free_workers)) {
+        w = QSIMPLEQ_FIRST(&rws->free_workers);
+        QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
+    } else if (rws->online_workers < QCOW2_MAX_WORKERS) {
+        w = qcow2_new_worker(rws);
+    } else {
+        rws->waiting_one = true;
+        qemu_coroutine_yield();
+        assert(!rws->waiting_one); /* already unset by worker */
+
+        w = QSIMPLEQ_FIRST(&rws->free_workers);
+        QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
+    }
+    w->busy = true;
+    QSIMPLEQ_INSERT_TAIL(&rws->busy_workers, w, entry);
+
+    w->task.file_cluster_offset = file_cluster_offset;
+    w->task.offset = offset;
+    w->task.bytes = bytes;
+    w->task.bytes_done = bytes_done;
+
+    qemu_coroutine_enter(w->co);
+}
+
+static void qcow2_init_rws(Qcow2RWState *rws, BlockDriverState *bs,
+                           QEMUIOVector *qiov, uint64_t bytes,
+                           Qcow2DoWorkFunc do_work_func)
+{
+    memset(rws, 0, sizeof(*rws));
+    rws->bs = bs;
+    rws->qiov = qiov;
+    rws->bytes = bytes;
+    rws->co = qemu_coroutine_self();
+    rws->do_work_func = do_work_func;
+    QSIMPLEQ_INIT(&rws->free_workers);
+    QSIMPLEQ_INIT(&rws->busy_workers);
+}
+
+static void qcow2_finalize_rws(Qcow2RWState *rws)
+{
+    assert(rws->co == qemu_coroutine_self());
+
+    /* kill waiting workers */
+    rws->finalize = true;
+    while (!QSIMPLEQ_EMPTY(&rws->free_workers)) {
+        Qcow2Worker *w = QSIMPLEQ_FIRST(&rws->free_workers);
+        QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
+        qemu_coroutine_enter(w->co);
+    }
+
+    /* wait others */
+    if (rws->online_workers > 0) {
+        rws->waiting_all = true;
+        qemu_coroutine_yield();
+        rws->waiting_all = false;
+    }
+
+    assert(rws->online_workers == 0);
+    assert(QSIMPLEQ_EMPTY(&rws->free_workers));
+    assert(QSIMPLEQ_EMPTY(&rws->busy_workers));
+}
+
+static coroutine_fn int qcow2_co_preadv_normal_task(BlockDriverState *bs,
+                                                    QEMUIOVector *qiov,
+                                                    Qcow2WorkerTask *task)
+{
+    return qcow2_co_preadv_normal(bs,
+                                  task->file_cluster_offset,
+                                  task->offset,
+                                  task->bytes,
+                                  qiov,
+                                  task->bytes_done);
+}
+
 static coroutine_fn int qcow2_co_preadv(BlockDriverState *bs, uint64_t offset,
                                         uint64_t bytes, QEMUIOVector *qiov,
                                         int flags)
@@ -1880,12 +2071,15 @@ static coroutine_fn int qcow2_co_preadv(BlockDriverState *bs, uint64_t offset,
     uint64_t cluster_offset = 0;
     uint64_t bytes_done = 0;
     QEMUIOVector hd_qiov;
+    Qcow2RWState rws = {0};
+
+    qcow2_init_rws(&rws, bs, qiov, bytes, qcow2_co_preadv_normal_task);
 
     qemu_iovec_init(&hd_qiov, qiov->niov);
 
     qemu_co_mutex_lock(&s->lock);
 
-    while (bytes != 0) {
+    while (bytes != 0 && rws.ret == 0) {
 
         /* prepare next request */
         cur_bytes = MIN(bytes, INT_MAX);
@@ -1942,9 +2136,10 @@ static coroutine_fn int qcow2_co_preadv(BlockDriverState *bs, uint64_t offset,
         case QCOW2_CLUSTER_NORMAL:
             qemu_co_mutex_unlock(&s->lock);
 
-            ret = qcow2_co_preadv_normal(bs, cluster_offset,
-                                         offset, cur_bytes, qiov, bytes_done);
-            if (ret < 0) {
+            qcow2_rws_add_task(&rws, cluster_offset, offset, cur_bytes,
+                               bytes_done);
+            if (rws.ret < 0) {
+                ret = rws.ret;
                 goto fail_nolock;
             }
 
@@ -1967,6 +2162,11 @@ fail:
     qemu_co_mutex_unlock(&s->lock);
 
 fail_nolock:
+    qcow2_finalize_rws(&rws);
+    if (ret == 0) {
+        ret = rws.ret;
+    }
+
     qemu_iovec_destroy(&hd_qiov);
 
     return ret;
-- 
2.11.1



