qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC PATCH v4 28/28] COLO: Add block replication into colo


From: zhanghailiang
Subject: [Qemu-devel] [RFC PATCH v4 28/28] COLO: Add block replication into colo process
Date: Thu, 26 Mar 2015 13:29:34 +0800

From: Wen Congyang <address@hidden>

Make sure master start block replication after slave's block replication started

Signed-off-by: zhanghailiang <address@hidden>
Signed-off-by: Wen Congyang <address@hidden>
Signed-off-by: Yang Hongyang <address@hidden>
---
 migration/colo.c | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 118 insertions(+), 2 deletions(-)

diff --git a/migration/colo.c b/migration/colo.c
index 894bf5f..d70f80b 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -18,6 +18,8 @@
 #include "migration/migration-failover.h"
 #include "net/colo-nic.h"
 #include "qmp-commands.h"
+#include "block/block.h"
+#include "sysemu/block-backend.h"
 
 #define DEBUG_COLO 0
 
@@ -110,6 +112,68 @@ static bool colo_runstate_is_stopped(void)
     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
 }
 
+static void blk_start_replication(bool primary, Error **errp)
+{
+    int mode = primary ? COLO_MODE_PRIMARY : COLO_MODE_SECONDARY;
+    BlockBackend *blk, *temp;
+    Error *local_err = NULL;
+
+    for (blk = blk_next(NULL); blk; blk = blk_next(blk)) {
+        if (blk_is_read_only(blk) || !blk_is_inserted(blk)) {
+            continue;
+        }
+
+        bdrv_start_replication(blk_bs(blk), mode, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            goto fail;
+        }
+    }
+
+    return;
+
+fail:
+    for (temp = blk_next(NULL); temp != blk; temp = blk_next(temp)) {
+        bdrv_stop_replication(blk_bs(temp), NULL);
+    }
+}
+
+static void blk_do_checkpoint(Error **errp)
+{
+    BlockBackend *blk;
+    Error *local_err = NULL;
+
+    for (blk = blk_next(NULL); blk; blk = blk_next(blk)) {
+        if (blk_is_read_only(blk) || !blk_is_inserted(blk)) {
+            continue;
+        }
+
+        bdrv_do_checkpoint(blk_bs(blk), &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return;
+        }
+    }
+}
+
+static void blk_stop_replication(Error **errp)
+{
+    BlockBackend *blk;
+    Error *local_err = NULL;
+
+    for (blk = blk_next(NULL); blk; blk = blk_next(blk)) {
+        if (blk_is_read_only(blk) || !blk_is_inserted(blk)) {
+            continue;
+        }
+
+        bdrv_stop_replication(blk_bs(blk), &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return;
+        }
+    }
+}
+
 /*
  * there are two way to entry this function
  * 1. From colo checkpoint incoming thread, in this case
@@ -120,6 +184,8 @@ static bool colo_runstate_is_stopped(void)
  */
 static void slave_do_failover(void)
 {
+    Error *local_err = NULL;
+
     /* Wait for incoming thread loading vmstate */
     while (vmstate_loading) {
         ;
@@ -129,6 +195,10 @@ static void slave_do_failover(void)
         error_report("colo proxy failed to do failover");
     }
     colo_proxy_destroy(COLO_SECONDARY_MODE);
+    blk_stop_replication(&local_err);
+    if (local_err) {
+        error_report_err(local_err);
+    }
 
     colo = NULL;
 
@@ -147,6 +217,7 @@ static void slave_do_failover(void)
 static void master_do_failover(void)
 {
     MigrationState *s = migrate_get_current();
+    Error *local_err = NULL;
 
     if (!colo_runstate_is_stopped()) {
         vm_stop_force_state(RUN_STATE_COLO);
@@ -158,6 +229,11 @@ static void master_do_failover(void)
         migrate_set_state(s, MIGRATION_STATUS_COLO, 
MIGRATION_STATUS_COMPLETED);
     }
 
+    blk_stop_replication(&local_err);
+    if (local_err) {
+        error_report_err(local_err);
+    }
+
     vm_start();
 }
 
@@ -231,6 +307,7 @@ static int colo_do_checkpoint_transaction(MigrationState 
*s, QEMUFile *control)
     int ret;
     size_t size;
     QEMUFile *trans = NULL;
+    Error *local_err = NULL;
 
     ret = colo_ctl_put(s->file, COLO_CHECKPOINT_NEW);
     if (ret < 0) {
@@ -282,6 +359,16 @@ static int colo_do_checkpoint_transaction(MigrationState 
*s, QEMUFile *control)
         goto out;
     }
 
+    /* we call this api although this may do nothing on primary side */
+    qemu_mutex_lock_iothread();
+    blk_do_checkpoint(&local_err);
+    qemu_mutex_unlock_iothread();
+    if (local_err) {
+        error_report_err(local_err);
+        ret = -1;
+        goto out;
+    }
+
     ret = colo_ctl_put(s->file, COLO_CHECKPOINT_SEND);
     if (ret < 0) {
         goto out;
@@ -339,6 +426,7 @@ static void *colo_thread(void *opaque)
     QEMUFile *colo_control = NULL;
     int64_t current_time, checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
     int ret;
+    Error *local_err = NULL;
 
     if (colo_proxy_init(COLO_PRIMARY_MODE) != 0) {
         error_report("Init colo proxy error");
@@ -370,6 +458,12 @@ static void *colo_thread(void *opaque)
         goto out;
     }
 
+    /* start block replication */
+    blk_start_replication(true, &local_err);
+    if (local_err) {
+        goto out;
+    }
+
     qemu_mutex_lock_iothread();
     vm_start();
     qemu_mutex_unlock_iothread();
@@ -414,7 +508,11 @@ do_checkpoint:
     }
 
 out:
-    error_report("colo: some error happens in colo_thread");
+    if (local_err) {
+        error_report_err(local_err);
+    } else {
+        error_report("colo: some error happens in colo_thread");
+    }
     qemu_mutex_lock_iothread();
     if (!failover_request_is_set()) {
         error_report("master takeover from checkpoint channel");
@@ -516,6 +614,7 @@ void *colo_process_incoming_checkpoints(void *opaque)
     QEMUFile *ctl = NULL, *fb = NULL;
     int ret;
     uint64_t total_size;
+    Error *local_err = NULL;
 
     qdev_hotplug = 0;
 
@@ -543,6 +642,13 @@ void *colo_process_incoming_checkpoints(void *opaque)
         goto out;
     }
 
+    /* start block replication */
+    blk_start_replication(false, &local_err);
+    if (local_err) {
+        goto out;
+    }
+    DPRINTF("finish block replication\n");
+
     ret = colo_ctl_put(ctl, COLO_READY);
     if (ret < 0) {
         goto out;
@@ -627,7 +733,13 @@ void *colo_process_incoming_checkpoints(void *opaque)
         }
         DPRINTF("Finish load all vm state to cache\n");
         vmstate_loading = false;
+
+        /* discard colo disk buffer */
+        blk_do_checkpoint(&local_err);
         qemu_mutex_unlock_iothread();
+        if (local_err) {
+            goto out;
+        }
 
         ret = colo_ctl_put(ctl, COLO_CHECKPOINT_LOADED);
         if (ret < 0) {
@@ -645,7 +757,11 @@ void *colo_process_incoming_checkpoints(void *opaque)
     }
 
 out:
-    error_report("Detect some error or get a failover request");
+    if (local_err) {
+        error_report_err(local_err);
+    } else {
+        error_report("Detect some error or get a failover request");
+    }
     /* determine whether we need to failover */
     if (!failover_request_is_set()) {
         /*
-- 
1.7.12.4





reply via email to

[Prev in Thread] Current Thread [Next in Thread]