qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 1/2] migration: Implement a multiple compress thread


From: Liliang
Subject: [Qemu-devel] [PATCH 1/2] migration: Implement a multiple compress threads feature to accelerate live migration
Date: Fri, 31 Oct 2014 13:40:11 +0800

From: Li Liang <address@hidden>

Instead of sending the guest memory directly, this solution compress the
ram page before sending, after receiving, the data will be decompressed.
This feature can help to reduce the data transferred about 60%, this
is very useful when the network bandwidth is limited, and the migration
time can also be reduced about 80%. The feature is off by default,
fllowing the document docs/multiple-compression-threads.txt for
information to use it.

Signed-off-by: Li Liang <address@hidden>
---
 arch_init.c                   | 435 ++++++++++++++++++++++++++++++++++++++++--
 hmp-commands.hx               |  56 ++++++
 hmp.c                         |  57 ++++++
 hmp.h                         |   6 +
 include/migration/migration.h |  12 +-
 include/migration/qemu-file.h |   1 +
 migration.c                   |  99 ++++++++++
 monitor.c                     |  21 ++
 qapi-schema.json              |  84 +++++++-
 qmp-commands.hx               | 131 +++++++++++++
 10 files changed, 886 insertions(+), 16 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 88a5ba0..04730a7 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@
 #include <stdint.h>
 #include <stdarg.h>
 #include <stdlib.h>
+#include <zlib.h>
 #ifndef _WIN32
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -126,6 +127,7 @@ static uint64_t bitmap_sync_count;
 #define RAM_SAVE_FLAG_CONTINUE 0x20
 #define RAM_SAVE_FLAG_XBZRLE   0x40
 /* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 
 static struct defconfig_file {
     const char *filename;
@@ -332,6 +334,177 @@ static uint64_t migration_dirty_pages;
 static uint32_t last_version;
 static bool ram_bulk_stage;
 
+#define COMPRESS_BUF_SIZE (TARGET_PAGE_SIZE + 16)
+#define MIG_BUF_SIZE (COMPRESS_BUF_SIZE + 256 + 16)
+struct MigBuf {
+    int buf_index;
+    uint8_t buf[MIG_BUF_SIZE];
+};
+
+typedef struct MigBuf MigBuf;
+
+static void migrate_put_byte(MigBuf *f, int v)
+{
+    f->buf[f->buf_index] = v;
+    f->buf_index++;
+}
+
+static void migrate_put_be16(MigBuf *f, unsigned int v)
+{
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+static void migrate_put_be32(MigBuf *f, unsigned int v)
+{
+    migrate_put_byte(f, v >> 24);
+    migrate_put_byte(f, v >> 16);
+    migrate_put_byte(f, v >> 8);
+    migrate_put_byte(f, v);
+}
+
+static void migrate_put_be64(MigBuf *f, uint64_t v)
+{
+    migrate_put_be32(f, v >> 32);
+    migrate_put_be32(f, v);
+}
+
+static void migrate_put_buffer(MigBuf *f, const uint8_t *buf, int size)
+{
+    int l;
+
+    while (size > 0) {
+        l = MIG_BUF_SIZE - f->buf_index;
+        if (l > size) {
+            l = size;
+        }
+        memcpy(f->buf + f->buf_index, buf, l);
+        f->buf_index += l;
+        buf += l;
+        size -= l;
+    }
+}
+
+static size_t migrate_save_block_hdr(MigBuf *f, RAMBlock *block,
+        ram_addr_t offset, int cont, int flag)
+{
+    size_t size;
+
+    migrate_put_be64(f, offset | cont | flag);
+    size = 8;
+
+    if (!cont) {
+        migrate_put_byte(f, strlen(block->idstr));
+        migrate_put_buffer(f, (uint8_t *)block->idstr,
+                        strlen(block->idstr));
+        size += 1 + strlen(block->idstr);
+    }
+    return size;
+}
+
+static int migrate_qemu_add_compress(MigBuf *f,  const uint8_t *p,
+        int size, int level)
+{
+    uLong  blen = COMPRESS_BUF_SIZE;
+    if (compress2(f->buf + f->buf_index + sizeof(int), &blen, (Bytef *)p,
+            size, level) != Z_OK) {
+        printf("compress failed!\n");
+        return 0;
+    }
+    migrate_put_be32(f, blen);
+    f->buf_index += blen;
+    return blen + sizeof(int);
+}
+
+enum {
+    COM_DONE = 0,
+    COM_START,
+};
+
+static int  compress_thread_count = 1;
+static int  decompress_thread_count = 1;
+
+struct compress_param {
+    int state;
+    MigBuf migbuf;
+    RAMBlock *block;
+    ram_addr_t offset;
+    bool last_stage;
+    int ret;
+    int bytes_sent;
+    uint8_t *p;
+    int cont;
+    bool bulk_stage;
+};
+
+typedef struct compress_param compress_param;
+compress_param *comp_param;
+
+struct decompress_param {
+    int state;
+    void *des;
+    uint8 compbuf[COMPRESS_BUF_SIZE];
+    int len;
+};
+typedef struct decompress_param decompress_param;
+
+static decompress_param *decomp_param;
+bool incomming_migration_done;
+static bool quit_thread;
+
+static int save_compress_ram_page(compress_param *param);
+
+
+static void *do_data_compress(void *opaque)
+{
+    compress_param *param = opaque;
+    while (!quit_thread) {
+        if (param->state == COM_START) {
+            save_compress_ram_page(param);
+            param->state = COM_DONE;
+         } else {
+             g_usleep(1);
+         }
+    }
+
+    return NULL;
+}
+
+
+void migrate_compress_threads_join(MigrationState *s)
+{
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = true;
+    for (i = 0; i < compress_thread_count; i++) {
+        qemu_thread_join(s->compress_thread + i);
+    }
+    g_free(s->compress_thread);
+    g_free(comp_param);
+    s->compress_thread = NULL;
+    comp_param = NULL;
+}
+
+void migrate_compress_threads_create(MigrationState *s)
+{
+    int i;
+    if (!migrate_use_compress()) {
+        return;
+    }
+    quit_thread = false;
+    compress_thread_count = s->compress_thread_count;
+    s->compress_thread = g_malloc0(sizeof(QemuThread)
+        * s->compress_thread_count);
+    comp_param = g_malloc0(sizeof(compress_param) * s->compress_thread_count);
+    for (i = 0; i < s->compress_thread_count; i++) {
+        qemu_thread_create(s->compress_thread + i, "compress",
+            do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE);
+
+    }
+}
+
 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
  * The important thing is that a stale (not-yet-0'd) page be replaced
  * by the new data.
@@ -351,9 +524,10 @@ static void xbzrle_cache_zero_page(ram_addr_t current_addr)
 
 #define ENCODING_FLAG_XBZRLE 0x1
 
-static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
+static int save_xbzrle_page(void *f, uint8_t **current_data,
                             ram_addr_t current_addr, RAMBlock *block,
-                            ram_addr_t offset, int cont, bool last_stage)
+                            ram_addr_t offset, int cont, bool last_stage,
+                            bool save_to_buf)
 {
     int encoded_len = 0, bytes_sent = -1;
     uint8_t *prev_cached_page;
@@ -401,10 +575,19 @@ static int save_xbzrle_page(QEMUFile *f, uint8_t 
**current_data,
     }
 
     /* Send XBZRLE based compressed page */
-    bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
-    qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
-    qemu_put_be16(f, encoded_len);
-    qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
+    if (save_to_buf) {
+        bytes_sent = migrate_save_block_hdr((MigBuf *)f, block, offset,
+            cont, RAM_SAVE_FLAG_XBZRLE);
+        migrate_put_byte((MigBuf *)f, ENCODING_FLAG_XBZRLE);
+        migrate_put_be16((MigBuf *)f, encoded_len);
+        migrate_put_buffer((MigBuf *)f, XBZRLE.encoded_buf, encoded_len);
+    } else {
+        bytes_sent = save_block_hdr((QEMUFile *)f, block, offset,
+            cont, RAM_SAVE_FLAG_XBZRLE);
+        qemu_put_byte((QEMUFile *)f, ENCODING_FLAG_XBZRLE);
+        qemu_put_be16((QEMUFile *)f, encoded_len);
+        qemu_put_buffer((QEMUFile *)f, XBZRLE.encoded_buf, encoded_len);
+    }
     bytes_sent += encoded_len + 1 + 2;
     acct_info.xbzrle_pages++;
     acct_info.xbzrle_bytes += bytes_sent;
@@ -609,7 +792,7 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, 
ram_addr_t offset,
         xbzrle_cache_zero_page(current_addr);
     } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
         bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
-                                      offset, cont, last_stage);
+                                      offset, cont, last_stage, false);
         if (!last_stage) {
             /* Can't send this cached data async, since the cache page
              * might get updated before it gets to the wire
@@ -635,6 +818,90 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, 
ram_addr_t offset,
     return bytes_sent;
 }
 
+static int save_compress_ram_page(compress_param *param)
+{
+    int bytes_sent = param->bytes_sent;
+    int blen = COMPRESS_BUF_SIZE;
+    int cont = param->cont;
+    uint8_t *p = param->p;
+    int ret = param->ret;
+    RAMBlock *block = param->block;
+    ram_addr_t offset = param->offset;
+    bool last_stage = param->last_stage;
+    /* In doubt sent page as normal */
+    XBZRLE_cache_lock();
+    ram_addr_t current_addr = block->offset + offset;
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_sent > 0) {
+                atomic_inc(&acct_info.norm_pages);
+             } else if (bytes_sent == 0) {
+                atomic_inc(&acct_info.dup_pages);
+             }
+        }
+    } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+        atomic_inc(&acct_info.dup_pages);
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block, offset, 
cont,
+                             RAM_SAVE_FLAG_COMPRESS);
+        migrate_put_byte(&param->migbuf, 0);
+        bytes_sent++;
+        /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+         * page would be stale
+         */
+        xbzrle_cache_zero_page(current_addr);
+    } else if (!param->bulk_stage && migrate_use_xbzrle()) {
+        bytes_sent = save_xbzrle_page(&param->migbuf, &p, current_addr, block,
+                              offset, cont, last_stage, true);
+    }
+    XBZRLE_cache_unlock();
+    /* XBZRLE overflow or normal page */
+    if (bytes_sent == -1) {
+        bytes_sent = migrate_save_block_hdr(&param->migbuf, block,
+            offset, cont, RAM_SAVE_FLAG_COMPRESS_PAGE);
+        blen = migrate_qemu_add_compress(&param->migbuf, p,
+            TARGET_PAGE_SIZE, migrate_compress_level());
+        bytes_sent += blen;
+        atomic_inc(&acct_info.norm_pages);
+    }
+    return bytes_sent;
+}
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+    int idx;
+    if (!migrate_use_compress()) {
+        return;
+    }
+
+    for (idx = 0; idx < compress_thread_count; idx++) {
+        while (comp_param[idx].state != COM_DONE) {
+            g_usleep(0);
+        }
+        if (comp_param[idx].migbuf.buf_index > 0) {
+            qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                comp_param[idx].migbuf.buf_index);
+            bytes_transferred += comp_param[idx].migbuf.buf_index;
+            comp_param[idx].migbuf.buf_index = 0;
+        }
+    }
+}
+
+static inline void set_common_compress_params(compress_param *param,
+    int ret, int bytes_sent, RAMBlock *block, ram_addr_t offset,
+    bool last_stage, int cont, uint8_t *p, bool bulk_stage)
+{
+    param->ret = ret;
+    param->bytes_sent = bytes_sent;
+    param->block = block;
+    param->offset = offset;
+    param->last_stage = last_stage;
+    param->cont = cont;
+    param->p = p;
+    param->bulk_stage = bulk_stage;
+}
+
 /*
  * ram_find_and_save_block: Finds a page to send and sends it to f
  *
@@ -649,6 +916,8 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
last_stage)
     bool complete_round = false;
     int bytes_sent = 0;
     MemoryRegion *mr;
+    int cont, idx, ret, len = -1;
+    uint8_t *p;
 
     if (!block)
         block = QTAILQ_FIRST(&ram_list.blocks);
@@ -667,14 +936,73 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
last_stage)
                 block = QTAILQ_FIRST(&ram_list.blocks);
                 complete_round = true;
                 ram_bulk_stage = false;
+                if (migrate_use_xbzrle()) {
+                    /* terminate the used thread at this point*/
+                    flush_compressed_data(f);
+                    quit_thread = true;
+                }
             }
         } else {
-            bytes_sent = ram_save_page(f, block, offset, last_stage);
-
-            /* if page is unmodified, continue to the next */
-            if (bytes_sent > 0) {
-                last_sent_block = block;
-                break;
+            if (!migrate_use_compress()) {
+                bytes_sent = ram_save_page(f, block, offset, last_stage);
+                /* if page is unmodified, continue to the next */
+                if (bytes_sent > 0) {
+                    last_sent_block = block;
+                    break;
+                }
+            } else {
+                cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
+                p = memory_region_get_ram_ptr(block->mr) + offset;
+                ret = ram_control_save_page(f, block->offset,
+                           offset, TARGET_PAGE_SIZE, &len);
+                if ((!ram_bulk_stage && migrate_use_xbzrle()) || cont == 0) {
+                    if (cont == 0) {
+                        flush_compressed_data(f);
+                    }
+                    set_common_compress_params(&comp_param[0],
+                        ret, len, block, offset, last_stage, cont,
+                        p, ram_bulk_stage);
+                    bytes_sent = save_compress_ram_page(&comp_param[0]);
+                    if (bytes_sent > 0) {
+                        qemu_put_buffer(f, comp_param[0].migbuf.buf,
+                            comp_param[0].migbuf.buf_index);
+                        comp_param[0].migbuf.buf_index = 0;
+                        last_sent_block = block;
+                        break;
+                    }
+                } else {
+retry:
+                    for (idx = 0; idx < compress_thread_count; idx++) {
+                        if (comp_param[idx].state == COM_DONE) {
+                            bytes_sent = comp_param[idx].migbuf.buf_index;
+                            if (bytes_sent == 0) {
+                                set_common_compress_params(&comp_param[idx],
+                                    ret, len, block, offset, last_stage,
+                                    cont, p, ram_bulk_stage);
+                                comp_param[idx].state = COM_START;
+                                bytes_sent = 1;
+                                bytes_transferred -= 1;
+                                break;
+                            } else if (bytes_sent > 0) {
+                                qemu_put_buffer(f, comp_param[idx].migbuf.buf,
+                                    comp_param[idx].migbuf.buf_index);
+                                comp_param[idx].migbuf.buf_index = 0;
+                                set_common_compress_params(&comp_param[idx],
+                                   ret, len, block, offset, last_stage,
+                                   cont, p, ram_bulk_stage);
+                                comp_param[idx].state = COM_START;
+                                break;
+                            }
+                        }
+                    }
+                    if (idx < compress_thread_count) {
+                        last_sent_block = block;
+                        break;
+                    } else {
+                        g_usleep(0);
+                        goto retry;
+                    }
+                }
             }
         }
     }
@@ -684,7 +1012,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool 
last_stage)
     return bytes_sent;
 }
 
-static uint64_t bytes_transferred;
 
 void acct_update_position(QEMUFile *f, size_t size, bool zero)
 {
@@ -892,6 +1219,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
         i++;
     }
 
+    flush_compressed_data(f);
     qemu_mutex_unlock_ramlist();
 
     /*
@@ -938,6 +1266,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
         bytes_transferred += bytes_sent;
     }
 
+    flush_compressed_data(f);
     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
     migration_end();
 
@@ -1038,10 +1367,61 @@ void ram_handle_compressed(void *host, uint8_t ch, 
uint64_t size)
     }
 }
 
+QemuThread *decompress_threads;
+
+static void *do_data_decompress(void *opaque)
+{
+    decompress_param *param = opaque;
+    while (incomming_migration_done == false) {
+        if (param->state == COM_START) {
+            uLong pagesize = TARGET_PAGE_SIZE;
+            if (uncompress((Bytef *)param->des, &pagesize,
+                    (const Bytef *)param->compbuf, param->len) != Z_OK) {
+                printf("uncompress failed!\n");
+                break;
+            }
+            param->state = COM_DONE;
+        } else {
+            if (quit_thread) {
+                break;
+            }
+            g_usleep(1);
+        }
+    }
+    return NULL;
+}
+
+void migrate_decompress_threads_create(int count)
+{
+    int i;
+    decompress_thread_count = count;
+    decompress_threads = g_malloc0(sizeof(QemuThread) * count);
+    decomp_param = g_malloc0(sizeof(decompress_param) * count);
+    quit_thread = false;
+    for (i = 0; i < count; i++) {
+        qemu_thread_create(decompress_threads + i, "decompress",
+            do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE);
+    }
+}
+
+void migrate_decompress_threads_join(void)
+{
+    int i;
+    for (i = 0; i < decompress_thread_count; i++) {
+        qemu_thread_join(decompress_threads + i);
+    }
+    g_free(decompress_threads);
+    g_free(decomp_param);
+    decompress_threads = NULL;
+    decomp_param = NULL;
+}
+
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
 {
     int flags = 0, ret = 0;
     static uint64_t seq_iter;
+    int len = 0;
+    uint8_t compbuf[COMPRESS_BUF_SIZE];
 
     seq_iter++;
 
@@ -1106,6 +1486,7 @@ static int ram_load(QEMUFile *f, void *opaque, int 
version_id)
             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
             break;
         case RAM_SAVE_FLAG_PAGE:
+            quit_thread = true;
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
@@ -1115,6 +1496,32 @@ static int ram_load(QEMUFile *f, void *opaque, int 
version_id)
 
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
+        case RAM_SAVE_FLAG_COMPRESS_PAGE:
+            host = host_from_stream_offset(f, addr, flags);
+            if (!host) {
+                error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
+                ret = -EINVAL;
+                break;
+            }
+
+            len = qemu_get_be32(f);
+            qemu_get_buffer(f, compbuf, len);
+            int idx;
+retry:
+            for (idx = 0; idx < decompress_thread_count; idx++) {
+                if (decomp_param[idx].state == COM_DONE)  {
+                    memcpy(decomp_param[idx].compbuf, compbuf, len);
+                    decomp_param[idx].des = host;
+                    decomp_param[idx].len = len;
+                    decomp_param[idx].state = COM_START;
+                    break;
+                }
+            }
+            if (idx == decompress_thread_count) {
+                g_usleep(0);
+                goto retry;
+            }
+            break;
         case RAM_SAVE_FLAG_XBZRLE:
             host = host_from_stream_offset(f, addr, flags);
             if (!host) {
diff --git a/hmp-commands.hx b/hmp-commands.hx
index e37bc8b..8b93bed 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -941,6 +941,56 @@ Set cache size to @var{value} (in bytes) for xbzrle 
migrations.
 ETEXI
 
     {
+        .name       = "migrate_set_compress_level",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set compress level for compress migrations,"
+                      "the level is a number between 0 and 9, 0 stands for "
+                      "no compression.\n"
+                      "1 stands for the fast compress speed while 9 stands for"
+                      "the highest compress ratio.",
+        .mhandler.cmd = hmp_migrate_set_compress_level,
+    },
+
+STEXI
address@hidden migrate_set_compress_level @var{value}
address@hidden migrate_set_compress_level
+Set compress level to @var{value}  for compress migrations.
+ETEXI
+
+    {
+        .name       = "migrate_set_compress_threads",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set compress thread count for migrations. "
+                      "a proper thread count will accelerate the migration 
speed,"
+                      "the threads should be between 1 and the CPUS of your 
system",
+        .mhandler.cmd = hmp_migrate_set_compress_threads,
+    },
+
+STEXI
address@hidden migrate_set_compress_threads @var{value}
address@hidden migrate_set_compress_threads
+Set compress threads to @var{value}  for compress migrations.
+ETEXI
+
+    {
+        .name       = "migrate_set_decompress_threads",
+        .args_type  = "value:i",
+        .params     = "value",
+        .help       = "set decompress thread count for migrations. "
+                      "a proper thread count will accelerate the migration 
speed,"
+                      "the threads should be between 1 and the CPUS of your 
system",
+        .mhandler.cmd = hmp_migrate_set_decompress_threads,
+    },
+
+STEXI
address@hidden migrate_set_decompress_threads @var{value}
address@hidden migrate_set_decompress_threads
+Set decompress threads to @var{value}  for compress migrations.
+ETEXI
+
+    {
         .name       = "migrate_set_speed",
         .args_type  = "value:o",
         .params     = "value",
@@ -1766,6 +1816,12 @@ show migration status
 show current migration capabilities
 @item info migrate_cache_size
 show current migration XBZRLE cache size
address@hidden info migrate_compress_level
+show current migration compress level
address@hidden info migrate_compress_threads
+show current migration compress threads
address@hidden info migrate_decompress_threads
+show current migration decompress threads
 @item info balloon
 show balloon information
 @item info qtree
diff --git a/hmp.c b/hmp.c
index 63d7686..b1936a3 100644
--- a/hmp.c
+++ b/hmp.c
@@ -252,6 +252,24 @@ void hmp_info_migrate_cache_size(Monitor *mon, const QDict 
*qdict)
                    qmp_query_migrate_cache_size(NULL) >> 10);
 }
 
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress level: %" PRId64 "\n",
+                   qmp_query_migrate_compress_level(NULL));
+}
+
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "compress threads: %" PRId64 "\n",
+                   qmp_query_migrate_compress_threads(NULL));
+}
+
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    monitor_printf(mon, "decompress threads: %" PRId64 "\n",
+                   qmp_query_migrate_decompress_threads(NULL));
+}
+
 void hmp_info_cpus(Monitor *mon, const QDict *qdict)
 {
     CpuInfoList *cpu_list, *cpu;
@@ -1041,6 +1059,45 @@ void hmp_migrate_set_cache_size(Monitor *mon, const 
QDict *qdict)
     }
 }
 
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_level(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_compress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict)
+{
+    int64_t value = qdict_get_int(qdict, "value");
+    Error *err = NULL;
+
+    qmp_migrate_set_decompress_threads(value, &err);
+    if (err) {
+        monitor_printf(mon, "%s\n", error_get_pretty(err));
+        error_free(err);
+        return;
+    }
+}
+
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict)
 {
     int64_t value = qdict_get_int(qdict, "value");
diff --git a/hmp.h b/hmp.h
index 4bb5dca..b348806 100644
--- a/hmp.h
+++ b/hmp.h
@@ -29,6 +29,9 @@ void hmp_info_mice(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_capabilities(Monitor *mon, const QDict *qdict);
 void hmp_info_migrate_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_compress_threads(Monitor *mon, const QDict *qdict);
+void hmp_info_migrate_decompress_threads(Monitor *mon, const QDict *qdict);
 void hmp_info_cpus(Monitor *mon, const QDict *qdict);
 void hmp_info_block(Monitor *mon, const QDict *qdict);
 void hmp_info_blockstats(Monitor *mon, const QDict *qdict);
@@ -64,6 +67,9 @@ void hmp_migrate_set_downtime(Monitor *mon, const QDict 
*qdict);
 void hmp_migrate_set_speed(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict);
 void hmp_migrate_set_cache_size(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_level(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_compress_threads(Monitor *mon, const QDict *qdict);
+void hmp_migrate_set_decompress_threads(Monitor *mon, const QDict *qdict);
 void hmp_set_password(Monitor *mon, const QDict *qdict);
 void hmp_expire_password(Monitor *mon, const QDict *qdict);
 void hmp_eject(Monitor *mon, const QDict *qdict);
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3cb5ba8..03c8e0d 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -49,6 +49,9 @@ struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QemuThread *compress_thread;
+    int compress_thread_count;
+    int compress_level;
 
     int state;
     MigrationParams params;
@@ -64,6 +67,7 @@ struct MigrationState
     int64_t dirty_sync_count;
 };
 
+extern bool incomming_migration_done;
 void process_incoming_migration(QEMUFile *f);
 
 void qemu_start_incoming_migration(const char *uri, Error **errp);
@@ -107,6 +111,10 @@ bool migration_has_finished(MigrationState *);
 bool migration_has_failed(MigrationState *);
 MigrationState *migrate_get_current(void);
 
+void migrate_compress_threads_create(MigrationState *s);
+void migrate_compress_threads_join(MigrationState *s);
+void migrate_decompress_threads_create(int count);
+void migrate_decompress_threads_join(void);
 uint64_t ram_bytes_remaining(void);
 uint64_t ram_bytes_transferred(void);
 uint64_t ram_bytes_total(void);
@@ -144,7 +152,7 @@ void migrate_del_blocker(Error *reason);
 
 bool migrate_rdma_pin_all(void);
 bool migrate_zero_blocks(void);
-
+bool migrate_use_compress(void);
 bool migrate_auto_converge(void);
 
 int xbzrle_encode_buffer(uint8_t *old_buf, uint8_t *new_buf, int slen,
@@ -153,6 +161,8 @@ int xbzrle_decode_buffer(uint8_t *src, int slen, uint8_t 
*dst, int dlen);
 
 int migrate_use_xbzrle(void);
 int64_t migrate_xbzrle_cache_size(void);
+int migrate_compress_level(void);
+int migrate_compress_threads(void);
 
 int64_t xbzrle_cache_resize(int64_t new_size);
 
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 401676b..431e6cc 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -112,6 +112,7 @@ QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer 
*input);
 int qemu_get_fd(QEMUFile *f);
 int qemu_fclose(QEMUFile *f);
 int64_t qemu_ftell(QEMUFile *f);
+uint64_t qemu_add_compress(QEMUFile *f,  const uint8_t *p, int size);
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size);
 void qemu_put_byte(QEMUFile *f, int v);
 /*
diff --git a/migration.c b/migration.c
index c49a05a..23d892e 100644
--- a/migration.c
+++ b/migration.c
@@ -46,6 +46,12 @@ enum {
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
 
+/* Migration compress default thread count */
+#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
+/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
+#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
+
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
 
@@ -60,6 +66,8 @@ MigrationState *migrate_get_current(void)
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
+        .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
     };
 
     return &current_migration;
@@ -101,6 +109,7 @@ static void process_incoming_migration_co(void *opaque)
         error_report("load of migration failed: %s", strerror(-ret));
         exit(EXIT_FAILURE);
     }
+    incomming_migration_done = true;
     qemu_announce_self();
 
     /* Make sure all file formats flush their mutable metadata */
@@ -116,10 +125,14 @@ static void process_incoming_migration_co(void *opaque)
     } else {
         runstate_set(RUN_STATE_PAUSED);
     }
+    migrate_decompress_threads_join();
 }
 
+static int uncompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT;
 void process_incoming_migration(QEMUFile *f)
 {
+    incomming_migration_done = false;
+    migrate_decompress_threads_create(uncompress_thread_count);
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
     int fd = qemu_get_fd(f);
 
@@ -302,6 +315,7 @@ static void migrate_fd_cleanup(void *opaque)
         qemu_thread_join(&s->thread);
         qemu_mutex_lock_iothread();
 
+        migrate_compress_threads_join(s);
         qemu_fclose(s->file);
         s->file = NULL;
     }
@@ -373,6 +387,8 @@ static MigrationState *migrate_init(const MigrationParams 
*params)
     int64_t bandwidth_limit = s->bandwidth_limit;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
+    int compress_level = s->compress_level;
+    int compress_thread_count = s->compress_thread_count;
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -383,6 +399,8 @@ static MigrationState *migrate_init(const MigrationParams 
*params)
            sizeof(enabled_capabilities));
     s->xbzrle_cache_size = xbzrle_cache_size;
 
+    s->compress_level = compress_level;
+    s->compress_thread_count = compress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
     s->state = MIG_STATE_SETUP;
     trace_migrate_set_state(MIG_STATE_SETUP);
@@ -503,6 +521,59 @@ int64_t qmp_query_migrate_cache_size(Error **errp)
     return migrate_xbzrle_cache_size();
 }
 
+void qmp_migrate_set_compress_level(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 9 || value < 0) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress level",
+                  "is invalid, please input a integer between 0 and 9. ");
+        return;
+    }
+
+    s->compress_level = value;
+}
+
+int64_t qmp_query_migrate_compress_level(Error **errp)
+{
+    return migrate_compress_level();
+}
+
+void qmp_migrate_set_compress_threads(int64_t value, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    if (value > 255 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
+                  "is invalid, please input a integer between 1 and 255. ");
+        return;
+    }
+
+    s->compress_thread_count = value;
+}
+
+void qmp_migrate_set_decompress_threads(int64_t value, Error **errp)
+{
+
+    if (value > 64 || value < 1) {
+        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "compress thread count",
+                  "is invalid, please input a integer between 1 and 64. ");
+        return;
+    }
+
+    uncompress_thread_count = value;
+}
+
+int64_t qmp_query_migrate_compress_threads(Error **errp)
+{
+    return migrate_compress_threads();
+}
+
+int64_t qmp_query_migrate_decompress_threads(Error **errp)
+{
+    return uncompress_thread_count;
+}
+
 void qmp_migrate_set_speed(int64_t value, Error **errp)
 {
     MigrationState *s;
@@ -555,6 +626,33 @@ bool migrate_zero_blocks(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
 }
 
+bool migrate_use_compress(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
+}
+
+int migrate_compress_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_level;
+}
+
+int migrate_compress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->compress_thread_count;
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -697,4 +795,5 @@ void migrate_fd_connect(MigrationState *s)
 
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
+    migrate_compress_threads_create(s);
 }
diff --git a/monitor.c b/monitor.c
index 1fc201a..4dfde70 100644
--- a/monitor.c
+++ b/monitor.c
@@ -2865,6 +2865,27 @@ static mon_cmd_t info_cmds[] = {
         .mhandler.cmd = hmp_info_migrate_cache_size,
     },
     {
+        .name       = "migrate_compress_level",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration compress level",
+        .mhandler.cmd = hmp_info_migrate_compress_level,
+    },
+    {
+        .name       = "migrate_compress_threads",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration compress thread count",
+        .mhandler.cmd = hmp_info_migrate_compress_threads,
+    },
+    {
+        .name       = "migrate_decompress_threads",
+        .args_type  = "",
+        .params     = "",
+        .help       = "show current migration decompress thread count",
+        .mhandler.cmd = hmp_info_migrate_decompress_threads,
+    },
+    {
         .name       = "balloon",
         .args_type  = "",
         .params     = "",
diff --git a/qapi-schema.json b/qapi-schema.json
index 24379ab..8dbe85c 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -497,7 +497,7 @@
 # Since: 1.2
 ##
 { 'enum': 'MigrationCapability',
-  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks'] }
+  'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 
'compress'] }
 
 ##
 # @MigrationCapabilityStatus
@@ -1382,6 +1382,88 @@
 { 'command': 'query-migrate-cache-size', 'returns': 'int' }
 
 ##
+# @migrate-set-compress-level
+#
+# Set compress level
+#
+# @value: compress level int
+#
+# The compress level will be an integer between 0 and 9.
+# The compress level can be modified before and during ongoing migration
+#
+# Returns: nothing on success
+#
+# Since: 1.2
+##
+{ 'command': 'migrate-set-compress-level', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-level
+#
+# query compress level
+#
+# Returns: compress level int
+#
+# Since: 1.2
+##
+{ 'command': 'query-migrate-compress-level', 'returns': 'int' }
+
+##
+# @migrate-set-compress-threads
+#
+# Set compress threads
+#
+# @value: compress threads int
+#
+# The compress thread count is an integer between 1 and 255.
+# The compress level can be modified only before migration
+#
+# Returns: nothing on success
+#
+# Since: 1.2
+##
+{ 'command': 'migrate-set-compress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-compress-threads
+#
+# query compress threads
+#
+# Returns: compress threads int
+#
+# Since: 1.2
+##
+{ 'command': 'query-migrate-compress-threads', 'returns': 'int' }
+
+##
+##
+# @migrate-set-decompress-threads
+#
+# Set decompress threads
+#
+# @value: decompress threads int
+#
+# The decompress thread count is an integer between 1 and 64.
+# The decompress level can be modified only before migration
+#
+# Returns: nothing on success
+#
+# Since: 1.2
+##
+{ 'command': 'migrate-set-decompress-threads', 'data': {'value': 'int'} }
+
+##
+# @query-migrate-decompress-threads
+#
+# query decompress threads
+#
+# Returns: decompress threads int
+#
+# Since: 1.2
+##
+{ 'command': 'query-migrate-decompress-threads', 'returns': 'int' }
+
+##
 # @ObjectPropertyInfo:
 #
 # @name: the name of the property
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 1abd619..c5c89d6 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -705,7 +705,138 @@ Example:
 <- { "return": 67108864 }
 
 EQMP
+{
+        .name       = "migrate-set-compress-level",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_level,
+    },
+
+SQMP
+migrate-set-compress-level
+----------------------
+
+Set compress level to be used by compress migration, the compress level is an 
integer
+between 0 and 9
+
+Arguments:
+
+- "value": compress level (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-compress-level", "arguments": { "value": 
536870912 } }
+<- { "return": {} }
+
+EQMP
+    {
+        .name       = "query-migrate-compress-level",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_level,
+    },
+
+SQMP
+query-migrate-compress-level
+------------------------
+
+Show compress level to be used by compress migration
+
+returns a json-object with the following information:
+- "size" : json-int
+
+Example:
+
+-> { "execute": "query-migrate-compress-level" }
+<- { "return": 67108864 }
+
+EQMP
+{
+        .name       = "migrate-set-compress-threads",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_compress_threads,
+    },
+
+SQMP
+migrate-set-compress-threads
+----------------------
+
+Set compress thread count to be used by compress migration, the compress 
thread count is an integer
+between 1 and 255
+
+Arguments:
+
+- "value": compress threads (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-compress-threads", "arguments": { "value": 
536870912 } }
+<- { "return": {} }
+
+EQMP
+    {
+        .name       = "query-migrate-compress-threads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_compress_threads,
+    },
+
+SQMP
+query-migrate-compress-threads
+------------------------
+
+Show compress thread count to be used by compress migration
+
+returns a json-object with the following information:
+- "size" : json-int
+
+Example:
+
+-> { "execute": "query-migrate-compress-threads" }
+<- { "return": 67108864 }
+
+EQMP
+{
+        .name       = "migrate-set-decompress-threads",
+        .args_type  = "value:i",
+        .mhandler.cmd_new = qmp_marshal_input_migrate_set_decompress_threads,
+    },
+
+SQMP
+migrate-set-decompress-threads
+----------------------
+
+Set decompress thread count to be used by compress migration, the decompress 
thread count is an integer
+between 1 and 64
+
+Arguments:
+
+- "value": decompress threads (json-int)
+
+Example:
+
+-> { "execute": "migrate-set-decompress-threads", "arguments": { "value": 
536870912 } }
+<- { "return": {} }
 
+EQMP
+    {
+        .name       = "query-migrate-decompress-threads",
+        .args_type  = "",
+        .mhandler.cmd_new = qmp_marshal_input_query_migrate_decompress_threads,
+    },
+
+SQMP
+query-migrate-decompress-threads
+------------------------
+
+Show decompress thread count to be used by compress migration
+
+returns a json-object with the following information:
+- "size" : json-int
+
+Example:
+
+-> { "execute": "query-migrate-compress-threads" }
+<- { "return": 67108864 }
+
+EQMP
     {
         .name       = "migrate_set_speed",
         .args_type  = "value:o",
-- 
1.9.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]