[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 44/47] mirror: switch mirror_iteration to AIO
From: |
Paolo Bonzini |
Subject: |
[Qemu-devel] [PATCH 44/47] mirror: switch mirror_iteration to AIO |
Date: |
Tue, 24 Jul 2012 13:04:22 +0200 |
There is really no change in the behavior of the job here, since there
is still a maximum of one in-flight I/O operation between the source and
the target. However, this patch already introduces moves the copy logic
from mirror_iteration to AIO callbacks; it also adds the logic to count
in-flight operations, and only complete the job after they have finished.
Some care is required in the error and cancellation cases, in order
to avoid access to dangling pointers (and consequent corruption).
Signed-off-by: Paolo Bonzini <address@hidden>
---
block/mirror.c | 161 ++++++++++++++++++++++++++++++++++++++++++--------------
trace-events | 2 +
2 files changed, 123 insertions(+), 40 deletions(-)
diff --git a/block/mirror.c b/block/mirror.c
index 81a600b..971c923 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -17,7 +17,7 @@
#include "qemu/ratelimit.h"
#include "bitmap.h"
-#define SLICE_TIME 100000000ULL /* ns */
+#define SLICE_TIME 100000000ULL /* ns */
typedef struct MirrorBlockJob {
BlockJob common;
@@ -33,17 +33,78 @@ typedef struct MirrorBlockJob {
unsigned long *cow_bitmap;
HBitmapIter hbi;
uint8_t *buf;
+
+ int in_flight;
+ int ret;
} MirrorBlockJob;
-static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
- BlockErrorAction *p_action)
+typedef struct MirrorOp {
+ MirrorBlockJob *s;
+ QEMUIOVector qiov;
+ struct iovec iov;
+ int64_t sector_num;
+ int nb_sectors;
+} MirrorOp;
+
+static void mirror_iteration_done(MirrorOp *op)
{
+ MirrorBlockJob *s = op->s;
+
+ s->in_flight--;
+ trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors);
+ g_slice_free(MirrorOp, op);
+ qemu_coroutine_enter(s->common.co, NULL);
+}
+
+static void mirror_write_complete(void *opaque, int ret)
+{
+ MirrorOp *op = opaque;
+ MirrorBlockJob *s = op->s;
BlockDriverState *source = s->common.bs;
BlockDriverState *target = s->target;
- QEMUIOVector qiov;
- int ret, nb_sectors, nb_sectors_chunk;
+ BlockErrorAction action;
+
+ if (ret < 0) {
+ bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+ action = block_job_error_action(&s->common, target, s->on_target_error,
+ false, -ret);
+ s->synced = false;
+ if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+ s->ret = ret;
+ }
+ }
+ mirror_iteration_done(op);
+}
+
+static void mirror_read_complete(void *opaque, int ret)
+{
+ MirrorOp *op = opaque;
+ MirrorBlockJob *s = op->s;
+ BlockDriverState *source = s->common.bs;
+ BlockDriverState *target = s->target;
+ BlockErrorAction action;
+ if (ret < 0) {
+ bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
+ action = block_job_error_action(&s->common, source, s->on_source_error,
+ true, -ret);
+ s->synced = false;
+ if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
+ s->ret = ret;
+ }
+
+ mirror_iteration_done(op);
+ return;
+ }
+ bdrv_aio_writev(target, op->sector_num, &op->qiov, op->nb_sectors,
+ mirror_write_complete, op);
+}
+
+static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
+{
+ BlockDriverState *source = s->common.bs;
+ int nb_sectors, nb_sectors_chunk;
int64_t end, sector_num, cluster_num;
- struct iovec iov;
+ MirrorOp *op;
s->sector_num = hbitmap_iter_next(&s->hbi);
if (s->sector_num < 0) {
@@ -74,34 +135,30 @@ static int coroutine_fn mirror_iteration(MirrorBlockJob *s,
end = s->common.len >> BDRV_SECTOR_BITS;
nb_sectors = MIN(nb_sectors, end - sector_num);
+
+ /* Allocate a MirrorOp that is used as an AIO callback. */
+ op = g_slice_new(MirrorOp);
+ op->s = s;
+ op->iov.iov_base = s->buf;
+ op->iov.iov_len = nb_sectors * 512;
+ op->sector_num = sector_num;
+ op->nb_sectors = nb_sectors;
+ qemu_iovec_init_external(&op->qiov, &op->iov, 1);
+
bdrv_reset_dirty(source, sector_num, nb_sectors);
/* Copy the dirty cluster. */
- iov.iov_base = s->buf;
- iov.iov_len = nb_sectors * 512;
- qemu_iovec_init_external(&qiov, &iov, 1);
-
+ s->in_flight++;
trace_mirror_one_iteration(s, sector_num, nb_sectors);
- ret = bdrv_co_readv(source, sector_num, nb_sectors, &qiov);
- if (ret < 0) {
- *p_action = block_job_error_action(&s->common, source,
- s->on_source_error, true, -ret);
- s->synced = false;
- goto fail;
- }
- ret = bdrv_co_writev(target, sector_num, nb_sectors, &qiov);
- if (ret < 0) {
- *p_action = block_job_error_action(&s->common, target,
- s->on_target_error, false, -ret);
- s->synced = false;
- goto fail;
- }
- return 0;
+ bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
+ mirror_read_complete, op);
+}
-fail:
- /* Try again later. */
- bdrv_set_dirty(source, sector_num, nb_sectors);
- return ret;
+static void mirror_drain(MirrorBlockJob *s)
+{
+ while (s->in_flight > 0) {
+ qemu_coroutine_yield();
+ }
}
static void coroutine_fn mirror_run(void *opaque)
@@ -109,6 +166,7 @@ static void coroutine_fn mirror_run(void *opaque)
MirrorBlockJob *s = opaque;
BlockDriverState *bs = s->common.bs;
int64_t sector_num, end, nb_sectors_chunk, length;
+ uint64_t last_pause_ns;
BlockDriverInfo bdi;
char backing_filename[1024];
int ret = 0;
@@ -168,22 +226,37 @@ static void coroutine_fn mirror_run(void *opaque)
}
bdrv_dirty_iter_init(bs, &s->hbi);
+ last_pause_ns = qemu_get_clock_ns(rt_clock);
for (;;) {
uint64_t delay_ns;
int64_t cnt;
bool should_complete;
+ if (s->ret < 0) {
+ ret = s->ret;
+ break;
+ }
+
cnt = bdrv_get_dirty_count(bs);
- if (cnt != 0) {
- BlockErrorAction action = BDRV_ACTION_REPORT;
- ret = mirror_iteration(s, &action);
- if (ret < 0 && action == BDRV_ACTION_REPORT) {
- break;
+
+ /* Note that even when no rate limit is applied we need to yield
+ * periodically with no pending I/O so that qemu_aio_flush() returns.
+ * We do so every SLICE_TIME milliseconds, or when there is an error,
+ * or when the source is clean, whichever comes first.
+ */
+ if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
+ s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
+ if (s->in_flight > 0) {
+ trace_mirror_yield(s, s->in_flight, cnt);
+ qemu_coroutine_yield();
+ continue;
+ } else if (cnt != 0) {
+ mirror_iteration(s);
+ continue;
}
- cnt = bdrv_get_dirty_count(bs);
}
- if (cnt != 0) {
+ if (s->in_flight > 0 || cnt != 0) {
should_complete = false;
} else {
trace_mirror_before_flush(s);
@@ -228,15 +301,12 @@ static void coroutine_fn mirror_run(void *opaque)
delay_ns = 0;
}
- /* Note that even when no rate limit is applied we need to yield
- * with no pending I/O here so that qemu_aio_flush() returns.
- */
block_job_sleep_ns(&s->common, rt_clock, delay_ns);
if (block_job_is_cancelled(&s->common)) {
break;
}
} else if (!should_complete) {
- delay_ns = (cnt == 0 ? SLICE_TIME : 0);
+ delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
block_job_sleep_ns(&s->common, rt_clock, delay_ns);
} else if (cnt == 0) {
/* The two disks are in sync. Exit and report successful
@@ -246,9 +316,20 @@ static void coroutine_fn mirror_run(void *opaque)
s->common.cancelled = false;
break;
}
+ last_pause_ns = qemu_get_clock_ns(rt_clock);
+ }
+
+ if (s->in_flight > 0) {
+ /* We get here only if something went wrong. Either the job failed,
+ * or it was cancelled prematurely so that we do not guarantee that
+ * the target is a copy of the source.
+ */
+ assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
+ mirror_drain(s);
}
immediate_exit:
+ assert(s->in_flight == 0);
g_free(s->buf);
g_free(s->cow_bitmap);
bdrv_set_dirty_tracking(bs, 0);
diff --git a/trace-events b/trace-events
index 6b504d8..fe20bd7 100644
--- a/trace-events
+++ b/trace-events
@@ -83,6 +83,8 @@ mirror_before_drain(void *s, int64_t cnt) "s %p dirty count
%"PRId64
mirror_before_sleep(void *s, int64_t cnt, int synced) "s %p dirty count
%"PRId64" synced %d"
mirror_one_iteration(void *s, int64_t sector_num, int nb_sectors) "s %p
sector_num %"PRId64" nb_sectors %d"
mirror_cow(void *s, int64_t sector_num) "s %p sector_num %"PRId64
+mirror_iteration_done(void *s, int64_t sector_num, int nb_sectors) "s %p
sector_num %"PRId64" nb_sectors %d"
+mirror_yield(void *s, int64_t cnt, int in_flight) "s %p dirty count %"PRId64"
in_flight %d"
# blockdev.c
qmp_block_job_cancel(void *job) "job %p"
--
1.7.10.4
- [Qemu-devel] [PATCH 21/47] block: add bdrv_ensure_backing_file, (continued)
- [Qemu-devel] [PATCH 21/47] block: add bdrv_ensure_backing_file, Paolo Bonzini, 2012/07/24
- [Qemu-devel] [PATCH 35/47] qemu-iotests: add testcases for mirroring on-source-error/on-target-error, Paolo Bonzini, 2012/07/24
- [Qemu-devel] [PATCH 24/47] block: introduce new dirty bitmap functionality, Paolo Bonzini, 2012/07/24
- [Qemu-devel] [PATCH 45/47] mirror: add buf-size argument to drive-mirror, Paolo Bonzini, 2012/07/24
- [Qemu-devel] [PATCH 25/47] block: add block-job-complete, Paolo Bonzini, 2012/07/24
- [Qemu-devel] [PATCH 43/47] mirror: allow customizing the granularity, Paolo Bonzini, 2012/07/24
[Qemu-devel] [PATCH 44/47] mirror: switch mirror_iteration to AIO,
Paolo Bonzini <=
[Qemu-devel] [PATCH 38/47] block: implement dirty bitmap using HBitmap, Paolo Bonzini, 2012/07/24
Re: [Qemu-devel] [PATCH 00/47] Block job improvements for 1.2, Eric Blake, 2012/07/28