qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [v3 10/13] migration: Add the core code of multi-thread dec


From: Liang Li
Subject: [Qemu-devel] [v3 10/13] migration: Add the core code of multi-thread decompression
Date: Fri, 12 Dec 2014 09:29:03 +0800

Signed-off-by: Liang Li <address@hidden>
Signed-off-by: Yang Zhang <address@hidden>
---
 arch_init.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 46 insertions(+), 2 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 14bc486..7103f4f 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -820,6 +821,14 @@ static inline void start_compression(compress_param *param)
     qemu_mutex_unlock(&param->mutex);
 }
 
+static inline void start_decompression(decompress_param *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->state = START;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
 
 static uint64_t bytes_transferred;
 
@@ -1351,8 +1360,24 @@ void ram_handle_compressed(void *host, uint8_t ch, 
uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+    decompress_param *param = opaque;
     while (!quit_thread) {
-        /* To be done */
+        qemu_mutex_lock(&param->mutex);
+        while (param->state != START) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+            if (quit_thread) {
+                break;
+            }
+            size_t pagesize = TARGET_PAGE_SIZE;
+            /* uncompress() will return failed in some case,
+             * especially when the page is dirted when doing
+             * the compression, ignore the return value because
+             * the dirty page will be retransferred. */
+            uncompress((Bytef *)param->des, &pagesize,
+                    (const Bytef *)param->compbuf, param->len);
+            param->state = DONE;
+        }
+        qemu_mutex_unlock(&param->mutex);
     }
     return NULL;
 }
@@ -1379,6 +1404,9 @@ void migrate_decompress_threads_join(void)
     quit_thread = true;
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        qemu_cond_signal(&decomp_param[i].cond);
+    }
+    for (i = 0; i < thread_count; i++) {
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
@@ -1392,7 +1420,23 @@ void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
         void *host, int len)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_decompress_threads();
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (decomp_param[idx].state == DONE) {
+                memcpy(decomp_param[idx].compbuf, compbuf, len);
+                decomp_param[idx].des = host;
+                decomp_param[idx].len = len;
+                start_decompression(&decomp_param[idx]);
+                break;
+            }
+        }
+        if (idx < thread_count) {
+            break;
+        }
+    }
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
-- 
1.8.3.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]