From: Dr. David Alan Gilbert
Subject: Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
Date: Fri, 11 May 2018 17:32:19 +0100
User-agent: Mutt/1.9.5 (2018-04-13)

* Juan Quintela (address@hidden) wrote:
> "Dr. David Alan Gilbert" <address@hidden> wrote:
> > * Juan Quintela (address@hidden) wrote:
> >> We synchronize all threads on each RAM_SAVE_FLAG_EOS.  Bitmap
> >> synchronizations don't happen inside a RAM section, so we are safe
> >> against two channels trying to overwrite the same memory.
> >
> > OK, that's quite neat - so you don't need any extra flags in the stream
> > to do the sync;  it probably needs a comment in the code somewhere so we
> > don't forget!
> 
> Thanks.
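
(For illustration, a comment along these lines above multifd_send_sync_main()
would capture it; the wording here is only a suggestion, not text from the
patch:)

    /*
     * We sync all channels on each RAM_SAVE_FLAG_EOS.  Bitmap
     * synchronization never happens inside a RAM section, so two
     * channels can never try to write the same page.
     */
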
> 
> >> Signed-off-by: Juan Quintela <address@hidden>
> >> ---
> >>  migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
> >>  migration/trace-events |   6 +++
> >>  2 files changed, 113 insertions(+), 11 deletions(-)
> >> 
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index c4c185cc4c..398cb0af3b 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
> >>  #define MULTIFD_MAGIC 0x11223344U
> >>  #define MULTIFD_VERSION 1
> >>  
> >> +#define MULTIFD_FLAG_SYNC (1 << 0)
> >> +
> >>  typedef struct {
> >>      uint32_t magic;
> >>      uint32_t version;
> >> @@ -471,6 +473,8 @@ typedef struct {
> >>      uint32_t num_packets;
> >>      /* pages sent through this channel */
> >>      uint32_t num_pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >>  }  MultiFDSendParams;
> >>  
> >>  typedef struct {
> >> @@ -507,6 +511,8 @@ typedef struct {
> >>      uint32_t num_packets;
> >>      /* pages sent through this channel */
> >>      uint32_t num_pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >>  } MultiFDRecvParams;
> >>  
> >>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> >> @@ -682,6 +688,10 @@ struct {
> >>      int count;
> >>      /* array of pages to sent */
> >>      MultiFDPages_t *pages;
> >> +    /* syncs main thread and channels */
> >> +    QemuSemaphore sem_sync;
> >> +    /* global number of generated multifd packets */
> >> +    uint32_t seq;
> >
> > It's interesting that you use the same comment for 'seq' in
> > MultiFDSendParams - but I guess that means only this one is the global
> > version and the others aren't really global numbers - they're just
> > local to that thread?
> 
> The only place that increases/generates seq is multifd_send_pages(),
> which is what creates a new packet to be sent.  So, if we see _any_ packet
> on the wire, we know the real global ordering.  They are only used for
> traces, to see that packet 42 was sent through channel 3, and on
> reception you check that packet 42 is what you received through channel
> 3.  They only appear in traces, but I find them useful for debugging
> synchronization errors.

Ah, and multifd_send_pages runs on the main thread, and it always operates
on multifd_send_state->seq and then passes it to the SendParams; OK.
I'm not sure how to explain that better; but it's a little confusing.
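
(For illustration, the pattern being described is roughly this - a condensed
sketch of the send path, not the exact patch code; the channel-selection
logic is elided and next_channel is shorthand:)

    static void multifd_send_pages(void)
    {
        /* pick the next channel with a free slot (elided) */
        MultiFDSendParams *p = &multifd_send_state->params[next_channel];

        qemu_mutex_lock(&p->mutex);
        /* the main thread is the only writer of the global counter... */
        multifd_send_state->seq++;
        /* ...and stamps it onto this channel's packet, so any packet
         * seen on the wire carries the true global ordering */
        p->seq = multifd_send_state->seq;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);
    }
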

> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> >> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> +        trace_multifd_send_sync_main_signal(p->id);
> >> +
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        p->flags |= MULTIFD_FLAG_SYNC;
> >> +        p->pending_job++;
> >> +        qemu_mutex_unlock(&p->mutex);
> >> +        qemu_sem_post(&p->sem);
> >> +    }
> 
> [1]
> 
> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> >> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> >> +
> >> +        trace_multifd_send_sync_main_wait(p->id);
> >> +        qemu_sem_wait(&multifd_send_state->sem_sync);
> >> +    }
> 
> [2]
> 
> >> +    trace_multifd_send_sync_main(multifd_send_state->seq);
> >> +}
> >> +
> >
> > OK, so this just makes each of the sending threads ack, so that seems
> > OK.
> > But what happens with an error? multifd_send_sync_main exits its
> > loop with a 'break' if the writes fail, and that could mean they never
> > come and post the flag-sync sem.
> 
> Let's see.
> 
> [1]: we are just doing mutex_lock/sem_post(); if we are not able to do
> that, we have got a big race that needs to be fixed.  So that bit is ok.
> 
> [2]: We do an unconditional sem_wait().  Looking at the worker code:
>      at this patch level we are ok, but I agree with you that in later
>      patches we need to also do the post on the error case.  Changing.
K.
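
(For illustration, "do the post on the error case" means the worker's error
exit also has to wake the main thread blocked in the unconditional
sem_wait() at [2] - a minimal sketch, with a hypothetical send_packet()
standing in for the real I/O call and the job bookkeeping elided:)

    static void *multifd_send_thread(void *opaque)
    {
        MultiFDSendParams *p = opaque;

        while (true) {
            uint32_t flags;

            qemu_sem_wait(&p->sem);
            qemu_mutex_lock(&p->mutex);
            flags = p->flags;
            p->flags = 0;
            qemu_mutex_unlock(&p->mutex);

            if (send_packet(p) < 0) {
                /* the error exit must also post sem_sync, otherwise
                 * the main thread stays blocked in the unconditional
                 * sem_wait() at [2] above */
                qemu_sem_post(&multifd_send_state->sem_sync);
                break;
            }
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_send_state->sem_sync);
            }
        }
        return NULL;
    }
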


> >> +
> >> +        trace_multifd_recv_sync_main_wait(p->id);
> >> +        qemu_sem_wait(&multifd_recv_state->sem_sync);
> >> +        qemu_mutex_lock(&p->mutex);
> >> +        if (multifd_recv_state->seq < p->seq) {
> >> +            multifd_recv_state->seq = p->seq;
> >> +        }
> >
> > Can you explain what this is for?
> > Something like the latest received block?
> 
> When we are at a synchronization point, we don't know on the main thread
> at which packet that synchronization happened (treating the packets as
> one logically ordered stream).  So we choose 'seq' from the channel with
> the highest number.  That is the one that we want.  We only use this
> for tracing, so we can "match" that we did a synchronization on the send
> side at packet N and see in the trace on the reception side that we did
> it at packet N as well.

OK, I think I see; again, this code runs on the main thread, and it's
going around all the subthreads; so it's updating the central copy
based on what each of them has received - OK.

> Remember that in a previous patch you asked me what happens if this
> counter wraps around?  At that point, nothing.  But now I need to change
> this code to be:
> 
> 
>     multifd_recv_state->seq = 0;
>     for (i = 0; i < migrate_multifd_channels(); i++) {
>         MultiFDRecvParams *p = &multifd_recv_state->params[i];
>         ...
>         if (multifd_recv_state->seq < p->seq) {
>             multifd_recv_state->seq = p->seq;
>         }
> 
> And that fixes the wrap-around problem, no?

Yes.  Adding a note somewhere saying it's just for debug would help as
well.
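
(Something as small as this next to the field declaration would do; the
wording is only a suggestion:)

    /* packet sequence number; global across all channels, but only
     * used by the tracing/debug code, never for correctness */
    uint32_t seq;
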

> >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
> >>      trace_multifd_recv_thread_start(p->id);
> >>  
> >>      while (true) {
> >> -        qemu_sem_wait(&p->sem);
> >>          qemu_mutex_lock(&p->mutex);
> >> -        if (p->pending_job) {
> >> +        if (true || p->pending_job) {
> >
> > A TODO I guess???
> 
> Oops, that should be out.
> 
> Fixed on next version.
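
(i.e. the leftover debug condition goes back to what the old code had:)

    if (p->pending_job) {
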
> 
> >>              uint32_t used;
> >>              uint32_t flags;
> >>              qemu_mutex_unlock(&p->mutex);
> >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
> >>              p->num_packets++;
> >>              p->num_pages += used;
> >>              qemu_mutex_unlock(&p->mutex);
> >> +
> >> +            if (flags & MULTIFD_FLAG_SYNC) {
> >> +                qemu_sem_post(&multifd_recv_state->sem_sync);
> >> +                qemu_sem_wait(&p->sem_sync);
> >> +            }
> >
> > Can you explain the receive side logic - I think this is waiting for all
> > receive threads to 'ack' - but how do we know that they've finished
> > receiving all data that was sent?
> 
> Because they need to receive a packet with MULTIFD_FLAG_SYNC set.  And
> if they receive that flag, we know it is the last one of the sequence.
> 
> synchronization works like this (2 channels to make things easy):
> 
>                 main thread:
>                 we finish a RAM_SECTION;
>                 flush pending packets to one of the channels
>                 send a packet with MULTIFD_FLAG_SYNC to all the channels
>                 wait until all the channels have processed the FLAG_SYNC
>                 at this point, send the RAM_SECTION_EOS footer
> 
> worker1                                  worker2
> 
> if there is a pending packet, send it    if there is a pending packet, send it
> (notice that there can never be more than one)
> send a packet with the SYNC flag set     send a packet with the SYNC flag set
> 
> On the reception side:
> 
>                 main thread:
>                 receives the RAM_SECTION_EOS footer
>                 waits for the workers to receive a sync
> 
> worker1                                  worker2
> 
> process any pending packet (no sync)     process any pending packet (no sync)
> process the packet with SYNC             process the packet with SYNC
> post the main thread                     post the main thread
> 
>                 now the main thread can continue
> 
> Notice that we don't care what happens first, receiving the packet with
> SYNC in the workers or RAM_SECTION_EOS on the main thread; it all works
> as expected.
> 
> Noticing how long it took to explain this, I think I am going to add it
> to the migration documentation.  I'll wait for any questions you have
> before adding it.
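
(For reference, in code the per-channel hand-off pairs up like this -
condensed from the hunks quoted above, no new logic:)

    /* send side: main thread flags every channel... */
    p->flags |= MULTIFD_FLAG_SYNC;                     /* [1] */
    qemu_sem_post(&p->sem);
    /* ...then collects one ack per channel */
    qemu_sem_wait(&multifd_send_state->sem_sync);      /* [2] */

    /* receive side: main thread (after RAM_SECTION_EOS) collects one
     * ack per channel... */
    qemu_sem_wait(&multifd_recv_state->sem_sync);
    /* ...posted by each channel thread when it sees the SYNC packet */
    if (flags & MULTIFD_FLAG_SYNC) {
        qemu_sem_post(&multifd_recv_state->sem_sync);
        qemu_sem_wait(&p->sem_sync);
    }
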

Thanks; that I think makes sense.

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / address@hidden / Manchester, UK


