qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 10/16] migration/multifd: Enable DSA offloading in multifd sender


From: Hao Xiang
Subject: [PATCH 10/16] migration/multifd: Enable DSA offloading in multifd sender path.
Date: Wed, 25 Oct 2023 19:38:16 +0000

Multifd sender path gets an array of pages queued by the migration
thread. It performs zero page checking on every page in the array.
The pages are classfied as either a zero page or a normal page. This
change uses Intel DSA to offload the zero page checking from CPU to
the DSA accelerator. The sender thread submits a batch of pages to DSA
hardware and waits for the DSA completion thread to signal for work
completion.

Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
---
 migration/multifd.c | 101 +++++++++++++++++++++++++++++++++++++-------
 migration/multifd.h |   3 ++
 2 files changed, 89 insertions(+), 15 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 452fb158b8..79fecbd3ae 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -13,6 +13,8 @@
 #include "qemu/osdep.h"
 #include "qemu/rcu.h"
 #include "qemu/cutils.h"
+#include "qemu/dsa.h"
+#include "qemu/memalign.h"
 #include "exec/target_page.h"
 #include "sysemu/sysemu.h"
 #include "exec/ramblock.h"
@@ -555,6 +557,8 @@ void multifd_save_cleanup(void)
             qemu_thread_join(&p->thread);
         }
     }
+    dsa_stop();
+    dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
@@ -571,6 +575,11 @@ void multifd_save_cleanup(void)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        g_free(p->addr);
+        p->addr = NULL;
+        buffer_zero_batch_task_destroy(p->dsa_batch_task);
+        qemu_vfree(p->dsa_batch_task);
+        p->dsa_batch_task = NULL;
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
@@ -675,13 +684,71 @@ int multifd_send_sync_main(QEMUFile *f)
     return 0;
 }
 
+static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
+{
+    RAMBlock *rb = p->pages->block;
+    if (zero_page) {
+        p->zero[p->zero_num] = offset;
+        p->zero_num++;
+        ram_release_page(rb->idstr, offset);
+    } else {
+        p->normal[p->normal_num] = offset;
+        p->normal_num++;
+    }
+}
+
+static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
+{
+    const void **buf = (const void **)p->addr;
+    assert(!migrate_use_main_zero_page());
+    assert(!dsa_is_running());
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->dsa_batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
+    }
+}
+
+static void buffer_is_zero_use_dsa(MultiFDSendParams *p)
+{
+    assert(!migrate_use_main_zero_page());
+    assert(dsa_is_running());
+
+    buffer_is_zero_dsa_batch_async(p->dsa_batch_task,
+                                   (const void **)p->addr,
+                                   p->pages->num,
+                                   p->page_size);
+}
+
+static void multifd_zero_page_check(MultiFDSendParams *p)
+{
+    /* older qemu don't understand zero page on multifd channel */
+    bool use_multifd_zero_page = !migrate_use_main_zero_page();
+    bool use_multifd_dsa_accel = dsa_is_running();
+
+    RAMBlock *rb = p->pages->block;
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
+    }
+
+    if (!use_multifd_zero_page || !use_multifd_dsa_accel) {
+        buffer_is_zero_use_cpu(p);
+    } else {
+        buffer_is_zero_use_dsa(p);
+    }
+
+    for (int i = 0; i < p->pages->num; i++) {
+        uint64_t offset = p->pages->offset[i];
+        bool zero_page = p->dsa_batch_task->results[i];
+        set_page(p, zero_page, offset);
+    }
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
-    /* older qemu don't understand zero page on multifd channel */
-    bool use_multifd_zero_page = !migrate_use_main_zero_page();
     int ret = 0;
     bool use_zero_copy_send = migrate_zero_copy_send();
 
@@ -707,7 +774,6 @@ static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            RAMBlock *rb = p->pages->block;
             uint64_t packet_num = p->packet_num;
             p->flags = 0;
             if (p->sync_needed) {
@@ -725,18 +791,7 @@ static void *multifd_send_thread(void *opaque)
                 p->iovs_num = 1;
             }
 
-            for (int i = 0; i < p->pages->num; i++) {
-                uint64_t offset = p->pages->offset[i];
-                if (use_multifd_zero_page &&
-                    buffer_is_zero(rb->host + offset, p->page_size)) {
-                    p->zero[p->zero_num] = offset;
-                    p->zero_num++;
-                    ram_release_page(rb->idstr, offset);
-                } else {
-                    p->normal[p->normal_num] = offset;
-                    p->normal_num++;
-                }
-            }
+            multifd_zero_page_check(p);
 
             if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
@@ -958,11 +1013,15 @@ int multifd_save_setup(Error **errp)
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
+    const char *dsa_parameter = migrate_multifd_dsa_accel();
 
     if (!migrate_multifd()) {
         return 0;
     }
 
+    dsa_init(dsa_parameter);
+    dsa_start();
+
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
@@ -981,6 +1040,10 @@ int multifd_save_setup(Error **errp)
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
+        p->addr = g_new0(ram_addr_t, page_count);
+        p->dsa_batch_task = 
+            (struct buffer_zero_batch_task *)qemu_memalign(64, 
sizeof(*p->dsa_batch_task));
+        buffer_zero_batch_task_init(p->dsa_batch_task, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;
         p->packet = g_malloc0(p->packet_len);
@@ -1014,6 +1077,7 @@ int multifd_save_setup(Error **errp)
             return ret;
         }
     }
+
     return 0;
 }
 
@@ -1091,6 +1155,8 @@ void multifd_load_cleanup(void)
 
         qemu_thread_join(&p->thread);
     }
+    dsa_stop();
+    dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
@@ -1225,6 +1291,7 @@ int multifd_load_setup(Error **errp)
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
+    const char *dsa_parameter = migrate_multifd_dsa_accel();
 
     /*
      * Return successfully if multiFD recv state is already initialised
@@ -1234,6 +1301,9 @@ int multifd_load_setup(Error **errp)
         return 0;
     }
 
+    dsa_init(dsa_parameter);
+    dsa_start();
+
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
@@ -1270,6 +1340,7 @@ int multifd_load_setup(Error **errp)
             return ret;
         }
     }
+
     return 0;
 }
 
diff --git a/migration/multifd.h b/migration/multifd.h
index e8f90776bb..297b055e2b 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -114,6 +114,9 @@ typedef struct {
      * pending_job != 0 -> multifd_channel can use it.
      */
     MultiFDPages_t *pages;
+    /* Address of each pages in pages */
+    ram_addr_t *addr;
+    struct buffer_zero_batch_task *dsa_batch_task;
 
     /* thread local variables. No locking required */
 
-- 
2.30.2




reply via email to

[Prev in Thread] Current Thread [Next in Thread]