[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH 4/4] qmp: Move dispatcher to a coroutine
From: |
Kevin Wolf |
Subject: |
[PATCH 4/4] qmp: Move dispatcher to a coroutine |
Date: |
Thu, 9 Jan 2020 19:35:45 +0100 |
This moves the QMP dispatcher to a coroutine and runs all QMP command
handlers that declare 'coroutine': true in coroutine context so they
can avoid blocking the main loop while doing I/O or waiting for other
events.
For commands that are not declared safe to run in a coroutine, the
dispatcher drops out of coroutine context by calling the QMP command
handler from a bottom half.
Signed-off-by: Kevin Wolf <address@hidden>
---
include/qapi/qmp/dispatch.h | 2 +
monitor/monitor-internal.h | 5 ++-
monitor/monitor.c | 24 +++++++----
monitor/qmp.c | 83 +++++++++++++++++++++++--------------
qapi/qmp-dispatch.c | 38 ++++++++++++++++-
5 files changed, 111 insertions(+), 41 deletions(-)
diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
index d6ce9efc8e..d6d5443391 100644
--- a/include/qapi/qmp/dispatch.h
+++ b/include/qapi/qmp/dispatch.h
@@ -30,6 +30,8 @@ typedef enum QmpCommandOptions
typedef struct QmpCommand
{
const char *name;
+ /* Runs in coroutine context if QCO_COROUTINE is set, except for OOB
+ * commands */
QmpCommandFunc *fn;
QmpCommandOptions options;
QTAILQ_ENTRY(QmpCommand) node;
diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
index d78f5ca190..7389b6a56c 100644
--- a/monitor/monitor-internal.h
+++ b/monitor/monitor-internal.h
@@ -154,7 +154,8 @@ static inline bool monitor_is_qmp(const Monitor *mon)
typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
extern IOThread *mon_iothread;
-extern QEMUBH *qmp_dispatcher_bh;
+extern Coroutine *qmp_dispatcher_co;
+extern bool qmp_dispatcher_co_busy;
extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
extern QemuMutex monitor_lock;
extern MonitorList mon_list;
@@ -172,7 +173,7 @@ void monitor_fdsets_cleanup(void);
void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
void monitor_data_destroy_qmp(MonitorQMP *mon);
-void monitor_qmp_bh_dispatcher(void *data);
+void coroutine_fn monitor_qmp_dispatcher_co(void *data);
int get_monitor_def(int64_t *pval, const char *name);
void help_cmd(Monitor *mon, const char *name);
diff --git a/monitor/monitor.c b/monitor/monitor.c
index 12898b6448..c72763fa4e 100644
--- a/monitor/monitor.c
+++ b/monitor/monitor.c
@@ -53,8 +53,9 @@ typedef struct {
/* Shared monitor I/O thread */
IOThread *mon_iothread;
-/* Bottom half to dispatch the requests received from I/O thread */
-QEMUBH *qmp_dispatcher_bh;
+/* Coroutine to dispatch the requests received from I/O thread */
+Coroutine *qmp_dispatcher_co;
+bool qmp_dispatcher_co_busy;
/* Protects mon_list, monitor_qapi_event_state, monitor_destroyed. */
QemuMutex monitor_lock;
@@ -579,9 +580,16 @@ void monitor_cleanup(void)
}
qemu_mutex_unlock(&monitor_lock);
- /* QEMUBHs needs to be deleted before destroying the I/O thread */
- qemu_bh_delete(qmp_dispatcher_bh);
- qmp_dispatcher_bh = NULL;
+ /* The dispatcher needs to stop before destroying the I/O thread */
+ if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
+ aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+ qmp_dispatcher_co = NULL;
+ }
+
+ AIO_WAIT_WHILE(qemu_get_aio_context(),
+ (aio_bh_poll(iohandler_get_aio_context()),
+ atomic_mb_read(&qmp_dispatcher_co_busy)));
+
if (mon_iothread) {
iothread_destroy(mon_iothread);
mon_iothread = NULL;
@@ -604,9 +612,9 @@ void monitor_init_globals_core(void)
* have commands assuming that context. It would be nice to get
* rid of those assumptions.
*/
- qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
- monitor_qmp_bh_dispatcher,
- NULL);
+ qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
+ atomic_mb_set(&qmp_dispatcher_co_busy, true);
+ aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
}
QemuOptsList qemu_mon_opts = {
diff --git a/monitor/qmp.c b/monitor/qmp.c
index b67a8e7d1f..9fd66c7b97 100644
--- a/monitor/qmp.c
+++ b/monitor/qmp.c
@@ -133,6 +133,8 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
}
}
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
{
Monitor *old_mon;
@@ -211,43 +213,62 @@ static QMPRequest
*monitor_qmp_requests_pop_any_with_lock(void)
return req_obj;
}
-void monitor_qmp_bh_dispatcher(void *data)
+void coroutine_fn monitor_qmp_dispatcher_co(void *data)
{
- QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
+ QMPRequest *req_obj = NULL;
QDict *rsp;
bool need_resume;
MonitorQMP *mon;
- if (!req_obj) {
- return;
- }
+ while (true) {
+ assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
+
+ while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
+ /* Wait to be reentered from handle_qmp_command, or terminate if
+ * qmp_dispatcher_co has been reset to NULL */
+ atomic_mb_set(&qmp_dispatcher_co_busy, false);
+ if (qmp_dispatcher_co) {
+ qemu_coroutine_yield();
+ }
+ /* qmp_dispatcher_co may have changed if we yielded and were
+ * reentered from monitor_cleanup() */
+ if (!qmp_dispatcher_co) {
+ return;
+ }
+ atomic_mb_set(&qmp_dispatcher_co_busy, true);
+ }
- mon = req_obj->mon;
- /* qmp_oob_enabled() might change after "qmp_capabilities" */
- need_resume = !qmp_oob_enabled(mon) ||
- mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
- qemu_mutex_unlock(&mon->qmp_queue_lock);
- if (req_obj->req) {
- QDict *qdict = qobject_to(QDict, req_obj->req);
- QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
- trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
- monitor_qmp_dispatch(mon, req_obj->req);
- } else {
- assert(req_obj->err);
- rsp = qmp_error_response(req_obj->err);
- req_obj->err = NULL;
- monitor_qmp_respond(mon, rsp);
- qobject_unref(rsp);
- }
+ aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
+ qemu_coroutine_yield();
+
+ mon = req_obj->mon;
+ /* qmp_oob_enabled() might change after "qmp_capabilities" */
+ need_resume = !qmp_oob_enabled(mon) ||
+ mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
+ qemu_mutex_unlock(&mon->qmp_queue_lock);
+ if (req_obj->req) {
+ QDict *qdict = qobject_to(QDict, req_obj->req);
+ QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
+ trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
+ monitor_qmp_dispatch(mon, req_obj->req);
+ } else {
+ assert(req_obj->err);
+ rsp = qmp_error_response(req_obj->err);
+ req_obj->err = NULL;
+ monitor_qmp_respond(mon, rsp);
+ qobject_unref(rsp);
+ }
- if (need_resume) {
- /* Pairs with the monitor_suspend() in handle_qmp_command() */
- monitor_resume(&mon->common);
- }
- qmp_request_free(req_obj);
+ if (need_resume) {
+ /* Pairs with the monitor_suspend() in handle_qmp_command() */
+ monitor_resume(&mon->common);
+ }
+ qmp_request_free(req_obj);
- /* Reschedule instead of looping so the main loop stays responsive */
- qemu_bh_schedule(qmp_dispatcher_bh);
+ /* Reschedule instead of looping so the main loop stays responsive */
+ aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+ qemu_coroutine_yield();
+ }
}
static void handle_qmp_command(void *opaque, QObject *req, Error *err)
@@ -308,7 +329,9 @@ static void handle_qmp_command(void *opaque, QObject *req,
Error *err)
qemu_mutex_unlock(&mon->qmp_queue_lock);
/* Kick the dispatcher routine */
- qemu_bh_schedule(qmp_dispatcher_bh);
+ if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
+ aio_co_wake(qmp_dispatcher_co);
+ }
}
static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
index bc264b3c9b..6ccf19f2a2 100644
--- a/qapi/qmp-dispatch.c
+++ b/qapi/qmp-dispatch.c
@@ -12,6 +12,8 @@
*/
#include "qemu/osdep.h"
+
+#include "monitor/monitor-internal.h"
#include "qapi/error.h"
#include "qapi/qmp/dispatch.h"
#include "qapi/qmp/qdict.h"
@@ -75,6 +77,23 @@ static QDict *qmp_dispatch_check_obj(const QObject *request,
bool allow_oob,
return dict;
}
+typedef struct QmpDispatchBH {
+ QmpCommand *cmd;
+ QDict *args;
+ QObject **ret;
+ Error **errp;
+ Coroutine *co;
+} QmpDispatchBH;
+
+static void do_qmp_dispatch_bh(void *opaque)
+{
+ QmpDispatchBH *data = opaque;
+ data->cmd->fn(data->args, data->ret, data->errp);
+ aio_co_wake(data->co);
+}
+
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
bool allow_oob, Error **errp)
{
@@ -129,7 +148,22 @@ static QObject *do_qmp_dispatch(QmpCommandList *cmds,
QObject *request,
qobject_ref(args);
}
- cmd->fn(args, &ret, &local_err);
+ assert(!(oob && qemu_in_coroutine()));
+ if ((cmd->options & QCO_COROUTINE) || !qemu_in_coroutine()) {
+ cmd->fn(args, &ret, &local_err);
+ } else {
+ /* Must drop out of coroutine context for this one */
+ QmpDispatchBH data = {
+ .cmd = cmd,
+ .args = args,
+ .ret = &ret,
+ .errp = &local_err,
+ .co = qemu_coroutine_self(),
+ };
+ aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
+ &data);
+ qemu_coroutine_yield();
+ }
if (local_err) {
error_propagate(errp, local_err);
} else if (cmd->options & QCO_NO_SUCCESS_RESP) {
@@ -164,6 +198,8 @@ bool qmp_is_oob(const QDict *dict)
&& !qdict_haskey(dict, "execute");
}
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
QDict *qmp_dispatch(QmpCommandList *cmds, QObject *request,
bool allow_oob)
{
--
2.20.1