qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH v3 2/5] util: introduce threaded workqueue


From: guangrong . xiao
Subject: [Qemu-devel] [PATCH v3 2/5] util: introduce threaded workqueue
Date: Thu, 22 Nov 2018 15:20:25 +0800

From: Xiao Guangrong <address@hidden>

This modules implements the lockless and efficient threaded workqueue.

Three abstracted objects are used in this module:
- Request.
     It not only contains the data that the workqueue fetches out
    to finish the request but also offers the space to save the result
    after the workqueue handles the request.

    It's flowed between user and workqueue. The user fills the request
    data into it when it is owned by user. After it is submitted to the
    workqueue, the workqueue fetched data out and save the result into
    it after the request is handled.

    All the requests are pre-allocated and carefully partitioned between
    threads so there is no contention on the request, that make threads
    be parallel as much as possible.

- User, i.e, the submitter
    It's the one fills the request and submits it to the workqueue,
    the result will be collected after it is handled by the work queue.

    The user can consecutively submit requests without waiting the previous
    requests been handled.
    It only supports one submitter, you should do serial submission by
    yourself if you want more, e.g, use lock on you side.

- Workqueue, i.e, thread
    Each workqueue is represented by a running thread that fetches
    the request submitted by the user, do the specified work and save
    the result to the request.

Signed-off-by: Xiao Guangrong <address@hidden>
---
 include/qemu/threaded-workqueue.h | 106 +++++++++
 util/Makefile.objs                |   1 +
 util/threaded-workqueue.c         | 463 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 570 insertions(+)
 create mode 100644 include/qemu/threaded-workqueue.h
 create mode 100644 util/threaded-workqueue.c

diff --git a/include/qemu/threaded-workqueue.h 
b/include/qemu/threaded-workqueue.h
new file mode 100644
index 0000000000..e0ede496d0
--- /dev/null
+++ b/include/qemu/threaded-workqueue.h
@@ -0,0 +1,106 @@
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <address@hidden>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_THREADED_WORKQUEUE_H
+#define QEMU_THREADED_WORKQUEUE_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+
+/*
+ * This modules implements the lockless and efficient threaded workqueue.
+ *
+ * Three abstracted objects are used in this module:
+ * - Request.
+ *   It not only contains the data that the workqueue fetches out
+ *   to finish the request but also offers the space to save the result
+ *   after the workqueue handles the request.
+ *
+ *   It's flowed between user and workqueue. The user fills the request
+ *   data into it when it is owned by user. After it is submitted to the
+ *   workqueue, the workqueue fetched data out and save the result into
+ *   it after the request is handled.
+ *
+ *   All the requests are pre-allocated and carefully partitioned between
+ *   threads so there is no contention on the request, that make threads
+ *   be parallel as much as possible.
+ *
+ * - User, i.e, the submitter
+ *   It's the one fills the request and submits it to the workqueue,
+ *   the result will be collected after it is handled by the work queue.
+ *
+ *   The user can consecutively submit requests without waiting the previous
+ *   requests been handled.
+ *   It only supports one submitter, you should do serial submission by
+ *   yourself if you want more, e.g, use lock on you side.
+ *
+ * - Workqueue, i.e, thread
+ *   Each workqueue is represented by a running thread that fetches
+ *   the request submitted by the user, do the specified work and save
+ *   the result to the request.
+ */
+
+typedef struct Threads Threads;
+
+struct ThreadedWorkqueueOps {
+    /* constructor of the request */
+    int (*thread_request_init)(void *request);
+    /*  destructor of the request */
+    void (*thread_request_uninit)(void *request);
+
+    /* the handler of the request that is called by the thread */
+    void (*thread_request_handler)(void *request);
+    /* called by the user after the request has been handled */
+    void (*thread_request_done)(void *request);
+
+    size_t request_size;
+};
+typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
+
+/* the default number of requests that thread need handle */
+#define DEFAULT_THREAD_REQUEST_NR 4
+/* the max number of requests that thread need handle */
+#define MAX_THREAD_REQUEST_NR     (sizeof(uint64_t) * BITS_PER_BYTE)
+
+/*
+ * create a threaded queue. Other APIs will work on the Threads it returned
+ *
+ * @name: the identity of the workqueue which is used to construct the name
+ *    of threads only
+ * @threads_nr: the number of threads that the workqueue will create
+ * @thread_requests_nr: the number of requests that each single thread will
+ *    handle
+ * @ops: the handlers of the request
+ *
+ * Return NULL if it failed
+ */
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                                   unsigned int thread_requests_nr,
+                                   const ThreadedWorkqueueOps *ops);
+void threaded_workqueue_destroy(Threads *threads);
+
+/*
+ * find a free request where the user can store the data that is needed to
+ * finish the request
+ *
+ * If all requests are used up, return NULL
+ */
+void *threaded_workqueue_get_request(Threads *threads);
+/* submit the request and notify the thread */
+void threaded_workqueue_submit_request(Threads *threads, void *request);
+
+/*
+ * wait all threads to complete the request to make sure there is no
+ * previous request exists
+ */
+void threaded_workqueue_wait_for_requests(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..f26dfe5182 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@ util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += threaded-workqueue.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
new file mode 100644
index 0000000000..2ab37cee8d
--- /dev/null
+++ b/util/threaded-workqueue.c
@@ -0,0 +1,463 @@
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <address@hidden>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/bitmap.h"
+#include "qemu/threaded-workqueue.h"
+
+#define SMP_CACHE_BYTES 64
+
+/*
+ * the request representation which contains the internally used mete data,
+ * it is the header of user-defined data.
+ *
+ * It should be aligned to the nature size of CPU.
+ */
+struct ThreadRequest {
+    /*
+     * the request has been handled by the thread and need the user
+     * to fetch result out.
+     */
+    uint8_t done;
+
+    /*
+     * the index to Thread::requests.
+     * Save it to the padding space although it can be calculated at runtime.
+     */
+    uint8_t request_index;
+
+    /* the index to Threads::per_thread_data */
+    unsigned int thread_index;
+} QEMU_ALIGNED(sizeof(unsigned long));
+typedef struct ThreadRequest ThreadRequest;
+
+struct ThreadLocal {
+    struct Threads *threads;
+
+    /* the index of the thread */
+    int self;
+
+    /* thread is useless and needs to exit */
+    bool quit;
+
+    QemuThread thread;
+
+    void *requests;
+
+   /*
+     * the bit in these two bitmaps indicates the index of the ï¼ requests
+     * respectively. If it's the same, the corresponding request is free
+     * and owned by the user, i.e, where the user fills a request. Otherwise,
+     * it is valid and owned by the thread, i.e, where the thread fetches
+     * the request and write the result.
+     */
+
+    /* after the user fills the request, the bit is flipped. */
+    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+    /* after handles the request, the thread flips the bit. */
+    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+
+    /*
+     * the event used to wake up the thread whenever a valid request has
+     * been submitted
+     */
+    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+
+    /*
+     * the event is notified whenever a request has been completed
+     * (i.e, become free), which is used to wake up the user
+     */
+    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct represents multithreads which is shared by
+ * all threads
+ */
+struct Threads {
+    /* the request header, ThreadRequest, is contained */
+    unsigned int request_size;
+    unsigned int thread_requests_nr;
+    unsigned int threads_nr;
+
+    /* the request is pushed to the thread with round-robin manner */
+    unsigned int current_thread_index;
+
+    const ThreadedWorkqueueOps *ops;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index)
+{
+    ThreadRequest *request;
+
+    request = thread->requests + request_index * thread->threads->request_size;
+    assert(request->request_index == request_index);
+    assert(request->thread_index == thread->self);
+    return request;
+}
+
+static int request_to_index(ThreadRequest *request)
+{
+    return request->request_index;
+}
+
+static int request_to_thread_index(ThreadRequest *request)
+{
+    return request->thread_index;
+}
+
+/*
+ * free request: the request is not used by any thread, however, it might
+ *   contain the result need the user to call thread_request_done()
+ *
+ * valid request: the request contains the request data and it's committed
+ *   to the thread, i,e. it's owned by thread.
+ */
+static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
+{
+    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
+
+    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
+    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
+    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
+               threads->thread_requests_nr);
+
+    /*
+     * paired with smp_wmb() in mark_request_free() to make sure that we
+     * read request_done_bitmap before fetching the result out.
+     */
+    smp_rmb();
+
+    return result_bitmap;
+}
+
+static ThreadRequest
+*find_thread_free_request(Threads *threads, ThreadLocal *thread)
+{
+    uint64_t result_bitmap = get_free_request_bitmap(threads, thread);
+    int index;
+
+    index  = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr);
+    if (index >= threads->thread_requests_nr) {
+        return NULL;
+    }
+
+    return index_to_request(thread, index);
+}
+
+static ThreadRequest *threads_find_free_request(Threads *threads)
+{
+    ThreadLocal *thread;
+    ThreadRequest *request;
+    int cur_thread, thread_index;
+
+    cur_thread = threads->current_thread_index % threads->threads_nr;
+    thread_index = cur_thread;
+    do {
+        thread = threads->per_thread_data + thread_index++;
+        request = find_thread_free_request(threads, thread);
+        if (request) {
+            break;
+        }
+        thread_index %= threads->threads_nr;
+    } while (thread_index != cur_thread);
+
+    return request;
+}
+
+/*
+ * the change bit operation combined with READ_ONCE and WRITE_ONCE which
+ * only works on single uint64_t width
+ */
+static void change_bit_once(long nr, uint64_t *addr)
+{
+    uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr);
+
+    atomic_rcu_set(addr, value);
+}
+
+static void mark_request_valid(Threads *threads, ThreadRequest *request)
+{
+    int thread_index = request_to_thread_index(request);
+    int request_index = request_to_index(request);
+    ThreadLocal *thread = threads->per_thread_data + thread_index;
+
+    /*
+     * paired with smp_rmb() in find_first_valid_request_index() to make
+     * sure the request has been filled before the bit is flipped that
+     * will make the request be visible to the thread
+     */
+    smp_wmb();
+
+    change_bit_once(request_index, &thread->request_fill_bitmap);
+    qemu_event_set(&thread->request_valid_ev);
+}
+
+static int thread_find_first_valid_request_index(ThreadLocal *thread)
+{
+    Threads *threads = thread->threads;
+    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
+    int index;
+
+    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
+    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
+    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
+               threads->thread_requests_nr);
+    /*
+     * paired with smp_wmb() in mark_request_valid() to make sure that
+     * we read request_fill_bitmap before fetch the request out.
+     */
+    smp_rmb();
+
+    index = find_first_bit(&result_bitmap, threads->thread_requests_nr);
+    return index >= threads->thread_requests_nr ? -1 : index;
+}
+
+static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
+{
+    int index = request_to_index(request);
+
+    /*
+     * smp_wmb() is implied in change_bit_atomic() that is paired with
+     * smp_rmb() in get_free_request_bitmap() to make sure the result
+     * has been saved before the bit is flipped.
+     */
+    change_bit_atomic(index, &thread->request_done_bitmap);
+    qemu_event_set(&thread->request_free_ev);
+}
+
+/* retry to see if there is available request before actually go to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *
+thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    int index, count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        index = thread_find_first_valid_request_index(thread);
+        if (index >= 0) {
+            return index_to_request(thread, index);
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(void *request) = threads->ops->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->request_valid_ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->request_valid_ev);
+            continue;
+        }
+
+        assert(!request->done);
+
+        handler(request + 1);
+        request->done = true;
+        mark_request_free(self_data, request);
+    }
+
+    return NULL;
+}
+
+static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
+{
+    Threads *threads = thread->threads;
+    ThreadRequest *request = thread->requests;
+    int i;
+
+    for (i = 0; i < free_nr; i++) {
+        threads->ops->thread_request_uninit(request + 1);
+        request = (void *)request + threads->request_size;
+    }
+    g_free(thread->requests);
+}
+
+static int init_thread_requests(ThreadLocal *thread)
+{
+    Threads *threads = thread->threads;
+    ThreadRequest *request;
+    int ret, i, thread_reqs_size;
+
+    thread_reqs_size = threads->thread_requests_nr * threads->request_size;
+    thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES);
+    thread->requests = g_malloc0(thread_reqs_size);
+
+    request = thread->requests;
+    for (i = 0; i < threads->thread_requests_nr; i++) {
+        ret = threads->ops->thread_request_init(request + 1);
+        if (ret < 0) {
+            goto exit;
+        }
+
+        request->request_index = i;
+        request->thread_index = thread->self;
+        request = (void *)request + threads->request_size;
+    }
+    return 0;
+
+exit:
+    uninit_thread_requests(thread, i);
+    return -1;
+}
+
+static void uninit_thread_data(Threads *threads, int free_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < free_nr; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].request_valid_ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].request_valid_ev);
+        qemu_event_destroy(&thread_local[i].request_free_ev);
+        uninit_thread_requests(&thread_local[i], threads->thread_requests_nr);
+    }
+}
+
+static int
+init_thread_data(Threads *threads, const char *thread_name, int thread_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int i;
+
+    for (i = 0; i < thread_nr; i++) {
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+
+        if (init_thread_requests(&thread_local[i]) < 0) {
+            goto exit;
+        }
+
+        qemu_event_init(&thread_local[i].request_free_ev, false);
+        qemu_event_init(&thread_local[i].request_valid_ev, false);
+
+        name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+    return 0;
+
+exit:
+    uninit_thread_data(threads, i);
+    return -1;
+}
+
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                                   unsigned int thread_requests_nr,
+                                   const ThreadedWorkqueueOps *ops)
+{
+    Threads *threads;
+
+    if (threads_nr > MAX_THREAD_REQUEST_NR) {
+        return NULL;
+    }
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->ops = ops;
+    threads->threads_nr = threads_nr;
+    threads->thread_requests_nr = thread_requests_nr;
+
+    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
+    threads->request_size = threads->ops->request_size;
+    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
+    threads->request_size += sizeof(ThreadRequest);
+
+    if (init_thread_data(threads, name, threads_nr) < 0) {
+        g_free(threads);
+        return NULL;
+    }
+
+    return threads;
+}
+
+void threaded_workqueue_destroy(Threads *threads)
+{
+    uninit_thread_data(threads, threads->threads_nr);
+    g_free(threads);
+}
+
+static void request_done(Threads *threads, ThreadRequest *request)
+{
+    if (!request->done) {
+        return;
+    }
+
+    threads->ops->thread_request_done(request + 1);
+    request->done = false;
+}
+
+void *threaded_workqueue_get_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    request = threads_find_free_request(threads);
+    if (!request) {
+        return NULL;
+    }
+
+    request_done(threads, request);
+    return request + 1;
+}
+
+void threaded_workqueue_submit_request(Threads *threads, void *request)
+{
+    ThreadRequest *req = request - sizeof(ThreadRequest);
+    int thread_index = request_to_thread_index(request);
+
+    assert(!req->done);
+    mark_request_valid(threads, req);
+    threads->current_thread_index = thread_index  + 1;
+}
+
+void threaded_workqueue_wait_for_requests(Threads *threads)
+{
+    ThreadLocal *thread;
+    uint64_t result_bitmap;
+    int thread_index, index = 0;
+
+    for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) 
{
+        thread = threads->per_thread_data + thread_index;
+        index = 0;
+retry:
+        qemu_event_reset(&thread->request_free_ev);
+        result_bitmap = get_free_request_bitmap(threads, thread);
+
+        for (; index < threads->thread_requests_nr; index++) {
+            if (test_bit(index, &result_bitmap)) {
+                qemu_event_wait(&thread->request_free_ev);
+                goto retry;
+            }
+
+            request_done(threads, index_to_request(thread, index));
+        }
+    }
+}
-- 
2.14.5




reply via email to

[Prev in Thread] Current Thread [Next in Thread]