qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCHv3] [RFC] aio/async: Add timed bottom-halves


From: Alex Bligh
Subject: [Qemu-devel] [PATCHv3] [RFC] aio/async: Add timed bottom-halves
Date: Tue, 16 Jul 2013 22:22:50 +0100

Add timed bottom halves. A timed bottom half is a bottom half that
will not execute until a given time has passed (qemu_bh_schedule_at)
or a given interval has passed (qemu_bh_schedule_in). Any qemu
clock can be used, and times are specified in nanoseconds.

Timed bottom halves can be used where timers cannot. For instance,
in block drivers where there is no mainloop that calls timers
(qemu-nbd, qemu-img), or where (per address@hidden) the
aio code loops internally and thus timers never get called.

Changes to aio-win32.c have not even been compile tested.

Changes since v1:
* aio_ctx_prepare should cope with wait<0
* aio_ctx_prepare should round up wait time

Changes since v2:
* aio_poll timeout depends on presence of timed bottom halves
* timed bh's do not do aio_notify immediately

Signed-off-by: Alex Bligh <address@hidden>
---
 aio-posix.c         |   20 +++++++++++++-
 aio-win32.c         |   21 ++++++++++++++-
 async.c             |   74 ++++++++++++++++++++++++++++++++++++++++++++++-----
 include/block/aio.h |   43 ++++++++++++++++++++++++++++++
 tests/test-aio.c    |   47 ++++++++++++++++++++++++++++++++
 5 files changed, 196 insertions(+), 9 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index b68eccd..f949e22 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -173,6 +173,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
     int ret;
+    int timeout = -1;
     bool busy, progress;
 
     progress = false;
@@ -183,6 +184,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
      * does not need a complete flush (as is the case for qemu_aio_wait loops).
      */
     if (aio_bh_poll(ctx)) {
+        /* At least one non-idle but scheduled bh existed when aio_bh_poll
+         * was called. We're thus making progress, so don't block.
+         * Note it may no longer be scheduled.
+         */
         blocking = false;
         progress = true;
     }
@@ -191,6 +196,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
+    /* The above may schedule bh's in which case if they need the poll below to
+     * exit immediately, they will have called aio_notify. If they are timed,
+     * we'll process the timeout below.
+     */
+
     if (progress && !blocking) {
         return true;
     }
@@ -231,10 +241,18 @@ bool aio_poll(AioContext *ctx, bool blocking)
         return progress;
     }
 
+    /* calculate the timeout. This will be infinite if no bh's
+     * are scheduled, else the lowest across all scheduled bh's for
+     *    * 0ms for a non-timed, non-idle bh
+     *    * 0ms for a timed, non-idle bh which is ready
+     *    * 10ms for an idle bh
+     */
+    aio_bh_get_timeout(ctx, &timeout);
+
     /* wait until next event */
     ret = g_poll((GPollFD *)ctx->pollfds->data,
                  ctx->pollfds->len,
-                 blocking ? -1 : 0);
+                 blocking ? timeout : 0);
 
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
diff --git a/aio-win32.c b/aio-win32.c
index 38723bf..5d45448 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -107,6 +107,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
      * does not need a complete flush (as is the case for qemu_aio_wait loops).
      */
     if (aio_bh_poll(ctx)) {
+        /* At least one non-idle but scheduled bh existed when aio_bh_poll
+         * was called. We're thus making progress, so don't block.
+         * Note it may no longer be scheduled.
+         */
         blocking = false;
         progress = true;
     }
@@ -140,6 +144,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
         }
     }
 
+    /* The above may schedule bh's in which case if they need the poll below to
+     * exit immediately, they will have called aio_notify. If they are timed,
+     * we'll process the timeout below.
+     */
+
     if (progress && !blocking) {
         return true;
     }
@@ -175,7 +184,17 @@ bool aio_poll(AioContext *ctx, bool blocking)
     /* wait until next event */
     while (count > 0) {
         int timeout = blocking ? INFINITE : 0;
-        int ret = WaitForMultipleObjects(count, events, FALSE, timeout);
+        int ret;
+
+        /* calculate the timeout. This will be infinite if no bh's
+         * are scheduled, else the lowest across all scheduled bh's for
+         *    * 0ms for a non-timed, non-idle bh
+         *    * 0ms for a timed, non-idle bh which is ready
+         *    * 10ms for an idle bh
+         */
+        aio_bh_get_timeout(ctx, &timeout);
+
+        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
 
         /* if we have any signaled events, dispatch event */
         if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
diff --git a/async.c b/async.c
index 90fe906..c844b9d 100644
--- a/async.c
+++ b/async.c
@@ -35,6 +35,8 @@ struct QEMUBH {
     QEMUBHFunc *cb;
     void *opaque;
     QEMUBH *next;
+    QEMUClock *clock;
+    int64_t time;
     bool scheduled;
     bool idle;
     bool deleted;
@@ -52,6 +54,11 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void 
*opaque)
     return bh;
 }
 
+static inline int64_t qemu_bh_ready_in(QEMUBH *bh)
+{
+    return (bh->clock) ? (bh->time - qemu_get_clock_ns(bh->clock)) : 0;
+}
+
 int aio_bh_poll(AioContext *ctx)
 {
     QEMUBH *bh, **bhp, *next;
@@ -62,8 +69,10 @@ int aio_bh_poll(AioContext *ctx)
     ret = 0;
     for (bh = ctx->first_bh; bh; bh = next) {
         next = bh->next;
-        if (!bh->deleted && bh->scheduled) {
+        if (!bh->deleted && bh->scheduled && qemu_bh_ready_in(bh) <= 0) {
             bh->scheduled = 0;
+            bh->clock = NULL;
+            bh->time = 0;
             if (!bh->idle)
                 ret = 1;
             bh->idle = 0;
@@ -96,6 +105,8 @@ void qemu_bh_schedule_idle(QEMUBH *bh)
         return;
     bh->scheduled = 1;
     bh->idle = 1;
+    bh->clock = NULL;
+    bh->time = 0;
 }
 
 void qemu_bh_schedule(QEMUBH *bh)
@@ -104,35 +115,84 @@ void qemu_bh_schedule(QEMUBH *bh)
         return;
     bh->scheduled = 1;
     bh->idle = 0;
+    bh->clock = NULL;
+    bh->time = 0;
     aio_notify(bh->ctx);
 }
 
+void qemu_bh_schedule_at(QEMUBH *bh, QEMUClock *clock, int64_t time)
+{
+    /* Allow rescheduling if already scheduled */
+    bh->scheduled = 1;
+    bh->idle = 0;
+    bh->clock = clock;
+    bh->time = time;
+}
+
+void qemu_bh_schedule_in(QEMUBH *bh, QEMUClock *clock, int64_t time)
+{
+    qemu_bh_schedule_at(bh, clock, qemu_get_clock_ns(clock) + time);
+}
+
 void qemu_bh_cancel(QEMUBH *bh)
 {
     bh->scheduled = 0;
+    bh->clock = NULL;
+    bh->time = 0;
 }
 
 void qemu_bh_delete(QEMUBH *bh)
 {
     bh->scheduled = 0;
     bh->deleted = 1;
+    bh->clock = NULL;
+    bh->time = 0;
 }
 
 static gboolean
-aio_ctx_prepare(GSource *source, gint    *timeout)
+aio_ctx_prepare(GSource *source, gint *timeout)
 {
     AioContext *ctx = (AioContext *) source;
+    return aio_bh_get_timeout((AioContext *)ctx, (int *)timeout);
+}
+
+bool aio_bh_get_timeout(AioContext *ctx, int *timeout)
+{
     QEMUBH *bh;
 
     for (bh = ctx->first_bh; bh; bh = bh->next) {
         if (!bh->deleted && bh->scheduled) {
-            if (bh->idle) {
+            int64_t wait = qemu_bh_ready_in(bh);
+            if (wait <= 0) {
+                wait = 0;
+            } else {
+                /* Sadly timeout is in milliseconds not nanoseconds;
+                 * it is better to trigger a timeout too late than too
+                 * early for risk of busywaiting */
+                wait = (wait + SCALE_MS - 1) / SCALE_MS;
+            }
+            if (!wait && bh->idle) {
                 /* idle bottom halves will be polled at least
                  * every 10ms */
-                *timeout = 10;
+                wait = 10;
+            }
+            if (wait) {
+                /* Use the minimum wait across all bottom
+                 * halves */
+                if (*timeout == -1 || *timeout > wait) {
+                    *timeout = wait;
+                }
             } else {
-                /* non-idle bottom halves will be executed
-                 * immediately */
+                /* non-idle bottom halves or timed bottom
+                 * halves which are ready to run will be
+                 * executed immediately */
+                if (bh->clock) {
+                    /* if there is an expired timer, make sure
+                     * aio_notify is called as it won't have
+                     * been called when scheduled
+                     */
+                    aio_notify(bh->ctx);
+                }
                 *timeout = 0;
                 return true;
             }
@@ -149,7 +209,7 @@ aio_ctx_check(GSource *source)
     QEMUBH *bh;
 
     for (bh = ctx->first_bh; bh; bh = bh->next) {
-        if (!bh->deleted && bh->scheduled) {
+        if (!bh->deleted && bh->scheduled && qemu_bh_ready_in(bh) <= 0) {
             return true;
        }
     }
diff --git a/include/block/aio.h b/include/block/aio.h
index 1836793..afa685d 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -20,6 +20,7 @@
 
 typedef struct BlockDriverAIOCB BlockDriverAIOCB;
 typedef void BlockDriverCompletionFunc(void *opaque, int ret);
+typedef struct QEMUClock QEMUClock;
 
 typedef struct AIOCBInfo {
     void (*cancel)(BlockDriverAIOCB *acb);
@@ -145,6 +146,38 @@ int aio_bh_poll(AioContext *ctx);
 void qemu_bh_schedule(QEMUBH *bh);
 
 /**
+ * qemu_bh_schedule_at: Schedule a bottom half at a future time
+ *
+ * Scheduling a bottom half interrupts the main loop and causes the
+ * execution of the callback that was passed to qemu_bh_new.
+ *
+ * Bottom halves that are scheduled from a bottom half handler are instantly
+ * invoked.  This can create an infinite loop if a bottom half handler
+ * schedules itself.
+ *
+ * @bh: The bottom half to be scheduled.
+ * @clock: The clock to be used
+ * @time: The time in nanoseconds at which the bh is scheduled
+ */
+void qemu_bh_schedule_at(QEMUBH *bh, QEMUClock *clock, int64_t time);
+
+/**
+ * qemu_bh_schedule_in: Schedule a bottom half after an interval
+ *
+ * Scheduling a bottom half interrupts the main loop and causes the
+ * execution of the callback that was passed to qemu_bh_new.
+ *
+ * Bottom halves that are scheduled from a bottom half handler are instantly
+ * invoked.  This can create an infinite loop if a bottom half handler
+ * schedules itself.
+ *
+ * @bh: The bottom half to be scheduled.
+ * @clock: The clock to be used
+ * @time: The interval in nanoseconds after which the bh is scheduled
+ */
+void qemu_bh_schedule_in(QEMUBH *bh, QEMUClock *clock, int64_t time);
+
+/**
  * qemu_bh_cancel: Cancel execution of a bottom half.
  *
  * Canceling execution of a bottom half undoes the effect of calls to
@@ -190,6 +223,16 @@ bool aio_pending(AioContext *ctx);
  */
 bool aio_poll(AioContext *ctx, bool blocking);
 
+/* This is used internally to calculate the appropriate timeout for aio_poll.
+ * It returns true if a bh can run right now (either a scheduled bh or
+ * a timed bh that has already passed its time), or false otherwise.
+ * timeout should be set on entry to the proposed timeout value to use
+ * (or -1 for infinite), and if a lower timeout is needed due to there
+ * being a timed bh scheduled in the future, timeout will be adjusted. Timeout
+ * is in milliseconds (yuck).
+ */
+bool aio_bh_get_timeout(AioContext *ctx, int *timeout);
+
 #ifdef CONFIG_POSIX
 /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
 typedef int (AioFlushHandler)(void *opaque);
diff --git a/tests/test-aio.c b/tests/test-aio.c
index c173870..29daf81 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -12,6 +12,7 @@
 
 #include <glib.h>
 #include "block/aio.h"
+#include "qemu/timer.h"
 
 AioContext *ctx;
 
@@ -124,6 +125,27 @@ static void test_bh_schedule10(void)
     qemu_bh_delete(data.bh);
 }
 
+static void test_bh_schedule_in(void)
+{
+    BHTestData data = { .n = 0 };
+    data.bh = aio_bh_new(ctx, bh_test_cb, &data);
+
+    qemu_bh_schedule_in(data.bh, rt_clock, 1000000000LL);
+    g_assert_cmpint(data.n, ==, 0);
+
+    g_assert(!aio_poll(ctx, false));
+    g_assert_cmpint(data.n, ==, 0);
+
+    sleep(2);
+
+    g_assert(aio_poll(ctx, true));
+    g_assert_cmpint(data.n, ==, 1);
+
+    g_assert(!aio_poll(ctx, false));
+    g_assert_cmpint(data.n, ==, 1);
+    qemu_bh_delete(data.bh);
+}
+
 static void test_bh_cancel(void)
 {
     BHTestData data = { .n = 0 };
@@ -407,6 +429,27 @@ static void test_source_bh_schedule10(void)
     qemu_bh_delete(data.bh);
 }
 
+static void test_source_bh_schedule_in(void)
+{
+    BHTestData data = { .n = 0 };
+    data.bh = aio_bh_new(ctx, bh_test_cb, &data);
+
+    qemu_bh_schedule_in(data.bh, rt_clock, 1000000000LL);
+    g_assert_cmpint(data.n, ==, 0);
+
+    g_assert(!g_main_context_iteration(NULL, false));
+    g_assert_cmpint(data.n, ==, 0);
+
+    sleep(2);
+
+    g_assert(g_main_context_iteration(NULL, true));
+    g_assert_cmpint(data.n, ==, 1);
+
+    g_assert(!g_main_context_iteration(NULL, false));
+    g_assert_cmpint(data.n, ==, 1);
+    qemu_bh_delete(data.bh);
+}
+
 static void test_source_bh_cancel(void)
 {
     BHTestData data = { .n = 0 };
@@ -628,6 +671,8 @@ int main(int argc, char **argv)
 {
     GSource *src;
 
+    init_clocks();
+
     ctx = aio_context_new();
     src = aio_get_g_source(ctx);
     g_source_attach(src, NULL);
@@ -639,6 +684,7 @@ int main(int argc, char **argv)
     g_test_add_func("/aio/notify",                  test_notify);
     g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
     g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
+    g_test_add_func("/aio/bh/schedule-in",          test_bh_schedule_in);
     g_test_add_func("/aio/bh/cancel",               test_bh_cancel);
     g_test_add_func("/aio/bh/delete",               test_bh_delete);
     g_test_add_func("/aio/bh/callback-delete/one",  test_bh_delete_from_cb);
@@ -653,6 +699,7 @@ int main(int argc, char **argv)
     g_test_add_func("/aio-gsource/flush",                   test_source_flush);
     g_test_add_func("/aio-gsource/bh/schedule",             
test_source_bh_schedule);
     g_test_add_func("/aio-gsource/bh/schedule10",           
test_source_bh_schedule10);
+    g_test_add_func("/aio-gsource/bh/schedule-in",          
test_source_bh_schedule_in);
     g_test_add_func("/aio-gsource/bh/cancel",               
test_source_bh_cancel);
     g_test_add_func("/aio-gsource/bh/delete",               
test_source_bh_delete);
     g_test_add_func("/aio-gsource/bh/callback-delete/one",  
test_source_bh_delete_from_cb);
-- 
1.7.9.5




reply via email to

[Prev in Thread] Current Thread [Next in Thread]