qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v6 16/47] Return path: Source handling of return path


From: Dr. David Alan Gilbert (git)
Subject: [Qemu-devel] [PATCH v6 16/47] Return path: Source handling of return path
Date: Tue, 14 Apr 2015 18:03:42 +0100

From: "Dr. David Alan Gilbert" <address@hidden>

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert <address@hidden>
---
 include/migration/migration.h |   8 ++
 migration/migration.c         | 177 +++++++++++++++++++++++++++++++++++++++++-
 trace-events                  |  12 +++
 3 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 6300ec1..0719d82 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -73,6 +73,14 @@ struct MigrationState
 
     int state;
     MigrationParams params;
+
+    /* State related to return path */
+    struct {
+        QEMUFile     *file;
+        QemuThread    rp_thread;
+        bool          error;
+    } rp_state;
+
     double mbps;
     int64_t total_time;
     int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index db9471d..88355e2 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -243,6 +243,23 @@ MigrationCapabilityStatusList 
*qmp_query_migrate_capabilities(Error **errp)
     return head;
 }
 
+/*
+ * Report whether a migration is currently in flight, i.e. the
+ * migration state is either the active or the setup state.
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+    const int state = ms->state;
+
+    if (state == MIGRATION_STATUS_ACTIVE) {
+        return true;
+    }
+    if (state == MIGRATION_STATUS_SETUP) {
+        return true;
+    }
+    return false;
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
     if (migrate_use_xbzrle()) {
@@ -365,6 +382,21 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
     }
 }
 
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    QEMUFile *rp = ms->rp_state.file;
+
+    /*
+     * On failure (e.g. a dying destination) several threads may try to clean
+     * up the rp; the cmpxchg lets only one of them take the file and close it
+     */
+    rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
+    if (rp) {
+        trace_migrate_fd_cleanup_src_rp();
+        qemu_fclose(rp);
+    }
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
@@ -372,6 +404,8 @@ static void migrate_fd_cleanup(void *opaque)
     qemu_bh_delete(s->cleanup_bh);
     s->cleanup_bh = NULL;
 
+    migrate_fd_cleanup_src_rp(s);
+
     if (s->file) {
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
@@ -410,6 +444,11 @@ static void migrate_fd_cancel(MigrationState *s)
     QEMUFile *f = migrate_get_current()->file;
     trace_migrate_fd_cancel();
 
+    if (s->rp_state.file) {
+        /* shutdown the rp socket, so causing the rp thread to shutdown */
+        qemu_file_shutdown(s->rp_state.file);
+    }
+
     do {
         old_state = s->state;
         if (old_state != MIGRATION_STATUS_SETUP &&
@@ -678,8 +717,144 @@ int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
-/* migration thread support */
+/*
+ * Something bad happened to the RP stream: record the error in rp_state and
+ * clean up the source side of the RP.  The caller reports the reason.
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Thread body: reads type/length-framed messages from the return path and
+ * handles them until SHUT arrives, the stream errors, or migration is over.
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->rp_state.file;
+    uint16_t expected_len, header_len, header_type;
+    const int max_len = 512;
+    uint8_t buf[max_len];
+    uint32_t tmp32;
+    int res;
+
+    trace_source_return_path_thread_entry();
+    while (rp && !qemu_file_get_error(rp) &&
+        migration_already_active(ms)) {
+        trace_source_return_path_thread_loop_top();
+        header_type = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        switch (header_type) {
+        case MIG_RP_MSG_SHUT:
+        case MIG_RP_MSG_PONG:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid message 0x%04x length 0x%04x",
+                    header_type, header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+        /* Short payloads would leave buf partly unset for the be32 decode */
+        if (header_len != expected_len) {
+            error_report("RP: Received message 0x%04x with "
+                    "incorrect length %d expecting %d",
+                    header_type, header_len,
+                    expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* Header is valid: known type with exactly the expected payload */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            trace_source_return_path_thread_failed_read_cmd_data();
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the message and the data */
+        switch (header_type) {
+        case MIG_RP_MSG_SHUT:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            trace_source_return_path_thread_shut(tmp32);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", tmp32);
+                source_return_path_bad(ms);
+            }
+            /*
+             * We'll let the main thread deal with closing the RP
+             * we could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RP_MSG_PONG:
+            tmp32 = be32_to_cpup((uint32_t *)buf);
+            trace_source_return_path_thread_pong(tmp32);
+            break;
+
+        default:
+            break;
+        }
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        trace_source_return_path_thread_bad_end();
+        source_return_path_bad(ms);
+    }
+
+    trace_source_return_path_thread_end();
+out:
+    return NULL;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_return_path_on_source(MigrationState *ms)
+{
+    /* Ask the outgoing stream for its return path; fail if it has none */
+    QEMUFile *rp = qemu_file_get_return_path(ms->file);
+    ms->rp_state.file = rp;
+    if (rp == NULL) {
+        return -1;
+    }
+    trace_open_return_path_on_source();
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
 
+    /* Thread is created joinable so that it can be joined later */
+    trace_open_return_path_on_source_continue();
+    return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+/* Joins the RP thread; returns 0 if the RP was ok, non-zero on RP error */
+static int await_return_path_close_on_source(MigrationState *ms)
+{
+    /*
+     * A clean finish has the destination send SHUT, after which rp_thread
+     * exits by itself.  After an error on the outgoing stream, shut the rp
+     * down to force that exit (cancel does likewise), or the join may hang.
+     */
+    if (qemu_file_get_error(ms->file)) {
+        if (ms->rp_state.file) {
+            qemu_file_shutdown(ms->rp_state.file);
+        }
+    }
+    trace_await_return_path_close_on_source_joining();
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    trace_await_return_path_close_on_source_close();
+    return ms->rp_state.error;
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
 static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
diff --git a/trace-events b/trace-events
index 9f0a071..eb40e61 100644
--- a/trace-events
+++ b/trace-events
@@ -1378,12 +1378,24 @@ flic_no_device_api(int err) "flic: no Device Contral 
API support %d"
 flic_reset_failed(int err) "flic: reset failed %d"
 
 # migration.c
+await_return_path_close_on_source_close(void) ""
+await_return_path_close_on_source_joining(void) ""
 migrate_set_state(int new_state) "new state %d"
 migrate_fd_cleanup(void) ""
+migrate_fd_cleanup_src_rp(void) ""
 migrate_fd_error(void) ""
 migrate_fd_cancel(void) ""
 migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" 
PRIu64
 migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d"
+open_return_path_on_source(void) ""
+open_return_path_on_source_continue(void) ""
+source_return_path_thread_bad_end(void) ""
+source_return_path_thread_end(void) ""
+source_return_path_thread_entry(void) ""
+source_return_path_thread_failed_read_cmd_data(void) ""
+source_return_path_thread_loop_top(void) ""
+source_return_path_thread_pong(uint32_t val) "%x"
+source_return_path_thread_shut(uint32_t val) "%x"
 migrate_transferred(uint64_t tranferred, uint64_t time_spent, double 
bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " 
bandwidth %g max_size %" PRId64
 
 # migration/rdma.c
-- 
2.1.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]