qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 4/6] migration: Make multifd threads wait until all have been created


From: Juan Quintela
Subject: [Qemu-devel] [PATCH 4/6] migration: Make multifd threads wait until all have been created
Date: Wed, 14 Aug 2019 04:02:16 +0200

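This ensures that no multifd receive thread handles any incoming message
until all threads have been created.  Each receive thread now waits on a
per-channel "can_start" semaphore, and multifd_recv_new_channel() posts
that semaphore for every channel once the last channel has been set up.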

Signed-off-by: Juan Quintela <address@hidden>
---
 migration/ram.c        | 24 ++++++++++++++++++++++--
 migration/trace-events |  1 +
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index 4a6ae677a9..f1aec95f83 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -702,6 +702,8 @@ typedef struct {
     uint64_t num_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* thread can continue */
+    QemuSemaphore can_start;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -1313,6 +1315,7 @@ int multifd_load_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem_sync);
+        qemu_sem_destroy(&p->can_start);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -1366,6 +1369,9 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
     rcu_register_thread();
 
+    qemu_sem_wait(&p->can_start);
+    trace_multifd_recv_thread_can_start(p->id);
+
     while (true) {
         uint32_t used;
         uint32_t flags;
@@ -1445,6 +1451,7 @@ int multifd_load_setup(void)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem_sync, 0);
+        qemu_sem_init(&p->can_start, 0);
         p->quit = false;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
@@ -1477,6 +1484,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
+    bool last_one;
     int id;
 
     id = multifd_recv_initial_packet(ioc, &local_err);
@@ -1506,8 +1514,20 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
     atomic_inc(&multifd_recv_state->count);
-    return atomic_read(&multifd_recv_state->count) ==
-           migrate_multifd_channels();
+
+    last_one =  atomic_read(&multifd_recv_state->count)
+        == migrate_multifd_channels();
+
+    if (last_one) {
+        int i;
+
+        for (i = 0; i < migrate_multifd_channels(); i++) {
+            MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+            qemu_sem_post(&p->can_start);
+        }
+    }
+    return last_one;
 }
 
 /**
diff --git a/migration/trace-events b/migration/trace-events
index dd13a5c4b1..9fbef614ab 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -86,6 +86,7 @@ multifd_recv_sync_main(long packet_num) "packet num %ld"
 multifd_recv_sync_main_signal(uint8_t id) "channel %d"
 multifd_recv_sync_main_wait(uint8_t id) "channel %d"
 multifd_recv_terminate_threads(bool error) "error %d"
+multifd_recv_thread_can_start(uint8_t id) "channel %d"
 multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
 multifd_recv_thread_start(uint8_t id) "%d"
 multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x next packet size %d"
-- 
2.21.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]