From: Liang Li
Subject: [Qemu-devel] [v7 08/14] migration: Add the core code of multi-thread compression
Date: Wed, 8 Apr 2015 14:20:05 +0800

Implement the core logic of multi-thread compression. At this point,
multi-thread compression cannot be used together with xbzrle yet.

Signed-off-by: Liang Li <address@hidden>
Signed-off-by: Yang Zhang <address@hidden>
---
 arch_init.c | 183 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 177 insertions(+), 6 deletions(-)
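
For reviewers unfamiliar with the hand-off protocol implemented below, here
is a minimal stand-alone sketch of the same producer/worker pattern, reduced
to plain pthreads so it compiles on its own.  It is an illustration only:
the demo_* names are made up for this note and do not appear in the patch,
which uses CompressParam, comp_param[], comp_done_lock and comp_done_cond
instead.

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
        pthread_mutex_t mutex;   /* protects start/quit */
        pthread_cond_t cond;     /* worker sleeps here waiting for work */
        bool start, quit, done;
        int page;                /* stand-in for the block/offset pair */
    } DemoParam;

    static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;

    static void *demo_worker(void *opaque)
    {
        DemoParam *p = opaque;

        while (true) {
            pthread_mutex_lock(&p->mutex);
            while (!p->start && !p->quit) {
                pthread_cond_wait(&p->cond, &p->mutex);
            }
            if (p->quit) {
                pthread_mutex_unlock(&p->mutex);
                break;
            }
            printf("compressing page %d\n", p->page);  /* the real work */
            p->start = false;
            pthread_mutex_unlock(&p->mutex);

            /* Tell the producer this worker is idle again. */
            pthread_mutex_lock(&done_lock);
            p->done = true;
            pthread_cond_signal(&done_cond);
            pthread_mutex_unlock(&done_lock);
        }
        return NULL;
    }

    int main(void)
    {
        DemoParam p = { .done = true };
        pthread_t tid;

        pthread_mutex_init(&p.mutex, NULL);
        pthread_cond_init(&p.cond, NULL);
        pthread_create(&tid, NULL, demo_worker, &p);

        for (int i = 0; i < 3; i++) {
            /* Wait for the worker to become idle, then claim it. */
            pthread_mutex_lock(&done_lock);
            while (!p.done) {
                pthread_cond_wait(&done_cond, &done_lock);
            }
            p.done = false;
            pthread_mutex_unlock(&done_lock);

            /* Hand over the next page and wake the worker up. */
            pthread_mutex_lock(&p.mutex);
            p.page = i;
            p.start = true;
            pthread_cond_signal(&p.cond);
            pthread_mutex_unlock(&p.mutex);
        }

        /* Wait for the last page, then ask the worker to quit. */
        pthread_mutex_lock(&done_lock);
        while (!p.done) {
            pthread_cond_wait(&done_cond, &done_lock);
        }
        pthread_mutex_unlock(&done_lock);

        pthread_mutex_lock(&p.mutex);
        p.quit = true;
        pthread_cond_signal(&p.cond);
        pthread_mutex_unlock(&p.mutex);
        pthread_join(tid, NULL);
        return 0;
    }

The patch below follows this shape, with one CompressParam per thread and the
compressed output buffered in a per-thread QEMUFile that
flush_compressed_data() drains into the migration stream.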

diff --git a/arch_init.c b/arch_init.c
index 44ac002..d657a6c 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -355,12 +355,35 @@ static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
 static uint8_t *compressed_data_buf;
 
+static int do_compress_ram_page(CompressParam *param);
+
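+/* Runs in a compression thread: mark this thread as idle and wake the
+ * migration thread waiting on comp_done_cond. */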
+static inline void notify_compression_done(CompressParam *param)
+{
+    qemu_mutex_lock(comp_done_lock);
+    param->done = true;
+    qemu_cond_signal(comp_done_cond);
+    qemu_mutex_unlock(comp_done_lock);
+}
+
 static void *do_data_compress(void *opaque)
 {
-    while (true) {
+    CompressParam *param = opaque;
 
-    /* To be done */
+    while (true) {
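+        /* Sleep until the migration thread posts work or asks us to quit. */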
+        qemu_mutex_lock(&param->mutex);
+        while (!param->start && !param->quit) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (param->quit) {
+            qemu_mutex_unlock(&param->mutex);
+            notify_compression_done(param);
+            break;
+        }
+        do_compress_ram_page(param);
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
 
+        notify_compression_done(param);
     }
 
     return NULL;
@@ -368,7 +391,15 @@ static void *do_data_compress(void *opaque)
 
 static inline void terminate_compression_threads(void)
 {
-    /* To be done */
+    int idx, thread_count;
+
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(&comp_param[idx].mutex);
+        comp_param[idx].quit = true;
+        qemu_cond_signal(&comp_param[idx].cond);
+        qemu_mutex_unlock(&comp_param[idx].mutex);
+    }
 }
 
 void migrate_compress_threads_join(void)
@@ -827,6 +858,95 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
     return pages;
 }
 
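+/* Compress one guest page into this thread's buffer file: write the page
+ * header, then the zlib-compressed page data.  Returns the number of
+ * bytes queued for the stream. */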
+static int do_compress_ram_page(CompressParam *param)
+{
+    int bytes_sent, blen;
+    uint8_t *p;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+
+    p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
+
+    bytes_sent = save_page_header(param->file, block, offset |
+                                  RAM_SAVE_FLAG_COMPRESS_PAGE);
+    blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+                                     migrate_compress_level());
+    bytes_sent += blen;
+    atomic_inc(&acct_info.norm_pages);
+
+    return bytes_sent;
+}
+
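+/* Called from the migration thread: mark the thread busy and wake it up to
+ * compress the block/offset prepared by set_compress_params(). */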
+static inline void start_compression(CompressParam *param)
+{
+    param->done = false;
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+static uint64_t bytes_transferred;
+
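+/* Wait until every compression thread has finished its current page, then
+ * drain each per-thread buffer into the migration stream. */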
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx, len, thread_count;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+    thread_count = migrate_compress_threads();
+    for (idx = 0; idx < thread_count; idx++) {
+        qemu_mutex_lock(comp_done_lock);
+        while (!comp_param[idx].done && !comp_param[idx].quit) {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+        qemu_mutex_unlock(comp_done_lock);
+        if (!comp_param[idx].quit) {
+            len = qemu_put_qemu_file(f, comp_param[idx].file);
+            bytes_transferred += len;
+        }
+    }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->block = block;
+    param->offset = offset;
+}
+
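+/* Pick an idle compression thread (sleeping on comp_done_cond until one is
+ * free), flush its previous output into f, then start it on the new page. */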
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset,
+                                           uint64_t *bytes_transferred)
+{
+    int idx, thread_count, bytes_xmit = -1, pages = -1;
+
+    thread_count = migrate_compress_threads();
+    qemu_mutex_lock(comp_done_lock);
+    while (true) {
+        for (idx = 0; idx < thread_count; idx++) {
+            if (comp_param[idx].done) {
+                bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+                set_compress_params(&comp_param[idx], block, offset);
+                start_compression(&comp_param[idx]);
+                pages = 1;
+                *bytes_transferred += bytes_xmit;
+                break;
+            }
+        }
+        if (pages > 0) {
+            break;
+        } else {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    }
+    qemu_mutex_unlock(comp_done_lock);
+
+    return pages;
+}
+
 /**
  * ram_save_compressed_page: compress the given page and send it to the stream
  *
@@ -843,8 +963,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
                                     uint64_t *bytes_transferred)
 {
     int pages = -1;
+    uint64_t bytes_xmit;
+    MemoryRegion *mr = block->mr;
+    uint8_t *p;
+    int ret;
 
-    /* To be done*/
+    p = memory_region_get_ram_ptr(mr) + offset;
+
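+    /* Give the migration control path (e.g. RDMA) the first chance to
+     * transfer the page. */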
+    bytes_xmit = 0;
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
+    if (bytes_xmit) {
+        *bytes_transferred += bytes_xmit;
+        pages = 1;
+    }
+    if (block == last_sent_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_xmit > 0) {
+                acct_info.norm_pages++;
+            } else if (bytes_xmit == 0) {
+                acct_info.dup_pages++;
+            }
+        }
+    } else {
+        /* When starting the process of a new block, the first page of
+         * the block should be sent out before other pages in the same
+         * block, and all the pages in the last block should have been
+         * sent out.  Keeping this order is important, because the
+         * 'cont' flag is used to avoid resending the block name.
+         */
+        if (block != last_sent_block) {
+            flush_compressed_data(f);
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                set_compress_params(&comp_param[0], block, offset);
+                /* Compress the page synchronously in the migration thread
+                 * (reusing comp_param[0]'s buffer) to make sure the first
+                 * page is sent out before other pages.
+                 */
+                bytes_xmit = do_compress_ram_page(&comp_param[0]);
+                qemu_put_qemu_file(f, comp_param[0].file);
+                *bytes_transferred += bytes_xmit;
+                pages = 1;
+            }
+        } else {
+            pages = save_zero_page(f, block, offset, p, bytes_transferred);
+            if (pages == -1) {
+                pages = compress_page_with_multi_thread(f, block, offset,
+                                                        bytes_transferred);
+            }
+        }
+    }
 
     return pages;
 }
@@ -912,8 +1083,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
     return pages;
 }
 
-static uint64_t bytes_transferred;
-
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
     uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1127,6 +1296,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         }
         i++;
     }
+    flush_compressed_data(f);
     rcu_read_unlock();
 
     /*
@@ -1168,6 +1338,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         }
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
-- 
1.9.1