[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path
From: |
Dr. David Alan Gilbert (git) |
Subject: |
[Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path |
Date: |
Wed, 25 Feb 2015 16:51:35 +0000 |
From: "Dr. David Alan Gilbert" <address@hidden>
Open a return path, and handle messages that are received upon it.
Signed-off-by: Dr. David Alan Gilbert <address@hidden>
---
include/migration/migration.h | 8 ++
migration/migration.c | 178 +++++++++++++++++++++++++++++++++++++++++-
trace-events | 13 +++
3 files changed, 198 insertions(+), 1 deletion(-)
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 6775747..5242ead 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -73,6 +73,14 @@ struct MigrationState
int state;
MigrationParams params;
+
+ /* State related to return path */
+ struct {
+ QEMUFile *file;
+ QemuThread rp_thread;
+ bool error;
+ } rp_state;
+
double mbps;
int64_t total_time;
int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index 80d234c..34cd4fe 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -237,6 +237,23 @@ MigrationCapabilityStatusList
*qmp_query_migrate_capabilities(Error **errp)
return head;
}
+/*
+ * Report whether a migration is already in progress on the source,
+ * meaning ms->state is either MIG_STATE_SETUP or MIG_STATE_ACTIVE.
+ * Every other state yields false.
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+    int state = ms->state;
+
+    return state == MIG_STATE_SETUP || state == MIG_STATE_ACTIVE;
+}
+
static void get_xbzrle_cache_stats(MigrationInfo *info)
{
if (migrate_use_xbzrle()) {
@@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s, int
old_state, int new_state)
}
}
+/*
+ * Close the source side of the return path, if it is still open.
+ *
+ * When the return path fails (e.g. the destination dies) this can be
+ * reached from several threads.  The pointer is swapped to NULL with
+ * atomic_cmpxchg() and only a non-NULL previous value is closed, so
+ * concurrent callers do not both close the same QEMUFile.
+ */
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    QEMUFile *seen = ms->rp_state.file;
+    QEMUFile *owned = atomic_cmpxchg(&ms->rp_state.file, seen, NULL);
+
+    if (!owned) {
+        return;
+    }
+
+    trace_migrate_fd_cleanup_src_rp();
+    qemu_fclose(owned);
+}
+
static void migrate_fd_cleanup(void *opaque)
{
MigrationState *s = opaque;
@@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
qemu_bh_delete(s->cleanup_bh);
s->cleanup_bh = NULL;
+ migrate_fd_cleanup_src_rp(s);
+
if (s->file) {
trace_migrate_fd_cleanup();
qemu_mutex_unlock_iothread();
@@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
QEMUFile *f = migrate_get_current()->file;
trace_migrate_fd_cancel();
+ if (s->rp_state.file) {
+ /* shutdown the rp socket, so causing the rp thread to shutdown */
+ qemu_file_shutdown(s->rp_state.file);
+ }
+
do {
old_state = s->state;
if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
@@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void)
return s->xbzrle_cache_size;
}
-/* migration thread support */
+/*
+ * Flag the return path as failed (rp_state.error = true) and close it.
+ * Nothing is printed here; the caller is responsible for reporting why
+ * the stream went bad.
+ */
+static void source_return_path_bad(MigrationState *ms)
+{
+    ms->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(ms);
+}
+
+/*
+ * Thread body that reads and handles the messages sent back to the
+ * source VM on the return path (ms->rp_state.file).
+ *
+ * Each message is a big-endian 16-bit command, a big-endian 16-bit
+ * payload length, then the payload.  The loop runs while the return path
+ * has no error and the migration is in the setup/active state.  It stops
+ * early on MIG_RP_CMD_SHUT, on a malformed message, or on a short read.
+ * Failures are recorded via source_return_path_bad(), which sets
+ * rp_state.error and closes the return path.
+ *
+ * Always returns NULL.
+ *
+ * NOTE(review): rp is a cached copy of ms->rp_state.file.  Another
+ * thread running migrate_fd_cleanup_src_rp() while this loop is still
+ * reading would leave rp pointing at a closed file.  Confirm the main
+ * thread always joins this thread before cleaning up.
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->rp_state.file;
+    uint16_t expected_len, header_len, header_com;
+    /* A compile-time constant, so buf is a fixed array rather than a VLA */
+    enum { RP_MAX_PAYLOAD = 512 };
+    uint8_t buf[RP_MAX_PAYLOAD];
+    uint32_t tmp32;
+    int res;
+
+    trace_source_return_path_thread_entry();
+    while (rp && !qemu_file_get_error(rp) &&
+           migration_already_active(ms)) {
+        trace_source_return_path_thread_loop_top();
+        header_com = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        /*
+         * If the header could not be read (e.g. the destination went
+         * away), the values above are not meaningful.  Leave the loop
+         * and let the file-error check after it mark the return path
+         * bad, instead of reporting a bogus "invalid cmd".
+         */
+        if (qemu_file_get_error(rp)) {
+            break;
+        }
+
+        switch (header_com) {
+        case MIG_RP_CMD_SHUT:
+        case MIG_RP_CMD_PONG:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
+                         header_com, header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /*
+         * The payload length must match exactly.  A short payload would
+         * leave part of buf uninitialised when it is decoded below.
+         */
+        if (header_len != expected_len) {
+            error_report("RP: Received command 0x%04x with "
+                         "incorrect length %d expecting %d",
+                         header_com, header_len,
+                         expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* We know we've got a valid header by this point */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            trace_source_return_path_thread_failed_read_cmd_data();
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the command and the data */
+        switch (header_com) {
+        case MIG_RP_CMD_SHUT:
+            /* ldl_be_p: big-endian load that tolerates an unaligned buf */
+            tmp32 = ldl_be_p(buf);
+            trace_source_return_path_thread_shut(tmp32);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", (int)tmp32);
+                source_return_path_bad(ms);
+            }
+            /*
+             * On a clean SHUT, leave closing the RP to the main thread.
+             * We could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RP_CMD_PONG:
+            tmp32 = ldl_be_p(buf);
+            trace_source_return_path_thread_pong(tmp32);
+            break;
+
+        default:
+            /* This shouldn't happen because we should catch this above */
+            trace_source_return_path_bad_header_com();
+            break;
+        }
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        trace_source_return_path_thread_bad_end();
+        source_return_path_bad(ms);
+    }
+
+out:
+    /* Traced on every exit path, including the early "goto out" ones */
+    trace_source_return_path_thread_end();
+    return NULL;
+}
+
+/*
+ * Obtain the return-path QEMUFile for ms->file and start the joinable
+ * "return path" thread that services it.
+ * Returns 0 on success, or -1 (with ms->rp_state.file left NULL) when
+ * qemu_file_get_return_path() provides no file.
+ */
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_outgoing_return_path(MigrationState *ms)
+{
+    QEMUFile *rp = qemu_file_get_return_path(ms->file);
+
+    ms->rp_state.file = rp;
+    if (rp == NULL) {
+        return -1;
+    }
+
+    trace_open_outgoing_return_path();
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+    trace_open_outgoing_return_path_continue();
+
+    return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static void await_outgoing_return_path_close(MigrationState *ms)
+{
+    /*
+     * Wait for the return-path thread to finish.
+     *
+     * On a normal exit the destination sends a SHUT and the thread ends
+     * by itself.  If the outgoing stream (ms->file) has an error, the
+     * return path is shut down first so the thread is not left blocked
+     * reading it.  Cancelling also shuts it down, so a destination that
+     * dies at an awkward moment cannot leave us stuck in the join.
+     *
+     * NOTE(review): ms->rp_state.file is read twice here and may be
+     * cleared or closed concurrently by migrate_fd_cleanup_src_rp().
+     */
+    if (qemu_file_get_error(ms->file)) {
+        if (ms->rp_state.file) {
+            qemu_file_shutdown(ms->rp_state.file);
+        }
+    }
+
+    trace_await_outgoing_return_path_joining();
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    trace_await_outgoing_return_path_close();
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
static void *migration_thread(void *opaque)
{
MigrationState *s = opaque;
diff --git a/trace-events b/trace-events
index 4f3eff8..1951b25 100644
--- a/trace-events
+++ b/trace-events
@@ -1374,12 +1374,25 @@ flic_no_device_api(int err) "flic: no Device Contral
API support %d"
flic_reset_failed(int err) "flic: reset failed %d"
# migration.c
+await_outgoing_return_path_close(void) ""
+await_outgoing_return_path_joining(void) ""
migrate_set_state(int new_state) "new state %d"
migrate_fd_cleanup(void) ""
+migrate_fd_cleanup_src_rp(void) ""
migrate_fd_error(void) ""
migrate_fd_cancel(void) ""
migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %"
PRIu64
migrate_send_rp_message(int cmd, uint16_t len) "cmd=%d, len=%d"
+open_outgoing_return_path(void) ""
+open_outgoing_return_path_continue(void) ""
+source_return_path_thread_bad_end(void) ""
+source_return_path_bad_header_com(void) ""
+source_return_path_thread_end(void) ""
+source_return_path_thread_entry(void) ""
+source_return_path_thread_failed_read_cmd_data(void) ""
+source_return_path_thread_loop_top(void) ""
+source_return_path_thread_pong(uint32_t val) "%x"
+source_return_path_thread_shut(uint32_t val) "%x"
migrate_transferred(uint64_t tranferred, uint64_t time_spent, double
bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 "
bandwidth %g max_size %" PRId64
# migration/rdma.c
--
2.1.0
- [Qemu-devel] [PATCH v5 05/45] Create MigrationIncomingState, (continued)
- [Qemu-devel] [PATCH v5 05/45] Create MigrationIncomingState, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 02/45] Split header writing out of qemu_save_state_begin, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 04/45] Add qemu_get_counted_string to read a string prefixed by a count byte, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 06/45] Provide runtime Target page information, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 09/45] Migration commands, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 07/45] Return path: Open a return path on QEMUFile for sockets, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 10/45] Return path: Control commands, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 08/45] Return path: socket_writev_buffer: Block even on non-blocking fd's, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 13/45] ram_debug_dump_bitmap: Dump a migration bitmap as text, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 11/45] Return path: Send responses from destination to source, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 12/45] Return path: Source handling of return path,
Dr. David Alan Gilbert (git) <=
- [Qemu-devel] [PATCH v5 14/45] Move loadvm_handlers into MigrationIncomingState, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 15/45] Rework loadvm path for subloops, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 19/45] migrate_init: Call from savevm, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 18/45] MIG_CMD_PACKAGED: Send a packaged chunk of migration stream, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 16/45] Add migration-capability boolean for postcopy-ram., Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 17/45] Add wrappers and handlers for sending/receiving the postcopy-ram migration messages., Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 21/45] Add Linux userfaultfd header, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 20/45] Modify savevm handlers for postcopy, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 24/45] MIG_STATE_POSTCOPY_ACTIVE: Add new migration state, Dr. David Alan Gilbert (git), 2015/02/25
- [Qemu-devel] [PATCH v5 22/45] postcopy: OS support test, Dr. David Alan Gilbert (git), 2015/02/25