Re: [Qemu-devel] [v5 08/12] migration: Add the core code of multi-thread
From: Li, Liang Z
Subject: Re: [Qemu-devel] [v5 08/12] migration: Add the core code of multi-thread compression
Date: Thu, 12 Feb 2015 07:43:38 +0000
> -----Original Message-----
> From: Juan Quintela [mailto:address@hidden
> Sent: Wednesday, February 11, 2015 7:45 PM
> To: Li, Liang Z
> Cc: address@hidden; address@hidden; address@hidden;
> address@hidden; address@hidden; address@hidden; Zhang,
> Yang Z
> Subject: Re: [v5 08/12] migration: Add the core code of multi-thread
> compression
>
> Liang Li <address@hidden> wrote:
> > Implement the core logic of the multiple thread compression. At this
> > point, multiple thread compression can't co-work with xbzrle yet.
> >
> > Signed-off-by: Liang Li <address@hidden>
> > Signed-off-by: Yang Zhang <address@hidden>
>
>
> > --- a/arch_init.c
> > +++ b/arch_init.c
> > @@ -363,18 +363,44 @@ static QemuMutex *comp_done_lock;
> > static QemuCond *comp_done_cond;
> > /* The empty QEMUFileOps will be used by file in CompressParam */
> > static const QEMUFileOps empty_ops = { };
> > +
> > +/* one_byte_count is used to count the bytes that are added to
> > + * bytes_transferred but not actually transferred; at the proper
> > + * time, we should subtract one_byte_count from bytes_transferred to
> > + * make bytes_transferred accurate.
> > + */
> > +static int one_byte_count;
>
> With the changes proposed previously to ram_save_compressed_page() this
> shouldn't be needed. It can return 0 now.
>
> > +static int do_compress_ram_page(CompressParam *param);
> > +
> > static void *do_data_compress(void *opaque) {
> > - while (!quit_comp_thread) {
> > -
> > - /* To be done */
> > + CompressParam *param = opaque;
> >
> > + while (!quit_comp_thread) {
> > + qemu_mutex_lock(&param->mutex);
> > + /* Re-check quit_comp_thread in case
> > + * terminate_compression_threads() is called just before
> > + * qemu_mutex_lock(&param->mutex) and after
> > + * while (!quit_comp_thread); re-checking it here makes
> > + * sure the compression thread terminates as expected.
> > + */
> > + while (!param->busy && !quit_comp_thread) {
> > + qemu_cond_wait(&param->cond, &param->mutex);
> > + }
> > + qemu_mutex_unlock(¶m->mutex);
> > + if (!quit_comp_thread) {
> > + do_compress_ram_page(param);
> > + }
> > + qemu_mutex_lock(comp_done_lock);
> > + param->busy = false;
> > + qemu_cond_signal(comp_done_cond);
> > + qemu_mutex_unlock(comp_done_lock);
> > }
> >
> > return NULL;
> > @@ -382,9 +408,15 @@ static void *do_data_compress(void *opaque)
> >
> > static inline void terminate_compression_threads(void)
> > {
> > - quit_comp_thread = true;
> > + int idx, thread_count;
> >
> > - /* To be done */
> > + thread_count = migrate_compress_threads();
> > + quit_comp_thread = true;
> > + for (idx = 0; idx < thread_count; idx++) {
> > + qemu_mutex_lock(&comp_param[idx].mutex);
> > + qemu_cond_signal(&comp_param[idx].cond);
> > + qemu_mutex_unlock(&comp_param[idx].mutex);
> > + }
> > }
> >
> > void migrate_compress_threads_join(MigrationState *s)
> > @@ -770,12 +802,157 @@ static int ram_save_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
> > return bytes_sent;
> > }
> >
> > +static int do_compress_ram_page(CompressParam *param) {
> > + int bytes_sent, cont;
> > + int blen;
> > + uint8_t *p;
> > + RAMBlock *block = param->block;
> > + ram_addr_t offset = param->offset;
> > +
> > + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > + p = memory_region_get_ram_ptr(block->mr) + offset;
> > +
> > + bytes_sent = save_block_hdr(param->file, block, offset, cont,
> > + RAM_SAVE_FLAG_COMPRESS_PAGE);
> > + blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
> > + migrate_compress_level());
> > + bytes_sent += blen;
> > + atomic_inc(&acct_info.norm_pages);
> > +
> > + return bytes_sent;
> > +}
> > +
> > +static inline void start_compression(CompressParam *param) {
> > + qemu_mutex_lock(&param->mutex);
> > + param->busy = true;
> > + qemu_cond_signal(&param->cond);
> > + qemu_mutex_unlock(&param->mutex);
> > +}
> > +
> > +
> > +static uint64_t bytes_transferred;
> > +
> > +static void flush_compressed_data(QEMUFile *f) {
> > + int idx, len, thread_count;
> > +
> > + if (!migrate_use_compression()) {
> > + return;
> > + }
> > + thread_count = migrate_compress_threads();
> > + for (idx = 0; idx < thread_count; idx++) {
> > + if (comp_param[idx].busy) {
> > + qemu_mutex_lock(comp_done_lock);
> > + while (comp_param[idx].busy && !quit_comp_thread) {
> > + qemu_cond_wait(comp_done_cond, comp_done_lock);
> > + }
> > + qemu_mutex_unlock(comp_done_lock);
> > + }
>
> If we arrive here because quit_comp_thread == true, shouldn't we skip the
> qemu_put_qemu_file()?
>
> > + len = qemu_put_qemu_file(f, comp_param[idx].file);
> > + bytes_transferred += len;
> > + }
>
> [remove one_byte stuff here]
>
> > +}
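A minimal sketch of the flush with the quit check asked about above, written against plain POSIX threads rather than QEMU's qemu_mutex/qemu_cond wrappers. Worker, flush_workers() and the buffered byte count are hypothetical stand-ins for CompressParam and qemu_put_qemu_file(); the one_byte_count bookkeeping is simply left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for CompressParam: only what the flush needs. */
typedef struct {
    bool busy;        /* worker is still compressing a page */
    size_t buffered;  /* bytes waiting in the worker's private buffer */
} Worker;

static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER; /* comp_done_lock */
static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;   /* comp_done_cond */
static bool quit_threads;                                     /* quit_comp_thread */

/*
 * Wait for each worker to go idle, then copy its buffer to the stream.
 * Once quit_threads is set, stop without copying anything more.
 * Returns the number of bytes "sent".
 */
static size_t flush_workers(Worker *w, int n)
{
    size_t written = 0;

    for (int i = 0; i < n; i++) {
        pthread_mutex_lock(&done_lock);
        while (w[i].busy && !quit_threads) {
            pthread_cond_wait(&done_cond, &done_lock);
        }
        bool quitting = quit_threads;
        pthread_mutex_unlock(&done_lock);

        if (quitting) {
            break;                /* skip the qemu_put_qemu_file() step */
        }
        written += w[i].buffered; /* stands in for qemu_put_qemu_file() */
        w[i].buffered = 0;
    }
    return written;
}
```

This version stops flushing altogether once the quit flag is seen, on the assumption that migration is being torn down; skipping only the worker that was still busy would be an equally reasonable reading of the comment.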
> > +
> > +static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> > + ram_addr_t offset) {
> > + param->block = block;
> > + param->offset = offset;
> > +}
> > +
> > +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
> > + ram_addr_t offset) {
> > + int idx, thread_count, bytes_sent = 0;
> > +
> > + thread_count = migrate_compress_threads();
> > + qemu_mutex_lock(comp_done_lock);
> > + while (true) {
> > + for (idx = 0; idx < thread_count; idx++) {
> > + if (!comp_param[idx].busy) {
> > + bytes_sent = qemu_put_qemu_file(f, comp_param[idx].file);
> > + set_compress_params(&comp_param[idx], block, offset);
> > + start_compression(&comp_param[idx]);
>
> [remove stuff here]
>
> > + break;
> > + }
> > + }
> > + if (bytes_sent > 0) {
>
> Change this to:
> if (bytes_sent >= 0) {
>
> > + break;
> > + } else {
> > + qemu_cond_wait(comp_done_cond, comp_done_lock);
> > + }
> > + }
> > + qemu_mutex_unlock(comp_done_lock);
> > +
> > + return bytes_sent;
> > +}
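A sketch of the dispatch loop with the `>= 0` test, again using plain pthreads and hypothetical stand-ins (Worker, dispatch_page()). It assumes bytes_sent starts at -1: with the patch's initial value of 0, `>= 0` would be true on the first pass even when no worker was free. No worker threads are started here, so only the control flow is shown.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for CompressParam. */
typedef struct {
    bool busy;        /* worker owns a page */
    long buffered;    /* output left from the worker's previous page; may be 0 */
    uint64_t offset;  /* page handed to the worker */
} Worker;

static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER; /* comp_done_lock */
static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;   /* comp_done_cond */

/* Hand 'offset' to the first idle worker, blocking until one is free. */
static long dispatch_page(Worker *w, int n, uint64_t offset)
{
    long bytes_sent = -1; /* -1: no idle worker found yet */

    pthread_mutex_lock(&done_lock);
    while (true) {
        for (int i = 0; i < n; i++) {
            if (!w[i].busy) {
                bytes_sent = w[i].buffered; /* qemu_put_qemu_file() stand-in */
                w[i].buffered = 0;
                w[i].offset = offset;       /* set_compress_params() stand-in */
                w[i].busy = true;           /* start_compression() stand-in */
                break;
            }
        }
        if (bytes_sent >= 0) { /* 0 is valid: nothing was buffered */
            break;
        }
        pthread_cond_wait(&done_cond, &done_lock);
    }
    pthread_mutex_unlock(&done_lock);
    return bytes_sent;
}
```

A free worker whose buffer is empty now yields 0 instead of forcing another wait, which appears to be why one_byte_count is no longer needed.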
> > +
> > static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
> > ram_addr_t offset, bool last_stage) {
> > int bytes_sent = -1;
> > + MemoryRegion *mr = block->mr;
> > + uint8_t *p;
> > + int ret;
> > + int cont;
> >
> > - /* To be done*/
> > + p = memory_region_get_ram_ptr(mr) + offset;
> > + cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
> > + ret = ram_control_save_page(f, block->offset,
> > + offset, TARGET_PAGE_SIZE, &bytes_sent);
> > + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
> > + if (ret != RAM_SAVE_CONTROL_DELAYED) {
> > + if (bytes_sent > 0) {
> > + acct_info.norm_pages++;
> > + } else if (bytes_sent == 0) {
> > + acct_info.dup_pages++;
> > + }
> > + }
> > + } else {
> > + /* When starting to process a new block, the first page of
> > + * the block should be sent out before other pages in the same
> > + * block, and all the pages in the last block should have been sent
> > + * out. Keeping this order is important because the 'cont' flag
> > + * is used to avoid resending the block name.
> > + */
> > + if (block != last_sent_block) {
> > + flush_compressed_data(f);
> > + bytes_sent = save_zero_page(f, block, offset, p, cont);
> > + if (bytes_sent == -1) {
> > + set_compress_params(&comp_param[0], block, offset);
> > + /* Use the qemu thread to compress the data to make sure the
> > + * first page is sent out before other pages
> > + */
> > + bytes_sent = do_compress_ram_page(&comp_param[0]);
> > + if (bytes_sent > 0) {
>
> This test is not needed; it should be
>
> assert(bytes_sent > 0)
>
> How can it be zero or negative here? So we always have to call
> qemu_put_qemu_file(), no?
>
> > + qemu_put_qemu_file(f, comp_param[0].file);
> > + }
> > + }
> > + } else {
> > + bytes_sent = save_zero_page(f, block, offset, p, cont);
> > + if (bytes_sent == -1) {
> > + bytes_sent = compress_page_with_multi_thread(f, block, offset);
> > + }
> > + }
> > + }
> >
> > return bytes_sent;
> > }
> > @@ -834,8 +1011,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage)
> > return bytes_sent;
> > }
> >
> > -static uint64_t bytes_transferred;
> > -
> > void acct_update_position(QEMUFile *f, size_t size, bool zero) {
> > uint64_t pages = size / TARGET_PAGE_SIZE;
> > @@ -1043,6 +1218,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> > i++;
> > }
> >
> > + flush_compressed_data(f);
> > qemu_mutex_unlock_ramlist();
> >
> > /*
> > @@ -1089,6 +1265,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> > bytes_transferred += bytes_sent;
> > }
> >
> > + flush_compressed_data(f);
> > ram_control_after_iterate(f, RAM_CONTROL_FINISH);
> > migration_end();
>
>
> I think this would make the code work, but it would not fix the locking. Here
> you are using:
>
> quit_comp_thread: global, and not completely clear what protects it
> comp_done_lock: global
> comp_done_cond: global
>
> param[i].busy: I would suggest renaming it to pending work
> param[i].mutex:
> param[i].cond: thread is waiting for work
>
>
> Issues:
>
> param->busy is protected in do_data_compress() and start_compression()
> by param->mutex, but in flush_compressed_data() and
> compress_page_with_multi_thread() it is protected by comp_done_lock.
>
> At this point, I would suggest just dropping param[i].mutex and using
> comp_done_lock everywhere. We can make the locking more fine-grained later
> if needed, but first let's get it correct?
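One way to read the "drop param[i].mutex" suggestion, sketched with pthreads: every field of the hypothetical Worker struct, plus the quit flag, is protected by a single lock, while each worker keeps its own condition variable to wait for work. The names and the doubling "compression" are placeholders. Unlike terminate_compression_threads() in the patch, this stop_workers() lets a worker finish a page it already owns, so the results can be checked afterwards.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define NWORKERS 4

/* Hypothetical per-thread state modelled on CompressParam. */
typedef struct {
    bool pending;         /* "busy", under the suggested new name */
    int input;            /* stands in for (block, offset) */
    long sum;             /* results produced by this worker */
    pthread_cond_t cond;  /* this worker waits here for work */
    pthread_t thread;
} Worker;

static Worker workers[NWORKERS];
/* The single lock: protects every Worker field above and the quit flag. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER; /* main thread waits here */
static bool quit;

static void *worker_main(void *opaque)
{
    Worker *w = opaque;

    pthread_mutex_lock(&lock);
    for (;;) {
        while (!w->pending && !quit) {
            pthread_cond_wait(&w->cond, &lock);
        }
        if (!w->pending) {
            break;                  /* quit requested and nothing left to do */
        }
        int in = w->input;
        pthread_mutex_unlock(&lock);
        long out = 2L * in;         /* stands in for compressing one page */
        pthread_mutex_lock(&lock);
        w->sum += out;
        w->pending = false;
        pthread_cond_signal(&done_cond);
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

static void start_workers(void)
{
    for (int i = 0; i < NWORKERS; i++) {
        pthread_cond_init(&workers[i].cond, NULL);
        pthread_create(&workers[i].thread, NULL, worker_main, &workers[i]);
    }
}

/* Main thread: give 'input' to the first idle worker, waiting while all are busy. */
static void submit(int input)
{
    pthread_mutex_lock(&lock);
    for (;;) {
        for (int i = 0; i < NWORKERS; i++) {
            if (!workers[i].pending) {
                workers[i].input = input;
                workers[i].pending = true;
                pthread_cond_signal(&workers[i].cond);
                pthread_mutex_unlock(&lock);
                return;
            }
        }
        pthread_cond_wait(&done_cond, &lock);
    }
}

/* Main thread: ask the workers to exit once their current page is done. */
static void stop_workers(void)
{
    pthread_mutex_lock(&lock);
    quit = true;
    for (int i = 0; i < NWORKERS; i++) {
        pthread_cond_signal(&workers[i].cond);
    }
    pthread_mutex_unlock(&lock);
    for (int i = 0; i < NWORKERS; i++) {
        pthread_join(workers[i].thread, NULL);
        pthread_cond_destroy(&workers[i].cond);
    }
}
```

Because pending is only ever read or written with the lock held, the unlocked peek at busy before taking the lock is no longer needed.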
>
> The code basically does this (ignoring termination and locking):
>
> each compression_thread()
>
> while(1) {
>     while(!work_to_do)
>         wait_for_work
>     do_work
> }
>
> And the main thread does:
>
>
> while(1) {
>     foreach compression_thread {
>         if thread free {
>             put it to work
>             break;
>         }
>     }
>     wait_for_thread_to_finish
> }
>
> Notice how we are walking all threads each time we need to do anything.
>
> Perhaps the code would be simpler if we put the work that needs to be
> done in a global variable and changed this to:
>
> compression_thread
>
> while(1) {
>     while(!work_to_do)
>         wait_for_work
>     pick work from global variable
>     wakeup main thread
>     do_work
> }
>
> main thread:
>
> put work on global variable
> while(nobody_picked_the_work) {
>     signal all threads
>     wait for a compression thread to take the work
> }
>
> Why? Because then we only have one global mutex and two condition
> variables, with clear semantics:
> - the lock protects both conditions and the global variable with the work
> - one condition where threads wait for work
> - one condition where the main thread waits for a worker to be ready
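A pthreads sketch of the proposed scheme: one mutex, a work_cond on which workers wait for work, a taken_cond on which the main thread waits, and a single global work slot. All names are hypothetical, the doubling stands in for compressing a page, and the in_progress counter is an addition (not part of the proposal) so that a flush can wait for pages still being compressed.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define NWORKERS 4

/* One mutex protects all of the state below. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_cond = PTHREAD_COND_INITIALIZER;  /* workers wait for work */
static pthread_cond_t taken_cond = PTHREAD_COND_INITIALIZER; /* main thread waits for a worker */
static bool has_work;    /* the global work slot is full */
static int work_item;    /* stands in for (block, offset) */
static int in_progress;  /* pages being compressed right now */
static long total;       /* results not yet collected */
static bool quit;
static pthread_t threads[NWORKERS];

static void *worker_main(void *opaque)
{
    (void)opaque;
    pthread_mutex_lock(&lock);
    for (;;) {
        while (!has_work && !quit) {
            pthread_cond_wait(&work_cond, &lock);
        }
        if (!has_work) {
            break;                        /* quit, and no work left */
        }
        int item = work_item;             /* pick work from the global variable */
        has_work = false;
        in_progress++;
        pthread_cond_signal(&taken_cond); /* wake up the main thread */
        pthread_mutex_unlock(&lock);

        long result = 2L * item;          /* do_work: stands in for compression */

        pthread_mutex_lock(&lock);
        total += result;
        in_progress--;
        pthread_cond_signal(&taken_cond); /* lets flush_all() see completion */
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Main thread: publish one item and wait until some worker has taken it. */
static void submit(int item)
{
    pthread_mutex_lock(&lock);
    work_item = item;
    has_work = true;
    while (has_work) {                    /* nobody picked the work yet */
        pthread_cond_broadcast(&work_cond);
        pthread_cond_wait(&taken_cond, &lock);
    }
    pthread_mutex_unlock(&lock);
}

/* Main thread: wait for all in-flight work and collect the results. */
static long flush_all(void)
{
    pthread_mutex_lock(&lock);
    while (has_work || in_progress > 0) {
        pthread_cond_wait(&taken_cond, &lock);
    }
    long t = total;
    total = 0;
    pthread_mutex_unlock(&lock);
    return t;
}

static void start_workers(void)
{
    for (int i = 0; i < NWORKERS; i++) {
        pthread_create(&threads[i], NULL, worker_main, NULL);
    }
}

static void stop_workers(void)
{
    pthread_mutex_lock(&lock);
    quit = true;
    pthread_cond_broadcast(&work_cond);
    pthread_mutex_unlock(&lock);
    for (int i = 0; i < NWORKERS; i++) {
        pthread_join(threads[i], NULL);
    }
}
```

The main thread never walks a thread array: it returns from submit() once any worker has emptied the slot. Broadcasting on every pass is harmless, since idle workers that find the slot already empty simply go back to waiting.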
>
> As we would need to take the lock every single time we put the work in the
> global variable, wait, or pick up the work, we can get rid of all the:
>
> if (!foo) {
>     mutex_lock
>     if (!foo) /* this time with lock */
>         ....
> }
>
>
> Sorry for the very long mail. If it makes you feel better, this is the
> second time I wrote it, because in the first version my locking proposal
> didn't work correctly.
>
> What do you think?
It sounds good; I will try it according to your suggestion. Thanks for your
detailed explanation :)
Liang