qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PULL 7/9] aio-posix: add io_uring fd monitoring implementation


From: Stefan Hajnoczi
Subject: [PULL 7/9] aio-posix: add io_uring fd monitoring implementation
Date: Wed, 11 Mar 2020 12:40:43 +0000

The recent Linux io_uring API has several advantages over ppoll(2) and
epoll(2).  Details are given in the source code.

Add an io_uring implementation and make it the default on Linux.
Performance is the same as with epoll(7) but later patches add
optimizations that take advantage of io_uring.

It is necessary to change how aio_set_fd_handler() deals with deleting
AioHandlers since removing monitored file descriptors is asynchronous in
io_uring.  fdmon_io_uring_remove() marks the AioHandler deleted and
aio_set_fd_handler() will let it handle deletion in that case.

Signed-off-by: Stefan Hajnoczi <address@hidden>
Link: https://lore.kernel.org/r/address@hidden
Message-Id: <address@hidden>
---
 configure             |   5 +
 include/block/aio.h   |   9 ++
 util/Makefile.objs    |   1 +
 util/aio-posix.c      |  20 ++-
 util/aio-posix.h      |  20 ++-
 util/fdmon-io_uring.c | 326 ++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 376 insertions(+), 5 deletions(-)
 create mode 100644 util/fdmon-io_uring.c

diff --git a/configure b/configure
index cbf864bff1..3c7470096f 100755
--- a/configure
+++ b/configure
@@ -4093,6 +4093,11 @@ if test "$linux_io_uring" != "no" ; then
     linux_io_uring_cflags=$($pkg_config --cflags liburing)
     linux_io_uring_libs=$($pkg_config --libs liburing)
     linux_io_uring=yes
+
+    # io_uring is used in libqemuutil.a where per-file -libs variables are not
+    # seen by programs linking the archive.  It's not ideal, but just add the
+    # library dependency globally.
+    LIBS="$linux_io_uring_libs $LIBS"
   else
     if test "$linux_io_uring" = "yes" ; then
       feature_not_found "linux io_uring" "Install liburing devel"
diff --git a/include/block/aio.h b/include/block/aio.h
index bd76b08f1a..83fc9b844d 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -14,6 +14,9 @@
 #ifndef QEMU_AIO_H
 #define QEMU_AIO_H
 
+#ifdef CONFIG_LINUX_IO_URING
+#include <liburing.h>
+#endif
 #include "qemu/queue.h"
 #include "qemu/event_notifier.h"
 #include "qemu/thread.h"
@@ -96,6 +99,8 @@ struct BHListSlice {
     QSIMPLEQ_ENTRY(BHListSlice) next;
 };
 
+typedef QSLIST_HEAD(, AioHandler) AioHandlerSList;
+
 struct AioContext {
     GSource source;
 
@@ -181,6 +186,10 @@ struct AioContext {
      * locking.
      */
     struct LuringState *linux_io_uring;
+
+    /* State for file descriptor monitoring using Linux io_uring */
+    struct io_uring fdmon_io_uring;
+    AioHandlerSList submit_list;
 #endif
 
     /* TimerLists for calling timers - one per clock type.  Has its own
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 6439077a68..6718a38b61 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -7,6 +7,7 @@ util-obj-$(call lnot,$(CONFIG_ATOMIC64)) += atomic64.o
 util-obj-$(CONFIG_POSIX) += aio-posix.o
 util-obj-$(CONFIG_POSIX) += fdmon-poll.o
 util-obj-$(CONFIG_EPOLL_CREATE1) += fdmon-epoll.o
+util-obj-$(CONFIG_LINUX_IO_URING) += fdmon-io_uring.o
 util-obj-$(CONFIG_POSIX) += compatfd.o
 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 028b2abded..ffd9cc381b 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -57,10 +57,16 @@ static bool aio_remove_fd_handler(AioContext *ctx, 
AioHandler *node)
         g_source_remove_poll(&ctx->source, &node->pfd);
     }
 
+    node->pfd.revents = 0;
+
+    /* If the fd monitor has already marked it deleted, leave it alone */
+    if (QLIST_IS_INSERTED(node, node_deleted)) {
+        return false;
+    }
+
     /* If a read is in progress, just mark the node as deleted */
     if (qemu_lockcnt_count(&ctx->list_lock)) {
         QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
-        node->pfd.revents = 0;
         return false;
     }
     /* Otherwise, delete it for real.  We can't just mark it as
@@ -126,9 +132,6 @@ void aio_set_fd_handler(AioContext *ctx,
 
         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
     }
-    if (node) {
-        deleted = aio_remove_fd_handler(ctx, node);
-    }
 
     /* No need to order poll_disable_cnt writes against other updates;
      * the counter is only used to avoid wasting time and latency on
@@ -140,6 +143,9 @@ void aio_set_fd_handler(AioContext *ctx,
                atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
 
     ctx->fdmon_ops->update(ctx, node, new_node);
+    if (node) {
+        deleted = aio_remove_fd_handler(ctx, node);
+    }
     qemu_lockcnt_unlock(&ctx->list_lock);
     aio_notify(ctx);
 
@@ -565,11 +571,17 @@ void aio_context_setup(AioContext *ctx)
     ctx->fdmon_ops = &fdmon_poll_ops;
     ctx->epollfd = -1;
 
+    /* Use the fastest fd monitoring implementation if available */
+    if (fdmon_io_uring_setup(ctx)) {
+        return;
+    }
+
     fdmon_epoll_setup(ctx);
 }
 
 void aio_context_destroy(AioContext *ctx)
 {
+    fdmon_io_uring_destroy(ctx);
     fdmon_epoll_disable(ctx);
 }
 
diff --git a/util/aio-posix.h b/util/aio-posix.h
index 97899d0fbc..55fc771327 100644
--- a/util/aio-posix.h
+++ b/util/aio-posix.h
@@ -27,10 +27,14 @@ struct AioHandler {
     IOHandler *io_poll_begin;
     IOHandler *io_poll_end;
     void *opaque;
-    bool is_external;
     QLIST_ENTRY(AioHandler) node;
     QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */
     QLIST_ENTRY(AioHandler) node_deleted;
+#ifdef CONFIG_LINUX_IO_URING
+    QSLIST_ENTRY(AioHandler) node_submitted;
+    unsigned flags; /* see fdmon-io_uring.c */
+#endif
+    bool is_external;
 };
 
 /* Add a handler to a ready list */
@@ -58,4 +62,18 @@ static inline void fdmon_epoll_disable(AioContext *ctx)
 }
 #endif /* !CONFIG_EPOLL_CREATE1 */
 
+#ifdef CONFIG_LINUX_IO_URING
+bool fdmon_io_uring_setup(AioContext *ctx);
+void fdmon_io_uring_destroy(AioContext *ctx);
+#else
+static inline bool fdmon_io_uring_setup(AioContext *ctx)
+{
+    return false;
+}
+
+static inline void fdmon_io_uring_destroy(AioContext *ctx)
+{
+}
+#endif /* !CONFIG_LINUX_IO_URING */
+
 #endif /* AIO_POSIX_H */
diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
new file mode 100644
index 0000000000..fb99b4b61e
--- /dev/null
+++ b/util/fdmon-io_uring.c
@@ -0,0 +1,326 @@
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/*
+ * Linux io_uring file descriptor monitoring
+ *
+ * The Linux io_uring API supports file descriptor monitoring with a few
+ * advantages over existing APIs like poll(2) and epoll(7):
+ *
+ * 1. Userspace polling of events is possible because the completion queue (cq
+ *    ring) is shared between the kernel and userspace.  This allows
+ *    applications that rely on userspace polling to also monitor file
+ *    descriptors in the same userspace polling loop.
+ *
+ * 2. Submission and completion is batched and done together in a single system
+ *    call.  This minimizes the number of system calls.
+ *
+ * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
+ *    poll(2).
+ *
+ * 4. Nanosecond timeouts are supported so it requires fewer syscalls than
+ *    epoll(7).
+ *
+ * This code only monitors file descriptors and does not do asynchronous disk
+ * I/O.  Implementing disk I/O efficiently has other requirements and should
+ * use a separate io_uring so it does not make sense to unify the code.
+ *
+ * File descriptor monitoring is implemented using the following operations:
+ *
+ * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
+ * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored.  When
+ *    the poll mask changes for a file descriptor it is first removed and then
+ *    re-added with the new poll mask, so this operation is also used as part
+ *    of modifying an existing monitored file descriptor.
+ * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
+ *    for events.  This operation self-cancels if another event completes
+ *    before the timeout.
+ *
+ * io_uring calls the submission queue the "sq ring" and the completion queue
+ * the "cq ring".  Ring entries are called "sqe" and "cqe", respectively.
+ *
+ * The code is structured so that sq/cq rings are only modified within
+ * fdmon_io_uring_wait().  Changes to AioHandlers are made by enqueuing them on
+ * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
+ * and/or IORING_OP_POLL_REMOVE sqes for them.
+ */
+
+#include "qemu/osdep.h"
+#include <poll.h>
+#include "qemu/rcu_queue.h"
+#include "aio-posix.h"
+
+enum {
+    FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */
+
+    /* AioHandler::flags */
+    FDMON_IO_URING_PENDING  = (1 << 0),
+    FDMON_IO_URING_ADD      = (1 << 1),
+    FDMON_IO_URING_REMOVE   = (1 << 2),
+};
+
+static inline int poll_events_from_pfd(int pfd_events)
+{
+    return (pfd_events & G_IO_IN ? POLLIN : 0) |
+           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
+           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
+           (pfd_events & G_IO_ERR ? POLLERR : 0);
+}
+
+static inline int pfd_events_from_poll(int poll_events)
+{
+    return (poll_events & POLLIN ? G_IO_IN : 0) |
+           (poll_events & POLLOUT ? G_IO_OUT : 0) |
+           (poll_events & POLLHUP ? G_IO_HUP : 0) |
+           (poll_events & POLLERR ? G_IO_ERR : 0);
+}
+
+/*
+ * Returns an sqe for submitting a request.  Only be called within
+ * fdmon_io_uring_wait().
+ */
+static struct io_uring_sqe *get_sqe(AioContext *ctx)
+{
+    struct io_uring *ring = &ctx->fdmon_io_uring;
+    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+    int ret;
+
+    if (likely(sqe)) {
+        return sqe;
+    }
+
+    /* No free sqes left, submit pending sqes first */
+    ret = io_uring_submit(ring);
+    assert(ret > 1);
+    sqe = io_uring_get_sqe(ring);
+    assert(sqe);
+    return sqe;
+}
+
+/* Atomically enqueue an AioHandler for sq ring submission */
+static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
+{
+    unsigned old_flags;
+
+    old_flags = atomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
+    if (!(old_flags & FDMON_IO_URING_PENDING)) {
+        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
+    }
+}
+
+/* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */
+static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
+{
+    AioHandler *node = QSLIST_FIRST(head);
+
+    if (!node) {
+        return NULL;
+    }
+
+    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
+    QSLIST_REMOVE_HEAD(head, node_submitted);
+
+    /*
+     * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it can serve two
+     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
+     * telling process_cqe() to delete the AioHandler when its
+     * IORING_OP_POLL_ADD completes.
+     */
+    *flags = atomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
+                                              FDMON_IO_URING_ADD));
+    return node;
+}
+
+static void fdmon_io_uring_update(AioContext *ctx,
+                                  AioHandler *old_node,
+                                  AioHandler *new_node)
+{
+    if (new_node) {
+        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
+    }
+
+    if (old_node) {
+        /*
+         * Deletion is tricky because IORING_OP_POLL_ADD and
+         * IORING_OP_POLL_REMOVE are async.  We need to wait for the original
+         * IORING_OP_POLL_ADD to complete before this handler can be freed
+         * safely.
+         *
+         * It's possible that the file descriptor becomes ready and the
+         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
+         * submitted, too.
+         *
+         * Mark this handler deleted right now but don't place it on
+         * ctx->deleted_aio_handlers yet.  Instead, manually fudge the list
+         * entry to make QLIST_IS_INSERTED() think this handler has been
+         * inserted and other code recognizes this AioHandler as deleted.
+         *
+         * Once the original IORING_OP_POLL_ADD completes we enqueue the
+         * handler on the real ctx->deleted_aio_handlers list to be freed.
+         */
+        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
+        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;
+
+        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
+    }
+}
+
+static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
+{
+    struct io_uring_sqe *sqe = get_sqe(ctx);
+    int events = poll_events_from_pfd(node->pfd.events);
+
+    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
+    io_uring_sqe_set_data(sqe, node);
+}
+
+static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
+{
+    struct io_uring_sqe *sqe = get_sqe(ctx);
+
+    io_uring_prep_poll_remove(sqe, node);
+}
+
+/* Add a timeout that self-cancels when another cqe becomes ready */
+static void add_timeout_sqe(AioContext *ctx, int64_t ns)
+{
+    struct io_uring_sqe *sqe;
+    struct __kernel_timespec ts = {
+        .tv_sec = ns / NANOSECONDS_PER_SECOND,
+        .tv_nsec = ns % NANOSECONDS_PER_SECOND,
+    };
+
+    sqe = get_sqe(ctx);
+    io_uring_prep_timeout(sqe, &ts, 1, 0);
+}
+
+/* Add sqes from ctx->submit_list for submission */
+static void fill_sq_ring(AioContext *ctx)
+{
+    AioHandlerSList submit_list;
+    AioHandler *node;
+    unsigned flags;
+
+    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);
+
+    while ((node = dequeue(&submit_list, &flags))) {
+        /* Order matters, just in case both flags were set */
+        if (flags & FDMON_IO_URING_ADD) {
+            add_poll_add_sqe(ctx, node);
+        }
+        if (flags & FDMON_IO_URING_REMOVE) {
+            add_poll_remove_sqe(ctx, node);
+        }
+    }
+}
+
+/* Returns true if a handler became ready */
+static bool process_cqe(AioContext *ctx,
+                        AioHandlerList *ready_list,
+                        struct io_uring_cqe *cqe)
+{
+    AioHandler *node = io_uring_cqe_get_data(cqe);
+    unsigned flags;
+
+    /* poll_timeout and poll_remove have a zero user_data field */
+    if (!node) {
+        return false;
+    }
+
+    /*
+     * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
+     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
+     * bit before IORING_OP_POLL_REMOVE is submitted.
+     */
+    flags = atomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
+    if (flags & FDMON_IO_URING_REMOVE) {
+        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
+        return false;
+    }
+
+    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));
+
+    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
+    add_poll_add_sqe(ctx, node);
+    return true;
+}
+
+static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
+{
+    struct io_uring *ring = &ctx->fdmon_io_uring;
+    struct io_uring_cqe *cqe;
+    unsigned num_cqes = 0;
+    unsigned num_ready = 0;
+    unsigned head;
+
+    io_uring_for_each_cqe(ring, head, cqe) {
+        if (process_cqe(ctx, ready_list, cqe)) {
+            num_ready++;
+        }
+
+        num_cqes++;
+    }
+
+    io_uring_cq_advance(ring, num_cqes);
+    return num_ready;
+}
+
+static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
+                               int64_t timeout)
+{
+    unsigned wait_nr = 1; /* block until at least one cqe is ready */
+    int ret;
+
+    /* Fall back while external clients are disabled */
+    if (atomic_read(&ctx->external_disable_cnt)) {
+        return fdmon_poll_ops.wait(ctx, ready_list, timeout);
+    }
+
+    if (timeout == 0) {
+        wait_nr = 0; /* non-blocking */
+    } else if (timeout > 0) {
+        add_timeout_sqe(ctx, timeout);
+    }
+
+    fill_sq_ring(ctx);
+
+    ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
+    assert(ret >= 0);
+
+    return process_cq_ring(ctx, ready_list);
+}
+
+static const FDMonOps fdmon_io_uring_ops = {
+    .update = fdmon_io_uring_update,
+    .wait = fdmon_io_uring_wait,
+};
+
+bool fdmon_io_uring_setup(AioContext *ctx)
+{
+    int ret;
+
+    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
+    if (ret != 0) {
+        return false;
+    }
+
+    QSLIST_INIT(&ctx->submit_list);
+    ctx->fdmon_ops = &fdmon_io_uring_ops;
+    return true;
+}
+
+void fdmon_io_uring_destroy(AioContext *ctx)
+{
+    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
+        AioHandler *node;
+
+        io_uring_queue_exit(&ctx->fdmon_io_uring);
+
+        /* No need to submit these anymore, just free them. */
+        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
+            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
+            QLIST_REMOVE(node, node);
+            g_free(node);
+        }
+
+        ctx->fdmon_ops = &fdmon_poll_ops;
+    }
+}
-- 
2.24.1


reply via email to

[Prev in Thread] Current Thread [Next in Thread]