From: Fabiano Rosas
Subject: [RFC PATCH v2 6/6] migration/multifd: Bring back the 'ready' semaphore
Date: Thu, 12 Oct 2023 11:06:51 -0300

Bring back the 'ready' semaphore, but this time make it per-channel,
so that we can do true lockstep switching of MultiFDPages without
taking the channel lock.

Drop the channel lock, which is now redundant. The rules for
accessing MultiFDSendParams are:

- between sem and sem_done/ready if we're the channel

    qemu_sem_post(&p->ready);
    qemu_sem_wait(&p->sem);
    <owns p>
    qemu_sem_post(&p->sem_done);

- between ready and sem if we're not the channel

    qemu_sem_wait(&p->ready);
    <owns p>
    qemu_sem_post(&p->sem);
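
For illustration only, here is a minimal, self-contained sketch of the
same two-semaphore handoff using plain POSIX semaphores instead of
QemuSemaphore. The Channel type and the channel_thread()/produce()
names are made up for the example and do not exist in QEMU; the point
is only the ownership rules above:

    /* Hypothetical standalone illustration, not QEMU code. */
    #include <pthread.h>
    #include <semaphore.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
        sem_t ready;    /* posted by the channel: "I can take work" */
        sem_t sem;      /* posted by the producer: "the work is yours" */
        sem_t sem_done; /* posted by the channel: "nothing pending" */
        int pages;      /* stand-in for MultiFDPages, owned per the rules */
        bool quit;
    } Channel;

    /* Channel side: post ready, wait for sem, then own the struct. */
    static void *channel_thread(void *opaque)
    {
        Channel *p = opaque;

        while (true) {
            sem_post(&p->ready);
            sem_wait(&p->sem);
            /* between sem and sem_done/ready we own p */
            if (p->quit) {
                sem_post(&p->sem_done);
                break;
            }
            printf("channel: sending %d pages\n", p->pages);
            p->pages = 0;
        }
        return NULL;
    }

    /* Producer side: wait for ready, then own the struct until sem. */
    static void produce(Channel *p, int pages)
    {
        sem_wait(&p->ready);
        /* between ready and sem we own p */
        p->pages = pages;
        sem_post(&p->sem);
    }

    int main(void)
    {
        Channel p = { .pages = 0, .quit = false };
        pthread_t t;

        sem_init(&p.ready, 0, 0);
        sem_init(&p.sem, 0, 0);
        sem_init(&p.sem_done, 0, 0);
        pthread_create(&t, NULL, channel_thread, &p);

        for (int i = 1; i <= 3; i++) {
            produce(&p, i * 128);
        }

        /* Ask the channel to quit, following the same rules. */
        sem_wait(&p.ready);
        p.quit = true;
        sem_post(&p.sem);
        sem_wait(&p.sem_done);

        pthread_join(t, NULL);
        return 0;
    }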

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
One issue I can see with this is that we might now have to wait in
multifd_send_pages() if a channel takes too long to send its packet
and come back to p->ready. We would need to find a way of ignoring a
slow channel and skipping ahead to the next one in line; see the
sketch below for one possible direction.
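
One possible direction for that (an idea, not something this patch
implements) would be to poll each channel's 'ready' with a
non-blocking wait and only block when every channel is busy. In the
same standalone POSIX-semaphore style as the sketch above, with a
made-up pick_channel() helper:

    /* Hypothetical helper, not in this patch or in QEMU. */
    static int pick_channel(Channel *chans, int n, int next)
    {
        /* Prefer any channel that is already ready... */
        for (int i = 0; i < n; i++) {
            int c = (next + i) % n;
            if (sem_trywait(&chans[c].ready) == 0) {
                return c;   /* caller now owns chans[c] */
            }
        }
        /* ...otherwise block on the next one in line, as today. */
        sem_wait(&chans[next].ready);
        return next;
    }
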
---
 migration/multifd.c | 45 +++++++++++++--------------------------------
 migration/multifd.h |  5 ++---
 2 files changed, 15 insertions(+), 35 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index b7ba3fe0e6..7fa7bc33fd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -410,10 +410,10 @@ static int multifd_send_pages(QEMUFile *f)
     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
         p = &multifd_send_state->params[i];
 
-        qemu_mutex_lock(&p->mutex);
+        qemu_sem_wait(&p->ready);
+
         if (p->quit) {
             error_report("%s: channel %d has already quit!", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
             return -1;
         }
         if (!p->pending_job) {
@@ -421,7 +421,6 @@ static int multifd_send_pages(QEMUFile *f)
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
-        qemu_mutex_unlock(&p->mutex);
     }
     assert(!p->pages->num);
     assert(!p->pages->block);
@@ -429,7 +428,6 @@ static int multifd_send_pages(QEMUFile *f)
     p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
-    qemu_mutex_unlock(&p->mutex);
     qemu_sem_post(&p->sem);
 
     return 1;
@@ -529,9 +527,9 @@ void multifd_save_cleanup(void)
         }
         socket_send_channel_destroy(p->c);
         p->c = NULL;
-        qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         qemu_sem_destroy(&p->sem_done);
+        qemu_sem_destroy(&p->ready);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -586,14 +584,12 @@ static void multifd_send_wait(void)
          * Even idle channels will wait for p->sem at the top of the
          * loop.
          */
+        qemu_sem_wait(&p->ready);
         qemu_sem_post(&p->sem);
 
         trace_multifd_send_wait(migrate_multifd_channels() - i);
         qemu_sem_wait(&p->sem_done);
-
-        qemu_mutex_lock(&p->mutex);
         assert(!p->pending_job || p->quit);
-        qemu_mutex_unlock(&p->mutex);
     }
 
     /*
@@ -621,20 +617,17 @@ int multifd_send_sync_main(QEMUFile *f)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
+        qemu_sem_wait(&p->ready);
         trace_multifd_send_sync_main_signal(p->id);
 
-        qemu_mutex_lock(&p->mutex);
-
         if (p->quit) {
             error_report("%s: channel %d has already quit", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
             return -1;
         }
 
         p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
-        qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
 
@@ -685,15 +678,14 @@ static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
+        qemu_sem_post(&p->ready);
         qemu_sem_wait(&p->sem);
 
         if (qatomic_read(&multifd_send_state->exiting)) {
-            qemu_mutex_lock(&p->mutex);
             p->quit = true;
-            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->sem_done);
             break;
         }
-        qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
@@ -714,7 +706,6 @@ static void *multifd_send_thread(void *opaque)
             if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
                 if (ret != 0) {
-                    qemu_mutex_unlock(&p->mutex);
                     break;
                 }
             }
@@ -725,7 +716,6 @@ static void *multifd_send_thread(void *opaque)
             p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
-            qemu_mutex_unlock(&p->mutex);
 
             trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                                p->next_packet_size);
@@ -753,12 +743,9 @@ static void *multifd_send_thread(void *opaque)
 
             stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
             stat64_add(&mig_stats.transferred, p->next_packet_size);
-            qemu_mutex_lock(&p->mutex);
             p->pending_job--;
-            qemu_mutex_unlock(&p->mutex);
 
         } else {
-            qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->sem_done);
         }
     }
@@ -766,11 +753,8 @@ static void *multifd_send_thread(void *opaque)
 out:
     if (local_err) {
         trace_multifd_send_error(p->id);
-
-        qemu_mutex_lock(&p->mutex);
         p->quit = true;
-        qemu_mutex_unlock(&p->mutex);
-
+        qemu_sem_post(&p->ready);
         multifd_send_terminate_threads(local_err);
         error_free(local_err);
     }
@@ -780,12 +764,10 @@ out:
      * who pay attention to me.
      */
     if (ret != 0) {
-        qemu_sem_post(&p->sem_done);
+        p->quit = true;
+        qemu_sem_post(&p->ready);
     }
-
-    qemu_mutex_lock(&p->mutex);
     p->running = false;
-    qemu_mutex_unlock(&p->mutex);
 
     rcu_unregister_thread();
     migration_threads_remove(thread);
@@ -817,7 +799,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
          * is not created, and then tell who pay attention to me.
          */
         p->quit = true;
-        qemu_sem_post(&p->sem_done);
+        qemu_sem_post(&p->ready);
     }
 }
 
@@ -893,14 +875,13 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
                                              QIOChannel *ioc, Error *err)
 {
      migrate_set_error(migrate_get_current(), err);
-     /* Error happen, we need to tell who pay attention to me */
-     qemu_sem_post(&p->sem_done);
      /*
       * Although multifd_send_thread is not created, but main migration
       * thread need to judge whether it is running, so we need to mark
       * its status.
       */
      p->quit = true;
+     qemu_sem_post(&p->ready);
      object_unref(OBJECT(ioc));
      error_free(err);
 }
@@ -944,9 +925,9 @@ int multifd_save_setup(Error **errp)
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_done, 0);
+        qemu_sem_init(&p->ready, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
diff --git a/migration/multifd.h b/migration/multifd.h
index 71bd66974d..6bb10b07aa 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -93,8 +93,8 @@ typedef struct {
     /* channel is done transmitting until more pages are queued */
     QemuSemaphore sem_done;
 
-    /* this mutex protects the following parameters */
-    QemuMutex mutex;
+    QemuSemaphore ready;
+
     /* is this channel thread running */
     bool running;
     /* should this thread finish */
@@ -209,4 +209,3 @@ typedef struct {
 void multifd_register_ops(int method, MultiFDMethods *ops);
 
 #endif
-
-- 
2.35.3



