qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 3/3] util/event-loop: Introduce options to set the thread pool size


From: Nicolas Saenz Julienne
Subject: [PATCH 3/3] util/event-loop: Introduce options to set the thread pool size
Date: Mon, 21 Feb 2022 18:08:45 +0100

The thread pool regulates itself: when idle, it kills threads until
empty; when in demand, it creates new threads until full. This behaviour
doesn't play well with latency-sensitive workloads where the price of
creating a new thread is too high. For example, when paired with qemu's
'-mlock', or using safety features like SafeStack, creating a new thread
has been measured to take multiple milliseconds.

In order to mitigate this, let's introduce new 'EventLoopBackend'
properties to set the thread pool's minimum and maximum size. The minimum
number of threads will be created during the pool's initialization,
remain available during its lifetime regardless of demand, and be
destroyed upon freeing it. A properly characterized workload will then be
able to configure the pool to avoid any latency spike.

Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
---
 include/block/aio.h | 11 +++++++++++
 qapi/qom.json       |  4 +++-
 util/async.c        |  3 +++
 util/event-loop.c   | 15 ++++++++++++++-
 util/event-loop.h   |  4 ++++
 util/main-loop.c    | 13 +++++++++++++
 util/thread-pool.c  | 41 +++++++++++++++++++++++++++++++++++++----
 7 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/include/block/aio.h b/include/block/aio.h
index 5634173b12..331483d1d1 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -192,6 +192,8 @@ struct AioContext {
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
     QEMUBH *co_schedule_bh;
 
+    int pool_min;
+    int pool_max;
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -769,4 +771,13 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                 Error **errp);
 
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: Thread pool's min size, 0 by default
+ * @max: Thread pool's max size, 64 by default
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
+                                        uint64_t max, Error **errp);
+
 #endif
diff --git a/qapi/qom.json b/qapi/qom.json
index e7730ef62f..79c141b6bf 100644
--- a/qapi/qom.json
+++ b/qapi/qom.json
@@ -526,7 +526,9 @@
   'data': { '*poll-max-ns': 'int',
             '*poll-grow': 'int',
             '*poll-shrink': 'int',
-            '*aio-max-batch': 'int' } }
+            '*aio-max-batch': 'int',
+            '*thread-pool-min': 'int',
+            '*thread-pool-max': 'int' } }
 
 ##
 # @MemoryBackendProperties:
diff --git a/util/async.c b/util/async.c
index 08d25feef5..58ef2e2ee5 100644
--- a/util/async.c
+++ b/util/async.c
@@ -562,6 +562,9 @@ AioContext *aio_context_new(Error **errp)
 
     ctx->aio_max_batch = 0;
 
+    ctx->pool_min = 0;
+    ctx->pool_max = 64;
+
     return ctx;
 fail:
     g_source_destroy(&ctx->source);
diff --git a/util/event-loop.c b/util/event-loop.c
index c0ddd61f20..f2532e7d31 100644
--- a/util/event-loop.c
+++ b/util/event-loop.c
@@ -51,6 +51,12 @@ static EventLoopBackendParamInfo poll_shrink_info = {
 static EventLoopBackendParamInfo aio_max_batch_info = {
     "aio-max-batch", offsetof(EventLoopBackend, aio_max_batch),
 };
+static EventLoopBackendParamInfo thread_pool_min_info = {
+    "thread-pool-min", offsetof(EventLoopBackend, thread_pool_min),
+};
+static EventLoopBackendParamInfo thread_pool_max_info = {
+    "thread-pool-max", offsetof(EventLoopBackend, thread_pool_max),
+};
 
 static void event_loop_backend_get_param(Object *obj, Visitor *v,
         const char *name, void *opaque, Error **errp)
@@ -84,7 +90,6 @@ static void event_loop_backend_set_param(Object *obj, Visitor *v,
     *field = value;
 
     return;
-
 }
 
 static void
@@ -132,6 +137,14 @@ static void event_loop_backend_class_init(ObjectClass *klass, void *class_data)
                               event_loop_backend_get_param,
                               event_loop_backend_set_param,
                               NULL, &aio_max_batch_info);
+    object_class_property_add(klass, "thread-pool-min", "int",
+                              event_loop_backend_get_param,
+                              event_loop_backend_set_param,
+                              NULL, &thread_pool_min_info);
+    object_class_property_add(klass, "thread-pool-max", "int",
+                              event_loop_backend_get_param,
+                              event_loop_backend_set_param,
+                              NULL, &thread_pool_max_info);
 }
 
 static const TypeInfo event_loop_backend_info = {
diff --git a/util/event-loop.h b/util/event-loop.h
index 34cf9309af..0f4255ee7b 100644
--- a/util/event-loop.h
+++ b/util/event-loop.h
@@ -37,5 +37,9 @@ struct EventLoopBackend {
 
     /* AioContext AIO engine parameters */
     int64_t aio_max_batch;
+
+    /* AioContext thread pool parameters */
+    int64_t thread_pool_min;
+    int64_t thread_pool_max;
 };
 #endif
diff --git a/util/main-loop.c b/util/main-loop.c
index 395fd9bd3e..266a9c72d8 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -190,12 +190,25 @@ MainLoop *mloop;
 static void main_loop_init(EventLoopBackend *bc, Error **errp)
 {
     MainLoop *m = MAIN_LOOP(bc);
+    Error *local_error = NULL;
+
+    if (!qemu_aio_context) {
+        error_setg(errp, "qemu aio context not ready");
+        return;
+    }
 
     if (mloop) {
         error_setg(errp, "only one main-loop instance allowed");
         return;
     }
 
+    aio_context_set_thread_pool_params(qemu_aio_context, bc->thread_pool_min,
+                                       bc->thread_pool_max, &local_error);
+    if (local_error) {
+        error_propagate(errp, local_error);
+        return;
+    }
+
     mloop = m;
     return;
 }
diff --git a/util/thread-pool.c b/util/thread-pool.c
index d763cea505..95c339cb00 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -21,6 +21,7 @@
 #include "trace.h"
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
+#include "qapi/error.h"
 
 static void do_spawn_thread(ThreadPool *pool);
 
@@ -58,7 +59,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -76,6 +76,7 @@ struct ThreadPool {
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
+    AioContext *ctx = pool->ctx;
 
     qemu_mutex_lock(&pool->lock);
     pool->pending_threads--;
@@ -91,7 +92,8 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
+        } while (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+                 pool->cur_threads <= ctx->pool_min));
         if (ret == -1 || pool->stopping) {
             break;
         }
@@ -244,6 +246,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg,
         BlockCompletionFunc *cb, void *opaque)
 {
+    AioContext *ctx = pool->ctx;
     ThreadPoolElement *req;
 
     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
@@ -257,7 +260,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
     trace_thread_pool_submit(pool, req, arg);
 
     qemu_mutex_lock(&pool->lock);
-    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
+    if (pool->idle_threads == 0 && pool->cur_threads < ctx->pool_max) {
         spawn_thread(pool);
     }
     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
@@ -306,11 +309,16 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    qemu_mutex_lock(&pool->lock);
+    for (int i = pool->cur_threads; i < ctx->pool_min; i++) {
+        spawn_thread(pool);
+    }
+    qemu_mutex_unlock(&pool->lock);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)
@@ -350,3 +358,28 @@ void thread_pool_free(ThreadPool *pool)
     qemu_mutex_destroy(&pool->lock);
     g_free(pool);
 }
+
+void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
+                                        uint64_t max, Error **errp)
+{
+    ThreadPool *pool = ctx->thread_pool;
+
+    if (min > max || !max) {
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+        return;
+    }
+
+    if (pool) {
+        qemu_mutex_lock(&pool->lock);
+    }
+
+    ctx->pool_min = min;
+    ctx->pool_max = max;
+
+    if (pool) {
+        for (int i = pool->cur_threads; i < ctx->pool_min; i++) {
+            spawn_thread(pool);
+        }
+        qemu_mutex_unlock(&pool->lock);
+    }
+}
-- 
2.35.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]