From: Juan Quintela
Subject: Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
Date: Wed, 09 May 2018 21:45:00 +0200
User-agent: Gnus/5.13 (Gnus v5.13) Emacs/25.3 (gnu/linux)
"Dr. David Alan Gilbert" <address@hidden> wrote:
> * Juan Quintela (address@hidden) wrote:
>> We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap
>> synchronizations don't happen inside a ram section, so we are safe
>> about two channels trying to overwrite the same memory.
>
> OK, that's quite neat - so you don't need any extra flags in the stream
> to do the sync; it probably needs a comment in the code somewhere so we
> don't forget!
Thanks.
>> Signed-off-by: Juan Quintela <address@hidden>
>> ---
>> migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++----
>> migration/trace-events | 6 +++
>> 2 files changed, 113 insertions(+), 11 deletions(-)
>>
>> diff --git a/migration/ram.c b/migration/ram.c
>> index c4c185cc4c..398cb0af3b 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
>> #define MULTIFD_MAGIC 0x11223344U
>> #define MULTIFD_VERSION 1
>>
>> +#define MULTIFD_FLAG_SYNC (1 << 0)
>> +
>> typedef struct {
>> uint32_t magic;
>> uint32_t version;
>> @@ -471,6 +473,8 @@ typedef struct {
>> uint32_t num_packets;
>> /* pages sent through this channel */
>> uint32_t num_pages;
>> + /* syncs main thread and channels */
>> + QemuSemaphore sem_sync;
>> } MultiFDSendParams;
>>
>> typedef struct {
>> @@ -507,6 +511,8 @@ typedef struct {
>> uint32_t num_packets;
>> /* pages sent through this channel */
>> uint32_t num_pages;
>> + /* syncs main thread and channels */
>> + QemuSemaphore sem_sync;
>> } MultiFDRecvParams;
>>
>> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>> @@ -682,6 +688,10 @@ struct {
>> int count;
>> /* array of pages to sent */
>> MultiFDPages_t *pages;
>> + /* syncs main thread and channels */
>> + QemuSemaphore sem_sync;
>> + /* global number of generated multifd packets */
>> + uint32_t seq;
>
> It's interesting you use the same comment for 'seq' in
> MultiFDSendParams - but I guess that means only this one is the global
> version and the others aren't really global number - they're just
> local to that thread?
The only place that "increases/generates" seq is multifd_send_pages(),
which is what creates a new packet to be sent.  So, if we see _any_
packet on the wire, we know the real global ordering.  The numbers are
only used for traces, to see that packet 42 was sent through channel 3,
and on reception to check that packet 42 is what you received through
channel 3.  They only appear in traces, but I find them useful for
debugging synchronization errors.
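A minimal sketch of that numbering scheme, not the QEMU code: one global
counter is bumped in a single place, and each channel remembers the number of
the last packet it carried. DemoSendState, DemoChannel and demo_send_packet()
are hypothetical names, and whether the counter is read before or after the
increment is an assumption made for the example.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_CHANNELS 4          /* assumption: fixed channel count for the demo */

typedef struct {
    uint32_t seq;               /* global number of the last packet sent here */
    uint32_t num_packets;       /* packets sent through this channel */
} DemoChannel;

typedef struct {
    uint32_t seq;               /* global number of generated packets */
    DemoChannel channels[NUM_CHANNELS];
} DemoSendState;

/*
 * The only place that generates a sequence number, playing the role that
 * multifd_send_pages() has in the patch.  Returns the number stamped on
 * the packet so a trace on the receiving side can be matched against it.
 */
static uint32_t demo_send_packet(DemoSendState *s, int channel)
{
    assert(channel >= 0 && channel < NUM_CHANNELS);

    DemoChannel *c = &s->channels[channel];

    c->seq = s->seq++;          /* stamp, then advance the global counter */
    c->num_packets++;
    printf("send: packet %u through channel %d\n", (unsigned)c->seq, channel);
    return c->seq;
}
```

Calling demo_send_packet() for channels 3, 1 and 3 again on a zeroed state
stamps the packets 0, 1 and 2, so the per-channel values are local views of
one global order.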
>> + for (i = 0; i < migrate_multifd_channels(); i++) {
>> + MultiFDSendParams *p = &multifd_send_state->params[i];
>> +
>> + trace_multifd_send_sync_main_signal(p->id);
>> +
>> + qemu_mutex_lock(&p->mutex);
>> + p->flags |= MULTIFD_FLAG_SYNC;
>> + p->pending_job++;
>> + qemu_mutex_unlock(&p->mutex);
>> + qemu_sem_post(&p->sem);
>> + }
[1]
>> + for (i = 0; i < migrate_multifd_channels(); i++) {
>> + MultiFDSendParams *p = &multifd_send_state->params[i];
>> +
>> + trace_multifd_send_sync_main_wait(p->id);
>> + qemu_sem_wait(&multifd_send_state->sem_sync);
>> + }
[2]
>> + trace_multifd_send_sync_main(multifd_send_state->seq);
>> +}
>> +
>
> OK, so this just makes each of the sending threads ack, so that seems
> OK.
> But what happens with an error? multifd_send_sync_main exits it's
> loop with a 'break' if the writes fail, and that could mean they never
> come and post the flag-sync sem.
Let's see.
[1]: we are just doing mutex_lock/sem_post(); if we are not able to do
that, we have a big race that needs to be fixed.  So that bit is ok.
[2]: We do an unconditional sem_wait().  Looking at the worker code: at
this patch level we are ok, but I agree with you that in later patches
we also need to do the post in the error case.  Changing.
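A minimal sketch of why the error path has to post as well, using POSIX
threads and semaphores instead of QEMU's wrappers. All names here
(DemoWorker, demo_worker(), demo_sync_main()) are hypothetical, and the
"write error" is simulated with a flag. The point is only that the waiter at
[2] counts one post per channel, so a worker that bails out without posting
would leave it blocked forever.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define DEMO_MAX_CHANNELS 8

typedef struct {
    bool fail;                  /* simulate a failed write on this channel */
    bool quit;                  /* set by the worker when it gives up */
    sem_t *sem_sync;            /* shared: the main thread waits on this */
} DemoWorker;

static void *demo_worker(void *opaque)
{
    DemoWorker *w = opaque;

    if (w->fail) {
        w->quit = true;
        /* Error path: still post, or the main thread never wakes up. */
        sem_post(w->sem_sync);
        return NULL;
    }
    /* Normal path: the SYNC packet went out, acknowledge it. */
    sem_post(w->sem_sync);
    return NULL;
}

/* Returns how many channels failed; it returns only if every worker posts. */
static int demo_sync_main(int n, const bool *fail)
{
    pthread_t threads[DEMO_MAX_CHANNELS];
    DemoWorker workers[DEMO_MAX_CHANNELS];
    sem_t sem_sync;
    int failed = 0;

    assert(n > 0 && n <= DEMO_MAX_CHANNELS);
    sem_init(&sem_sync, 0, 0);
    for (int i = 0; i < n; i++) {
        workers[i] = (DemoWorker){ .fail = fail[i], .sem_sync = &sem_sync };
        int rc = pthread_create(&threads[i], NULL, demo_worker, &workers[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < n; i++) {
        sem_wait(&sem_sync);    /* unconditional wait, one per channel */
    }
    for (int i = 0; i < n; i++) {
        pthread_join(threads[i], NULL);
        failed += workers[i].quit;
    }
    sem_destroy(&sem_sync);
    return failed;
}
```

With three channels where the second one fails, demo_sync_main() still
returns (reporting one failed channel) instead of hanging in the wait loop.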
>> +
>> + trace_multifd_recv_sync_main_wait(p->id);
>> + qemu_sem_wait(&multifd_recv_state->sem_sync);
>> + qemu_mutex_lock(&p->mutex);
>> + if (multifd_recv_state->seq < p->seq) {
>> + multifd_recv_state->seq = p->seq;
>> + }
>
> Can you explain what this is for?
> Something like the latest received block?
When we are at a synchronization point, the main thread doesn't know
at which packet that synchronization happened (considering the packets
as one logical list).  So, we choose the 'seq' from the channel with
the highest number.  That is the one that we want.  We only use this
for tracing, so we can "match" that we did a synchronization on the send
side at packet N and we see the trace on the reception side that we did
it at packet N also.
Remember that in a previous patch you asked me what happens if this
wraps around?  At that point, nothing.  But now I need to change
this code to be:
    multifd_recv_state->seq = 0;
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        ...
        if (multifd_recv_state->seq < p->seq) {
            multifd_recv_state->seq = p->seq;
        }
    }
And I have fixed the wraparound problem, no?
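A minimal sketch of the reset-then-maximum step, pulled out into a standalone
function so the wraparound behaviour can be checked in isolation.
demo_sync_seq() is a hypothetical helper, not a function from the patch, and
the per-channel values are passed in as a plain array.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Highest per-channel sequence number seen at one synchronization point.
 * The result starts from 0 on every call, so a large value left over from
 * before a uint32_t wraparound cannot stick around.
 */
static uint32_t demo_sync_seq(const uint32_t *channel_seq, size_t n)
{
    uint32_t seq = 0;           /* reset at every sync point */

    assert(channel_seq != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        if (seq < channel_seq[i]) {
            seq = channel_seq[i];
        }
    }
    return seq;
}
```

Without the reset, a stored value near UINT32_MAX would keep winning the
comparison after the counters wrap. With it, the value reflects only the
current sync point. A sync point that happens exactly across the wrap can
still pick a pre-wrap number, which only matters for the trace output.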
>> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
>> trace_multifd_recv_thread_start(p->id);
>>
>> while (true) {
>> - qemu_sem_wait(&p->sem);
>> qemu_mutex_lock(&p->mutex);
>> - if (p->pending_job) {
>> + if (true || p->pending_job) {
>
> A TODO I guess???
Oops, that should be out.
Fixed on next version.
>> uint32_t used;
>> uint32_t flags;
>> qemu_mutex_unlock(&p->mutex);
>> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
>> p->num_packets++;
>> p->num_pages += used;
>> qemu_mutex_unlock(&p->mutex);
>> +
>> + if (flags & MULTIFD_FLAG_SYNC) {
>> + qemu_sem_post(&multifd_recv_state->sem_sync);
>> + qemu_sem_wait(&p->sem_sync);
>> + }
>
> Can you explain the receive side logic - I think this is waiting for all
> receive threads to 'ack' - but how do we know that they've finished
> receiving all data that was sent?
Because they need to receive a packet with MULTIFD_FLAG_SYNC set.  And
if they receive that flag, we know that it is the last one of the
sequence.

Synchronization works like this (2 channels to make things easy):

main thread:
  we finish a RAM_SECTION;
  flush pending packets to one of the channels
  send a packet with MULTIFD_FLAG_SYNC to all the channels
  wait until all the channels have processed the FLAG_SYNC
  At this point send the RAM_SECTION_EOS footer.

worker1                                  worker2

if there is a pending packet, send it    if there is a pending packet, send it
(notice that there can't be more than one ever)
send a packet with SYNC flag set         send a packet with SYNC flag set

On the reception side

main thread:
  receives RAM_SECTION_EOS footer
  wait for workers to receive a sync

worker1                                  worker2

process any pending packet (no sync)     process any pending packet (no sync)
process packet with SYNC                 process packet with SYNC
post main thread                         post main thread

now main thread can continue

Notice that we don't care what happens first, receiving the packet with
SYNC in the workers or RAM_SECTION_EOS on the main thread; all works as
expected.
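A minimal sketch of the reception side described above, using POSIX threads
and semaphores rather than QEMU's QemuSemaphore. Every name is hypothetical,
the packets come from an in-memory array instead of a socket, and the final
step where the main thread releases each worker through its own sem_sync is
an assumption based on the worker waiting on p->sem_sync in the patch.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>

#define DEMO_FLAG_SYNC (1u << 0)    /* mirrors MULTIFD_FLAG_SYNC */
#define DEMO_CHANNELS 2

typedef struct {
    uint32_t flags;
    uint32_t pages;
} DemoPacket;

typedef struct {
    const DemoPacket *packets;      /* stand-in for data read from a channel */
    int num_packets;
    uint32_t num_pages;             /* pages processed by this worker */
    sem_t sem_sync;                 /* main thread releases the worker here */
    sem_t *main_sem_sync;           /* worker tells main that it saw SYNC */
} DemoRecvChannel;

static void *demo_recv_thread(void *opaque)
{
    DemoRecvChannel *c = opaque;

    for (int i = 0; i < c->num_packets; i++) {
        c->num_pages += c->packets[i].pages;
        if (c->packets[i].flags & DEMO_FLAG_SYNC) {
            sem_post(c->main_sem_sync);     /* post main thread */
            sem_wait(&c->sem_sync);         /* park until main continues */
        }
    }
    return NULL;
}

/* Main thread, after reading the EOS footer: wait for every worker's SYNC. */
static uint32_t demo_recv_sync_main(DemoRecvChannel *ch, sem_t *main_sem_sync)
{
    uint32_t total = 0;

    for (int i = 0; i < DEMO_CHANNELS; i++) {
        sem_wait(main_sem_sync);
    }
    /* Every worker is parked now, so reading its counters is safe. */
    for (int i = 0; i < DEMO_CHANNELS; i++) {
        total += ch[i].num_pages;
    }
    for (int i = 0; i < DEMO_CHANNELS; i++) {
        sem_post(&ch[i].sem_sync);
    }
    return total;
}

/* Runs one sync round on two channels; returns the pages seen at the sync. */
static uint32_t demo_run(void)
{
    static const DemoPacket p0[] = { {0, 3}, {0, 2}, {DEMO_FLAG_SYNC, 0} };
    static const DemoPacket p1[] = { {0, 4}, {DEMO_FLAG_SYNC, 0} };
    DemoRecvChannel ch[DEMO_CHANNELS] = {
        { .packets = p0, .num_packets = 3 },
        { .packets = p1, .num_packets = 2 },
    };
    pthread_t th[DEMO_CHANNELS];
    sem_t main_sem_sync;

    sem_init(&main_sem_sync, 0, 0);
    for (int i = 0; i < DEMO_CHANNELS; i++) {
        ch[i].main_sem_sync = &main_sem_sync;
        sem_init(&ch[i].sem_sync, 0, 0);
        int rc = pthread_create(&th[i], NULL, demo_recv_thread, &ch[i]);
        assert(rc == 0);
        (void)rc;
    }
    uint32_t total = demo_recv_sync_main(ch, &main_sem_sync);
    for (int i = 0; i < DEMO_CHANNELS; i++) {
        pthread_join(th[i], NULL);
        sem_destroy(&ch[i].sem_sync);
    }
    sem_destroy(&main_sem_sync);
    return total;
}
```

Because each worker posts only after it has processed everything up to and
including its SYNC packet, the main thread's count is complete whichever of
the two events (EOS footer or SYNC packets) arrives first.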
Noticing how long it took to explain this, I think that I am going to
add this to the migration documentation.  I will wait for any questions
you have before adding it.
Later, Juan.