qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH for-2.1?!?] AioContext: speed up aio_notify


From: Paolo Bonzini
Subject: [Qemu-devel] [PATCH for-2.1?!?] AioContext: speed up aio_notify
Date: Thu, 3 Jul 2014 18:59:20 +0200

In many cases, the call to event_notifier_set in aio_notify is unnecessary.
In particular, if we are executing aio_dispatch, or if aio_poll is not
blocking, we know that we will soon get to the next loop iteration (if
necessary); the thread that hosts the AioContext's event loop does not
need any nudging.

The patch includes a Promela formal model that shows that this really
works and does not need any further complication such as generation
counts.  It needs a memory barrier though.

The generation counts are not needed because we only care of the state
of ctx->dispatching at the time the memory barrier happens.  If
ctx->dispatching is one at the time the memory barrier happens,
the aio_notify is not needed even if it afterwards becomes zero.

Signed-off-by: Paolo Bonzini <address@hidden>
---
        It should work, but I think this is a bit too tricky for 2.1.

 aio-posix.c             |  34 +++++++++++++++-
 async.c                 |  13 +++++-
 docs/aio_notify.promela | 104 ++++++++++++++++++++++++++++++++++++++++++++++++
 include/block/aio.h     |   9 +++++
 4 files changed, 158 insertions(+), 2 deletions(-)
 create mode 100644 docs/aio_notify.promela

diff --git a/aio-posix.c b/aio-posix.c
index f921d4f..e38334c 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -175,11 +175,38 @@ static bool aio_dispatch(AioContext *ctx)
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
+    bool was_dispatching;
     int ret;
     bool progress;
 
+    was_dispatching = ctx->dispatching;
     progress = false;
 
+    /* aio_notify can avoid the expensive event_notifier_set if
+     * everything (file descriptors, bottom halves, timers) will
+     * be re-evaluated before the next blocking poll().  This happens
+     * in two cases:
+     *
+     * 1) when aio_poll is called with blocking == false
+     *
+     * 2) when we are called after poll().  If we are called before
+     *    poll(), bottom halves will not be re-evaluated and we need
+     *    aio_notify() if blocking == true.
+     *
+     * The first aio_dispatch() only does something when AioContext is
+     * running as a GSource, and in that case aio_poll is used only
+     * with blocking == false, so this optimization is already quite
+     * effective.  However, the code is ugly and should be restructured
+     * to have a single aio_dispatch() call.  To do this, we need to
+     * reorganize aio_poll into a prepare/poll/dispatch model like
+     * glib's.
+     *
+     * If we're in a nested event loop, ctx->dispatching might be true.
+     * In that case we can restore it just before returning, but we
+     * have to clear it now.
+     */
+    aio_set_dispatching(ctx, !blocking);
+
     /*
      * If there are callbacks left that have been queued, we need to call them.
      * Do not call select in this case, because it is possible that the caller
@@ -190,12 +217,14 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
+    /* Re-evaluate condition (1) above.  */
+    aio_set_dispatching(ctx, !blocking);
     if (aio_dispatch(ctx)) {
         progress = true;
     }
 
     if (progress && !blocking) {
-        return true;
+        goto out;
     }
 
     ctx->walking_handlers++;
@@ -234,9 +263,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     /* Run dispatch even if there were no readable fds to run timers */
+    aio_set_dispatching(ctx, true);
     if (aio_dispatch(ctx)) {
         progress = true;
     }
 
+out:
+    aio_set_dispatching(ctx, was_dispatching);
     return progress;
 }
diff --git a/async.c b/async.c
index 5b6fe6b..ecc201e 100644
--- a/async.c
+++ b/async.c
@@ -26,6 +26,7 @@
 #include "block/aio.h"
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
+#include "qemu/atomic.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
@@ -247,9 +248,22 @@ ThreadPool *aio_get_thread_pool(AioContext *ctx)
     return ctx->thread_pool;
 }
 
+void aio_set_dispatching(AioContext *ctx, bool dispatching)
+{
+    ctx->dispatching = dispatching;
+    /* Write ctx->dispatching before reading e.g. bh->scheduled.  */
+    if (!dispatching) {
+        smp_mb();
+    }
+}
+
 void aio_notify(AioContext *ctx)
 {
-    event_notifier_set(&ctx->notifier);
+    /* Write e.g. bh->scheduled before reading ctx->dispatching.  */
+    smp_mb();
+    if (!ctx->dispatching) {
+        event_notifier_set(&ctx->notifier);
+    }
 }
 
 static void aio_timerlist_notify(void *opaque)
diff --git a/docs/aio_notify.promela b/docs/aio_notify.promela
new file mode 100644
index 0000000..ad3f6f0
--- /dev/null
+++ b/docs/aio_notify.promela
@@ -0,0 +1,104 @@
+/*
+ * This model describes the interaction between aio_set_dispatching()
+ * and aio_notify().
+ *
+ * Author: Paolo Bonzini <address@hidden>
+ *
+ * This file is in the public domain.  If you really want a license,
+ * the WTFPL will do.
+ *
+ * To simulate it:
+ *     spin -p docs/aio_notify.promela
+ *
+ * To verify it:
+ *     spin -a docs/aio_notify.promela
+ *     gcc -O2 pan.c
+ *     ./a.out -a
+ */
+
+#define MAX   4
+#define LAST  (1 << (MAX - 1))
+#define FINAL ((LAST << 1) - 1)
+
+bool dispatching;
+bool event;
+
+int req, done;
+
+active proctype waiter()
+{
+     int fetch, blocking;
+
+     do
+        :: done != FINAL -> {
+            // Computing "blocking" is separate from execution of the
+            // "bottom half"
+            blocking = (req == 0);
+
+            // This is our "bottom half"
+            atomic { fetch = req; req = 0; }
+            done = done | fetch;
+
+            // Wait for a nudge from the other side
+            do
+                :: event == 1 -> { event = 0; break; }
+                :: !blocking  -> break;
+            od;
+
+            dispatching = 1;
+
+            // If you are simulating this model, you may want to add
+            // something like this here:
+            //
+            //      int foo; foo++; foo++; foo++;
+            //
+            // This only wastes some time and makes it more likely
+            // that the notifier process hits the "fast path".
+
+            dispatching = 0;
+        }
+        :: else -> break;
+    od
+}
+
+active proctype notifier()
+{
+    int next = 1;
+    int sets = 0;
+
+    do
+        :: next <= LAST -> {
+            // generate a request
+            req = req | next;
+            next = next << 1;
+
+            // aio_notify
+            if
+                :: dispatching == 0 -> sets++; event = 1;
+                :: else             -> skip;
+            fi;
+
+            // Test both synchronous and asynchronous delivery
+            if
+                :: 1 -> do
+                            :: req == 0 -> break;
+                        od;
+                :: 1 -> skip;
+            fi;
+        }
+        :: else -> break;
+    od;
+    printf("Skipped %d event_notifier_set\n", MAX - sets);
+}
+
+#define p (done == FINAL)
+
+never  {
+    do
+        :: 1                      // after an arbitrarily long prefix
+        :: p -> break             // p becomes true
+    od;
+    do
+        :: !p -> accept: break    // it then must remains true forever after
+    od
+}
diff --git a/include/block/aio.h b/include/block/aio.h
index a92511b..dffb061 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -60,8 +60,14 @@ struct AioContext {
      */
     int walking_handlers;
 
+    /* Used to avoid unnecessary event_notifier_set calls in aio_notify.
+     * Writes protected by lock or BQL, reads are lockless.
+     */
+    bool dispatching;
+
     /* lock to protect between bh's adders and deleter */
     QemuMutex bh_lock;
+
     /* Anchor of the list of Bottom Halves belonging to the context */
     struct QEMUBH *first_bh;
 
@@ -83,6 +89,9 @@ struct AioContext {
     QEMUTimerListGroup tlg;
 };
 
+/* Used internally to synchronize aio_poll against qemu_bh_schedule.  */
+void aio_set_dispatching(AioContext *ctx, bool dispatching);
+
 /**
  * aio_context_new: Allocate a new AioContext.
  *
-- 
1.9.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]