27.09.2018 21:35, Max Reitz wrote:
On 07.08.18 19:43, Vladimir Sementsov-Ogievskiy wrote:
Start several async requests instead of read chunk by chunk.
Signed-off-by: Vladimir Sementsov-Ogievskiy <address@hidden>
---
block/qcow2.c | 208
++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 204 insertions(+), 4 deletions(-)
diff --git a/block/qcow2.c b/block/qcow2.c
index 5e7f2ee318..a0df8d4e50 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -1869,6 +1869,197 @@ out:
return ret;
}
+typedef struct Qcow2WorkerTask {
+ uint64_t file_cluster_offset;
+ uint64_t offset;
+ uint64_t bytes;
+ uint64_t bytes_done;
+} Qcow2WorkerTask;
Why don't you make this a union of request-specific structs?
ok, will try
+
+typedef int (*Qcow2DoWorkFunc)(BlockDriverState *bs, QEMUIOVector
*qiov,
+ Qcow2WorkerTask *task);
+
+typedef struct Qcow2RWState {
+ BlockDriverState *bs;
+ QEMUIOVector *qiov;
+ uint64_t bytes;
Maybe make it total_bytes so it doesn't conflict with the value in
Qcow2WorkerTask?
ok
+ int ret;
+ bool waiting_one;
+ bool waiting_all;
+ bool finalize;
+ Coroutine *co;
+ QSIMPLEQ_HEAD(, Qcow2Worker) free_workers;
+ QSIMPLEQ_HEAD(, Qcow2Worker) busy_workers;
+ int online_workers;
+ Qcow2DoWorkFunc do_work_func;
+} Qcow2RWState;
+
+typedef struct Qcow2Worker {
+ Qcow2RWState *rws;
+ Coroutine *co;
+ Qcow2WorkerTask task;
+ bool busy;
+ QSIMPLEQ_ENTRY(Qcow2Worker) entry;
+} Qcow2Worker;
+#define QCOW2_MAX_WORKERS 64
That's really a bit hidden here. I think it should go into the header.
Also I'm not quite sure about the number. In other places we've always
used 16.
(With the encryption code always allocating a new bounce buffer, this
can mean quite a bit of memory usage.)
No doubts.
+
+static coroutine_fn void qcow2_rw_worker(void *opaque);
+static Qcow2Worker *qcow2_new_worker(Qcow2RWState *rws)
+{
+ Qcow2Worker *w = g_new0(Qcow2Worker, 1);
+ w->rws = rws;
+ w->co = qemu_coroutine_create(qcow2_rw_worker, w);
+
+ return w;
+}
+
+static void qcow2_free_worker(Qcow2Worker *w)
+{
+ g_free(w);
+}
+
+static coroutine_fn void qcow2_rw_worker(void *opaque)
+{
+ Qcow2Worker *w = opaque;
+ Qcow2RWState *rws = w->rws;
+
+ rws->online_workers++;
+
+ while (!rws->finalize) {
+ int ret = rws->do_work_func(rws->bs, rws->qiov, &w->task);
+ if (ret < 0 && rws->ret == 0) {
+ rws->ret = ret;
+ }
+
+ if (rws->waiting_all || rws->ret < 0) {
+ break;
+ }
+
+ w->busy = false;
+ QSIMPLEQ_REMOVE(&rws->busy_workers, w, Qcow2Worker, entry);
+ QSIMPLEQ_INSERT_TAIL(&rws->free_workers, w, entry);
+ if (rws->waiting_one) {
+ rws->waiting_one = false;
+ /* we must unset it here, to prevent queuing rws->co in
several
+ * workers (it may happen if other worker already waits
us on mutex,
+ * so it will be entered after our yield and before
rws->co enter)
+ *
+ * TODO: rethink this comment, as here (and in other
places in the
+ * file) we moved from qemu_coroutine_add_next to
aio_co_wake.
+ */
+ aio_co_wake(rws->co);
+ }
+
+ qemu_coroutine_yield();
+ }
+
+ if (w->busy) {
+ w->busy = false;
+ QSIMPLEQ_REMOVE(&rws->busy_workers, w, Qcow2Worker, entry);
+ }
+ qcow2_free_worker(w);
+ rws->online_workers--;
+
+ if (rws->waiting_all && rws->online_workers == 0) {
+ aio_co_wake(rws->co);
+ }
+}
+
+static coroutine_fn void qcow2_rws_add_task(Qcow2RWState *rws,
+ uint64_t
file_cluster_offset,
+ uint64_t offset,
+ uint64_t bytes,
+ uint64_t bytes_done)
I'd propose just taking a const Qcow2WorkerTask * here. (Makes even
more sense if you make it a union.)
ok, I'll try this way
+{
+ Qcow2Worker *w;
+
+ assert(rws->co == qemu_coroutine_self());
+
+ if (bytes_done == 0 && bytes == rws->bytes) {
+ Qcow2WorkerTask task = {
+ .file_cluster_offset = file_cluster_offset,
+ .offset = offset,
+ .bytes = bytes,
+ .bytes_done = bytes_done
+ };
+ rws->ret = rws->do_work_func(rws->bs, rws->qiov, &task);
(If so, you'd just pass the pointer along here)
+ return;
+ }
I like this fast path, but I think it deserves a small comment (stating
that it is a fast path and that it bypasses the whole worker
infrastructure).
+
+ if (!QSIMPLEQ_EMPTY(&rws->free_workers)) {
+ w = QSIMPLEQ_FIRST(&rws->free_workers);
+ QSIMPLEQ_REMOVE_HEAD(&rws->free_workers, entry);
+ } else if (rws->online_workers < QCOW2_MAX_WORKERS) {
+ w = qcow2_new_worker(rws);
+ } else {
+ rws->waiting_one = true;
+ qemu_coroutine_yield();
+ assert(!rws->waiting_one); /* already unset by worker */
Sometimes I hate coroutines. OK. So, how does the yield ensure that
any worker is scheduled? Doesn't yield just give control to the parent?
Hm, I don't follow. We are sure that all workers are busy, because there
are no free workers, and we can't create another one since the second
condition isn't satisfied either.
So we give control to the parent, and only a worker can wake us up.