qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH] qemu-thread: add a fast path to the Win32 QemuEvent


From: Paolo Bonzini
Subject: Re: [Qemu-devel] [PATCH] qemu-thread: add a fast path to the Win32 QemuEvent
Date: Thu, 10 Sep 2015 18:12:38 +0200
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:38.0) Gecko/20100101 Thunderbird/38.1.0

On 12/08/2015 15:38, Paolo Bonzini wrote:
> QemuEvents are used heavily by call_rcu.  We do not want them to be slow,
> but the current implementation does a kernel call on every invocation
> of qemu_event_* and won't cut it.
> 
> So, wrap a Win32 manual-reset event with a fast userspace path.  The
> states and transitions are the same as for the futex and mutex/condvar
> implementations, but the slow path is different of course.  The idea
> is to reset the Win32 event lazily, as part of a test-reset-test-wait
> sequence.  Such a sequence is, indeed, how QemuEvents are used by
> RCU and other subsystems!
> 
> The patch includes a formal model of the algorithm.
> 
> Signed-off-by: Paolo Bonzini <address@hidden>
> ---
>  docs/win32-qemu-event.promela | 98 +++++++++++++++++++++++++++++++++++++++++++
>  include/qemu/thread-win32.h   |  1 +
>  util/qemu-thread-win32.c      | 66 +++++++++++++++++++++++++++--
>  3 files changed, 161 insertions(+), 4 deletions(-)
>  create mode 100644 docs/win32-qemu-event.promela
> 
> diff --git a/docs/win32-qemu-event.promela b/docs/win32-qemu-event.promela
> new file mode 100644
> index 0000000..c446a71
> --- /dev/null
> +++ b/docs/win32-qemu-event.promela
> @@ -0,0 +1,98 @@
> +/*
> + * This model describes the implementation of QemuEvent in
> + * util/qemu-thread-win32.c.
> + *
> + * Author: Paolo Bonzini <address@hidden>
> + *
> + * This file is in the public domain.  If you really want a license,
> + * the WTFPL will do.
> + *
> + * To verify it:
> + *     spin -a docs/win32-qemu-event.promela
> + *     gcc -O2 pan.c -DSAFETY
> + *     ./a.out
> + */
> +
> +bool event;
> +int value;
> +
> +/* Primitives for a Win32 event */
> +#define RAW_RESET event = false
> +#define RAW_SET   event = true
> +#define RAW_WAIT  do :: event -> break; od
> +
> +#if 0
> +/* Basic sanity checking: test the Win32 event primitives */
> +#define RESET     RAW_RESET
> +#define SET       RAW_SET
> +#define WAIT      RAW_WAIT
> +#else
> +/* Full model: layer a userspace-only fast path on top of the RAW_*
> + * primitives.  SET/RESET/WAIT have exactly the same semantics as
> + * RAW_SET/RAW_RESET/RAW_WAIT, but try to avoid invoking them.
> + */
> +#define EV_SET 0
> +#define EV_FREE 1
> +#define EV_BUSY -1
> +
> +int state = EV_FREE;
> +
> +int xchg_result;
> +#define SET   if :: state != EV_SET ->                                      \
> +                    atomic { /* xchg_result=xchg(state, EV_SET) */          \
> +                        xchg_result = state;                                \
> +                        state = EV_SET;                                     \
> +                    }                                                       \
> +                    if :: xchg_result == EV_BUSY -> RAW_SET;                \
> +                       :: else -> skip;                                     \
> +                    fi;                                                     \
> +                 :: else -> skip;                                           \
> +              fi
> +
> +#define RESET if :: state == EV_SET -> atomic { state = state | EV_FREE; }  \
> +                 :: else            -> skip;                                \
> +              fi
> +
> +int tmp1, tmp2;
> +#define WAIT  tmp1 = state;                                                 \
> +              if :: tmp1 != EV_SET ->                                       \
> +                    if :: tmp1 == EV_FREE ->                                \
> +                          RAW_RESET;                                        \
> +                          atomic { /* tmp2=cas(state, EV_FREE, EV_BUSY) */  \
> +                              tmp2 = state;                                 \
> +                              if :: tmp2 == EV_FREE -> state = EV_BUSY;     \
> +                                 :: else            -> skip;                \
> +                              fi;                                           \
> +                          }                                                 \
> +                          if :: tmp2 == EV_SET -> tmp1 = EV_SET;            \
> +                             :: else           -> tmp1 = EV_BUSY;           \
> +                          fi;                                               \
> +                       :: else -> skip;                                     \
> +                    fi;                                                     \
> +                    assert(tmp1 != EV_FREE);                                \
> +                    if :: tmp1 == EV_BUSY -> RAW_WAIT;                      \
> +                       :: else -> skip;                                     \
> +                    fi;                                                     \
> +                 :: else -> skip;                                           \
> +              fi
> +#endif
> +
> +active proctype waiter()
> +{
> +     if
> +         :: !value ->
> +             RESET;
> +             if
> +                 :: !value -> WAIT;
> +                 :: else   -> skip;
> +             fi;
> +        :: else -> skip;
> +    fi;
> +    assert(value);
> +}
> +
> +active proctype notifier()
> +{
> +    value = true;
> +    SET;
> +}
> diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h
> index 3d58081..385ff5f 100644
> --- a/include/qemu/thread-win32.h
> +++ b/include/qemu/thread-win32.h
> @@ -18,6 +18,7 @@ struct QemuSemaphore {
>  };
>  
>  struct QemuEvent {
> +    int value;
>      HANDLE event;
>  };
>  
> diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
> index 406b52f..6cdd553 100644
> --- a/util/qemu-thread-win32.c
> +++ b/util/qemu-thread-win32.c
> @@ -238,10 +238,34 @@ void qemu_sem_wait(QemuSemaphore *sem)
>      }
>  }
>  
> +/* Wrap a Win32 manual-reset event with a fast userspace path.  The idea
> + * is to reset the Win32 event lazily, as part of a test-reset-test-wait
> + * sequence.  Such a sequence is, indeed, how QemuEvents are used by
> + * RCU and other subsystems!
> + *
> + * Valid transitions:
> + * - free->set, when setting the event
> + * - busy->set, when setting the event, followed by SetEvent
> + * - set->free, when resetting the event
> + * - free->busy, when waiting
> + *
> + * set->busy does not happen (it can be observed from the outside but
> + * it really is set->free->busy).
> + *
> + * busy->free provably cannot happen; to enforce it, the set->free transition
> + * is done with an OR, which becomes a no-op if the event has concurrently
> + * transitioned to free or busy (and is faster than cmpxchg).
> + */
> +
> +#define EV_SET         0
> +#define EV_FREE        1
> +#define EV_BUSY       -1
> +
>  void qemu_event_init(QemuEvent *ev, bool init)
>  {
>      /* Manual reset.  */
> -    ev->event = CreateEvent(NULL, TRUE, init, NULL);
> +    ev->event = CreateEvent(NULL, TRUE, TRUE, NULL);
> +    ev->value = (init ? EV_SET : EV_FREE);
>  }
>  
>  void qemu_event_destroy(QemuEvent *ev)
> @@ -251,17 +275,51 @@ void qemu_event_destroy(QemuEvent *ev)
>  
>  void qemu_event_set(QemuEvent *ev)
>  {
> -    SetEvent(ev->event);
> +    if (atomic_mb_read(&ev->value) != EV_SET) {
> +        if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
> +            /* There were waiters, wake them up.  */
> +            SetEvent(ev->event);
> +        }
> +    }
>  }
>  
>  void qemu_event_reset(QemuEvent *ev)
>  {
> -    ResetEvent(ev->event);
> +    if (atomic_mb_read(&ev->value) == EV_SET) {
> +        /* If there was a concurrent reset (or even reset+wait),
> +         * do nothing.  Otherwise change EV_SET->EV_FREE.
> +         */
> +        atomic_or(&ev->value, EV_FREE);
> +    }
>  }
>  
>  void qemu_event_wait(QemuEvent *ev)
>  {
> -    WaitForSingleObject(ev->event, INFINITE);
> +    unsigned value;
> +
> +    value = atomic_mb_read(&ev->value);
> +    if (value != EV_SET) {
> +        if (value == EV_FREE) {
> +            /* qemu_event_set is not yet going to call SetEvent, but we are
> +             * going to do another check for EV_SET below when setting EV_BUSY.
> +             * At that point it is safe to call WaitForSingleObject.
> +             */
> +            ResetEvent(ev->event);
> +
> +            /* Tell qemu_event_set that there are waiters.  No need to retry
> +             * because there cannot be a concurrent busy->free transition.
> +             * After the CAS, the event will be either set or busy.
> +             */
> +            if (atomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
> +                value = EV_SET;
> +            } else {
> +                value = EV_BUSY;
> +            }
> +        }
> +        if (value == EV_BUSY) {
> +            WaitForSingleObject(ev->event, INFINITE);
> +        }
> +    }
>  }
>  
>  struct QemuThreadData {
> 

Ping?



reply via email to

[Prev in Thread] Current Thread [Next in Thread]