[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [v6 08/14] migration: Add the core code of multi-thread
From: |
Juan Quintela |
Subject: |
Re: [Qemu-devel] [v6 08/14] migration: Add the core code of multi-thread compression |
Date: |
Wed, 25 Mar 2015 12:47:57 +0100 |
User-agent: |
Gnus/5.13 (Gnus v5.13) Emacs/24.4 (gnu/linux) |
Liang Li <address@hidden> wrote:
> Implement the core logic of the multiple thread compression. At this
> point, multiple thread compression can't co-work with xbzrle yet.
>
> Signed-off-by: Liang Li <address@hidden>
> Signed-off-by: Yang Zhang <address@hidden>
> ---
> arch_init.c | 184
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 177 insertions(+), 7 deletions(-)
>
> diff --git a/arch_init.c b/arch_init.c
> index 48cae22..9f63c0f 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
> static QemuThread *decompress_threads;
> static uint8_t *compressed_data_buf;
>
> +static int do_compress_ram_page(CompressParam *param);
> +
> static void *do_data_compress(void *opaque)
> {
> - while (!quit_comp_thread) {
> + CompressParam *param = opaque;
>
> - /* To be done */
What is the difference from changing this loop to:
> + while (!quit_comp_thread) {
Here we don't have quit_comp_thread protected by anything.
> + qemu_mutex_lock(&param->mutex);
> + /* Re-check the quit_comp_thread in case of
> + * terminate_compression_threads is called just before
> + * qemu_mutex_lock(&param->mutex) and after
> + * while(!quit_comp_thread), re-check it here can make
> + * sure the compression thread terminate as expected.
> + */
> + while (!param->start && !quit_comp_thread) {
Here and in the next use, quit_comp_thread is read under param->mutex, but
param is per compression thread, so it is not really protected.
> + qemu_cond_wait(&param->cond, &param->mutex);
> + }
> + if (!quit_comp_thread) {
> + do_compress_ram_page(param);
> + }
> + param->start = false;
param->start is protected by param->mutex everywhere
> + qemu_mutex_unlock(&param->mutex);
>
> + qemu_mutex_lock(comp_done_lock);
> + param->done = true;
param->done is protected by comp_done_lock
> + qemu_cond_signal(comp_done_cond);
> + qemu_mutex_unlock(comp_done_lock);
> }
>
> return NULL;
> @@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
>
> static inline void terminate_compression_threads(void)
> {
> - quit_comp_thread = true;
> + int idx, thread_count;
>
> - /* To be done */
> + thread_count = migrate_compress_threads();
> + quit_comp_thread = true;
quit_comp_thread is not protected, again.
> + for (idx = 0; idx < thread_count; idx++) {
> + qemu_mutex_lock(&comp_param[idx].mutex);
> + qemu_cond_signal(&comp_param[idx].cond);
> + qemu_mutex_unlock(&comp_param[idx].mutex);
> + }
> }
>
> void migrate_compress_threads_join(void)
> @@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
> * it's ops to empty.
> */
> comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> + comp_param[i].done = true;
> qemu_mutex_init(&comp_param[i].mutex);
> qemu_cond_init(&comp_param[i].cond);
> qemu_thread_create(compress_threads + i, "compress",
> @@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block,
> ram_addr_t offset,
> return pages;
> }
>
> +static int do_compress_ram_page(CompressParam *param)
> +{
> + int bytes_sent, blen;
> + uint8_t *p;
> + RAMBlock *block = param->block;
> + ram_addr_t offset = param->offset;
> +
> + p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
> +
> + bytes_sent = save_page_header(param->file, block, offset |
> + RAM_SAVE_FLAG_COMPRESS_PAGE);
> + blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> + migrate_compress_level());
> + bytes_sent += blen;
> + atomic_inc(&acct_info.norm_pages);
> +
> + return bytes_sent;
> +}
> +
> +static inline void start_compression(CompressParam *param)
> +{
> + param->done = false;
Not protected (well, its caller has protected it with comp_done_lock).
> + qemu_mutex_lock(&param->mutex);
> + param->start = true;
> + qemu_cond_signal(&param->cond);
> + qemu_mutex_unlock(&param->mutex);
> +}
> +
> +
> +static uint64_t bytes_transferred;
> +
> +static void flush_compressed_data(QEMUFile *f)
> +{
> + int idx, len, thread_count;
> +
> + if (!migrate_use_compression()) {
> + return;
> + }
> + thread_count = migrate_compress_threads();
> + for (idx = 0; idx < thread_count; idx++) {
> + if (!comp_param[idx].done) {
done is not protected here.
> + qemu_mutex_lock(comp_done_lock);
> + while (!comp_param[idx].done && !quit_comp_thread) {
Now it is under comp_done_lock, but none of its other uses is
protected by it.
And here done is protected by comp_done_cond.
> + qemu_cond_wait(comp_done_cond, comp_done_lock);
> + }
> + qemu_mutex_unlock(comp_done_lock);
> + }
> + if (!quit_comp_thread) {
Here, it is unprotected again.
> + len = qemu_put_qemu_file(f, comp_param[idx].file);
> + bytes_transferred += len;
> + }
> + }
> +}
> +
> +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> + ram_addr_t offset)
> +{
> + param->block = block;
> + param->offset = offset;
> +}
> +
> +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> + ram_addr_t offset,
> + uint64_t *bytes_transferred)
> +{
> + int idx, thread_count, bytes_xmit = -1, pages = -1;
> +
> + thread_count = migrate_compress_threads();
> + qemu_mutex_lock(comp_done_lock);
> + while (true) {
> + for (idx = 0; idx < thread_count; idx++) {
> + if (comp_param[idx].done) {
> + bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
> + set_compress_params(&comp_param[idx], block, offset);
> + start_compression(&comp_param[idx]);
> + pages = 1;
> + *bytes_transferred += bytes_xmit;
> + break;
> + }
> + }
> + if (pages > 0) {
> + break;
> + } else {
> + qemu_cond_wait(comp_done_cond, comp_done_lock);
> + }
> + }
> + qemu_mutex_unlock(comp_done_lock);
> +
> + return pages;
> +}
> +
> /**
> * ram_save_compressed_page: compress the given page and send it to the
> stream
> *
> @@ -845,8 +964,59 @@ static int ram_save_compressed_page(QEMUFile *f,
> RAMBlock *block,
> uint64_t *bytes_transferred)
> {
> int pages = -1;
> + uint64_t bytes_xmit;
> + MemoryRegion *mr = block->mr;
> + uint8_t *p;
> + int ret;
> +
> + p = memory_region_get_ram_ptr(mr) + offset;
>
> - /* To be done*/
> + bytes_xmit = 0;
> + ret = ram_control_save_page(f, block->offset,
> + offset, TARGET_PAGE_SIZE, &bytes_xmit);
> + if (bytes_xmit) {
> + *bytes_transferred += bytes_xmit;
> + pages = 1;
> + }
> + if (block == last_sent_block) {
> + offset |= RAM_SAVE_FLAG_CONTINUE;
> + }
> + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> + if (ret != RAM_SAVE_CONTROL_DELAYED) {
> + if (bytes_xmit > 0) {
> + acct_info.norm_pages++;
> + } else if (bytes_xmit == 0) {
> + acct_info.dup_pages++;
> + }
> + }
> + } else {
> + /* When starting the process of a new block, the first page of
> + * the block should be sent out before other pages in the same
> + * block, and all the pages in last block should have been sent
> + * out, keeping this order is important, because the 'cont' flag
> + * is used to avoid resending the block name.
> + */
> + if (block != last_sent_block) {
> + flush_compressed_data(f);
> + pages = save_zero_page(f, block, offset, p, bytes_transferred);
> + if (pages == -1) {
> + set_compress_params(&comp_param[0], block, offset);
> + /* Use the qemu thread to compress the data to make sure the
> + * first page is sent out before other pages
> + */
> + bytes_xmit = do_compress_ram_page(&comp_param[0]);
> + qemu_put_qemu_file(f, comp_param[0].file);
> + *bytes_transferred += bytes_xmit;
> + pages = 1;
> + }
> + } else {
> + pages = save_zero_page(f, block, offset, p, bytes_transferred);
> + if (pages == -1) {
> + pages = compress_page_with_multi_thread(f, block, offset,
> + bytes_transferred);
> + }
> + }
> + }
>
> return pages;
> }
> @@ -914,8 +1084,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool
> last_stage,
> return pages;
> }
>
> -static uint64_t bytes_transferred;
> -
> void acct_update_position(QEMUFile *f, size_t size, bool zero)
> {
> uint64_t pages = size / TARGET_PAGE_SIZE;
> @@ -1129,6 +1297,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> }
> i++;
> }
> + flush_compressed_data(f);
> rcu_read_unlock();
>
> /*
> @@ -1170,6 +1339,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> }
> }
>
> + flush_compressed_data(f);
> ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> migration_end();
[Qemu-devel] [v6 03/14] migration: Add the framework of multi-thread decompression, Liang Li, 2015/03/23
[Qemu-devel] [v6 11/14] migration: Add interface to control compression, Liang Li, 2015/03/23
[Qemu-devel] [v6 04/14] qemu-file: Add compression functions to QEMUFile, Liang Li, 2015/03/23
[Qemu-devel] [v6 13/14] migration: Add qmp commands to set and query parameters, Liang Li, 2015/03/23
[Qemu-devel] [v6 10/14] migration: Add the core code for decompression, Liang Li, 2015/03/23