qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [RFC] aio: add aio_context_acquire() and aio_context_release()


From: Wenchao Xia
Subject: Re: [Qemu-devel] [RFC] aio: add aio_context_acquire() and aio_context_release()
Date: Wed, 28 Aug 2013 10:41:42 +0800
User-agent: Mozilla/5.0 (Windows NT 5.1; rv:17.0) Gecko/20130801 Thunderbird/17.0.8

于 2013-8-27 23:12, Paolo Bonzini 写道:
Il 27/08/2013 16:39, Stefan Hajnoczi ha scritto:
It can be useful to run an AioContext from a thread which normally does
not "own" the AioContext.  For example, request draining can be
implemented by acquiring the AioContext and looping aio_poll() until all
requests have been completed.

The following pattern should work:

   /* Event loop thread */
   while (running) {
       aio_context_acquire(ctx);
       aio_poll(ctx, true);
       aio_context_release(ctx);
   }

   /* Another thread */
   aio_context_acquire(ctx);
   bdrv_read(bs, 0x1000, buf, 1);
   aio_context_release(ctx);

This patch implements aio_context_acquire() and aio_context_release().
Note that existing aio_poll() callers do not need to worry about
acquiring and releasing - it is only needed when multiple threads will
call aio_poll() on the same AioContext.

Signed-off-by: Stefan Hajnoczi <address@hidden>

Really, really nice!   The "kick owner thread" technique is a very
interesting way to avoid dropping the lock around aio_poll's ppoll
system call.

On top of this, I think it would be useful to make aio_context_acquire
support recursive acquisition by returning a bool if the current thread
is already the owner.  Recursive acquisition != recursive locking! :)
In fact, acquisition and releasing could be done directly by the
synchronous block I/O functions perhaps?

One comment: ctx->owner is really "ctx->owned", if you replace the
argument of qemu_thread_is_self(ctx->owner) with &ctx->owner_thread.  It
is probably a bit clearer that way.

   That would also avoid having two QemuThread members, which may be a
little strange.

Paolo


---
I previously sent patches that implement bdrv_drain_all() by stopping dataplane
threads.  AioContext acquire()/release() is a more general solution than
temporarily stopping dataplane threads.  This solution is less hacky and also
supported by other event loops like GMainContext.

No need to commit this patch yet, I still want to build things on top of it
before submitting a final version.

  async.c             | 27 +++++++++++++++++++++++++
  include/block/aio.h | 13 ++++++++++++
  tests/test-aio.c    | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++
  3 files changed, 98 insertions(+)

diff --git a/include/block/aio.h b/include/block/aio.h
index 5743bf1..9035e87 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -45,6 +45,11 @@ typedef void IOHandler(void *opaque);
  typedef struct AioContext {
      GSource source;

+    QemuMutex acquire_lock;
+    QemuCond acquire_cond;
+    QemuThread owner_thread;
+    QemuThread *owner;
+
      /* The list of registered AIO handlers */
      QLIST_HEAD(, AioHandler) aio_handlers;

@@ -99,6 +104,14 @@ void aio_context_ref(AioContext *ctx);
   */
  void aio_context_unref(AioContext *ctx);

+/* Take ownership of the AioContext.  If the AioContext will be shared between
+ * threads, a thread must have ownership when calling aio_poll().
+ */
+void aio_context_acquire(AioContext *ctx);
+
+/* Relinquish ownership of the AioContext. */
+void aio_context_release(AioContext *ctx);
+
  /**
   * aio_bh_new: Allocate a new bottom half structure.
   *
diff --git a/async.c b/async.c
index 9791d8e..9fec07c 100644
--- a/async.c
+++ b/async.c
@@ -203,6 +203,8 @@ aio_ctx_finalize(GSource     *source)
      thread_pool_free(ctx->thread_pool);
      aio_set_event_notifier(ctx, &ctx->notifier, NULL);
      event_notifier_cleanup(&ctx->notifier);
+    qemu_cond_destroy(&ctx->acquire_cond);
+    qemu_mutex_destroy(&ctx->acquire_lock);
      qemu_mutex_destroy(&ctx->bh_lock);
      g_array_free(ctx->pollfds, TRUE);
  }
@@ -240,6 +242,9 @@ AioContext *aio_context_new(void)
      ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
      ctx->thread_pool = NULL;
      qemu_mutex_init(&ctx->bh_lock);
+    qemu_mutex_init(&ctx->acquire_lock);
+    qemu_cond_init(&ctx->acquire_cond);
+    ctx->owner = NULL;
      event_notifier_init(&ctx->notifier, false);
      aio_set_event_notifier(ctx, &ctx->notifier,
                             (EventNotifierHandler *)
@@ -257,3 +262,25 @@ void aio_context_unref(AioContext *ctx)
  {
      g_source_unref(&ctx->source);
  }
+
+void aio_context_acquire(AioContext *ctx)
+{
+    qemu_mutex_lock(&ctx->acquire_lock);
+    while (ctx->owner) {
+        assert(!qemu_thread_is_self(ctx->owner));
+        aio_notify(ctx); /* kick current owner */
+        qemu_cond_wait(&ctx->acquire_cond, &ctx->acquire_lock);
+    }
+    qemu_thread_get_self(&ctx->owner_thread);
+    ctx->owner = &ctx->owner_thread;
+    qemu_mutex_unlock(&ctx->acquire_lock);
+}
+
+void aio_context_release(AioContext *ctx)
+{
+    qemu_mutex_lock(&ctx->acquire_lock);
+    assert(ctx->owner && qemu_thread_is_self(ctx->owner));
+    ctx->owner = NULL;
+    qemu_cond_signal(&ctx->acquire_cond);
+    qemu_mutex_unlock(&ctx->acquire_lock);
+}
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 1ab5637..324c099 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -88,6 +88,63 @@ static void test_notify(void)
      g_assert(!aio_poll(ctx, false));
  }

+typedef struct {
+    QemuMutex start_lock;
+    bool thread_acquired;
+} AcquireTestData;
+
+static void *test_acquire_thread(void *opaque)
+{
+    AcquireTestData *data = opaque;
+
+    /* Wait for other thread to let us start */
+    qemu_mutex_lock(&data->start_lock);
+    qemu_mutex_unlock(&data->start_lock);
+
+    aio_context_acquire(ctx);
+    aio_context_release(ctx);
+
+    data->thread_acquired = true; /* success, we got here */
+
+    return NULL;
+}
+
+static void dummy_notifier_read(EventNotifier *unused)
+{
+    g_assert(false); /* should never be invoked */
+}
+
+static void test_acquire(void)
+{
+    QemuThread thread;
+    EventNotifier notifier;
+    AcquireTestData data;
+
+    /* Dummy event notifier ensures aio_poll() will block */
+    event_notifier_init(&notifier, false);
+    aio_set_event_notifier(ctx, &notifier, dummy_notifier_read);
+    g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
+
+    qemu_mutex_init(&data.start_lock);
+    qemu_mutex_lock(&data.start_lock);
+    data.thread_acquired = false;
+
+    qemu_thread_create(&thread, test_acquire_thread,
+                       &data, QEMU_THREAD_JOINABLE);
+
+    /* Block in aio_poll(), let other thread kick us and acquire context */
+    aio_context_acquire(ctx);
+    qemu_mutex_unlock(&data.start_lock); /* let the thread run */
+    g_assert(!aio_poll(ctx, true));
+    aio_context_release(ctx);
+
+    qemu_thread_join(&thread);
+    aio_set_event_notifier(ctx, &notifier, NULL);
+    event_notifier_cleanup(&notifier);
+
+    g_assert(data.thread_acquired);
+}
+
  static void test_bh_schedule(void)
  {
      BHTestData data = { .n = 0 };
@@ -639,6 +696,7 @@ int main(int argc, char **argv)

      g_test_init(&argc, &argv, NULL);
      g_test_add_func("/aio/notify",                  test_notify);
+    g_test_add_func("/aio/acquire",                 test_acquire);
      g_test_add_func("/aio/bh/schedule",             test_bh_schedule);
      g_test_add_func("/aio/bh/schedule10",           test_bh_schedule10);
      g_test_add_func("/aio/bh/cancel",               test_bh_cancel);





--
Best Regards

Wenchao Xia




reply via email to

[Prev in Thread] Current Thread [Next in Thread]