[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [RFC][PATCH v7 04/16] virtagent: bi-directional RPC handlin
From: |
Michael Roth |
Subject: |
[Qemu-devel] [RFC][PATCH v7 04/16] virtagent: bi-directional RPC handling logic |
Date: |
Mon, 7 Mar 2011 14:10:30 -0600 |
This implements the state machine/logic used to manage
send/receive/execute phases of RPCs we send or receive. It does so using
a set of abstract methods we implement with the application and
transport level code which will follow.
Signed-off-by: Michael Roth <address@hidden>
---
virtagent-manager.c | 326 +++++++++++++++++++++++++++++++++++++++++++++++++++
virtagent-manager.h | 130 ++++++++++++++++++++
2 files changed, 456 insertions(+), 0 deletions(-)
create mode 100644 virtagent-manager.c
create mode 100644 virtagent-manager.h
diff --git a/virtagent-manager.c b/virtagent-manager.c
new file mode 100644
index 0000000..51d26a3
--- /dev/null
+++ b/virtagent-manager.c
@@ -0,0 +1,326 @@
+/*
+ * virtagent - job queue management
+ *
+ * Copyright IBM Corp. 2011
+ *
+ * Authors:
+ * Michael Roth <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "virtagent-common.h"
+
+/* A server job tracks one RPC request received from the remote end:
+ * we execute it locally, send the response back, then run the
+ * completion callback. */
+typedef struct VAServerJob {
+    char tag[64];                   /* unique id used to look the job up */
+    void *opaque;                   /* user state handed to each op */
+    VAServerJobOps ops;             /* execute/send/callback hooks */
+    QTAILQ_ENTRY(VAServerJob) next;
+    /* lifecycle: NEW -> EXECUTED -> SENT -> DONE; BUSY marks an async
+     * operation currently in flight */
+    enum {
+        VA_SERVER_JOB_STATE_NEW = 0,
+        VA_SERVER_JOB_STATE_BUSY,
+        VA_SERVER_JOB_STATE_EXECUTED,
+        VA_SERVER_JOB_STATE_SENT,
+        VA_SERVER_JOB_STATE_DONE,
+    } state;
+} VAServerJob;
+
+/* A client job tracks one RPC request we originate: we send the
+ * request, wait for the response to be read back, then run the
+ * completion callback with that response. */
+typedef struct VAClientJob {
+    char tag[64];                   /* unique id used to look the job up */
+    void *opaque;                   /* user state handed to each op */
+    void *resp_opaque;              /* response injected by va_client_job_read_done() */
+    VAClientJobOps ops;             /* send/callback hooks */
+    QTAILQ_ENTRY(VAClientJob) next;
+    /* lifecycle: NEW -> SENT -> READ -> DONE; BUSY marks an async
+     * operation currently in flight */
+    enum {
+        VA_CLIENT_JOB_STATE_NEW = 0,
+        VA_CLIENT_JOB_STATE_BUSY,
+        VA_CLIENT_JOB_STATE_SENT,
+        VA_CLIENT_JOB_STATE_READ,
+        VA_CLIENT_JOB_STATE_DONE,
+    } state;
+} VAClientJob;
+
+/* at most this many sends may be in flight on the channel at once */
+#define SEND_COUNT_MAX 1
+/* cap on concurrently-executing server jobs
+ * NOTE(review): execute_count is tracked but this limit is not yet
+ * enforced anywhere in this file — confirm against later patches */
+#define EXECUTE_COUNT_MAX 4
+
+struct VAManager {
+    int send_count; /* sends in flight */
+    int execute_count; /* number of jobs currently executing */
+    QTAILQ_HEAD(, VAServerJob) server_jobs; /* pending/active server jobs */
+    QTAILQ_HEAD(, VAClientJob) client_jobs; /* pending/active client jobs */
+};
+
+/* server job operations/helpers */
+
+/* Look up a server job by its tag; returns NULL if no match exists. */
+static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag)
+{
+    VAServerJob *job;
+
+    QTAILQ_FOREACH(job, &m->server_jobs, next) {
+        if (!strcmp(job->tag, tag)) {
+            return job;
+        }
+    }
+
+    return NULL;
+}
+
+/* Queue up a new server job and kick the state machine.
+ *
+ * @tag: unique identifier for the job (bounded copy into the job)
+ * @opaque: user state handed back to each of @ops
+ * @ops: execute/send/callback hooks driving the job
+ *
+ * Returns 0 (queueing cannot currently fail).
+ */
+int va_server_job_add(VAManager *m, const char *tag, void *opaque,
+                      VAServerJobOps ops)
+{
+    VAServerJob *j = qemu_mallocz(sizeof(VAServerJob));
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_NEW;
+    j->ops = ops;
+    j->opaque = opaque;
+    /* qemu_mallocz() zero-fills, so no memset of the tag buffer is
+     * needed; pstrcpy() takes the full buffer size and always
+     * NUL-terminates, letting tags use all 63 usable bytes */
+    pstrcpy(j->tag, sizeof(j->tag), tag);
+    QTAILQ_INSERT_TAIL(&m->server_jobs, j, next);
+    va_kick(m);
+    return 0;
+}
+
+/* Begin executing a server job via the user-supplied execute() hook;
+ * the job stays BUSY until va_server_job_execute_done() is called. */
+static void va_server_job_execute(VAServerJob *j)
+{
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_BUSY;
+    j->ops.execute(j->opaque, j->tag);
+}
+
+/* TODO: need a way to pass information back */
+/* User-called notification that a server job has finished executing;
+ * advances the job to EXECUTED and re-runs the state machine. */
+void va_server_job_execute_done(VAManager *m, const char *tag)
+{
+    VAServerJob *job = va_server_job_by_tag(m, tag);
+
+    TRACE("called");
+    if (job == NULL) {
+        LOG("server job with tag \"%s\" not found", tag);
+        return;
+    }
+
+    job->state = VA_SERVER_JOB_STATE_EXECUTED;
+    va_kick(m);
+}
+
+/* Begin sending a server job's response via the user send() hook;
+ * the job stays BUSY until va_server_job_send_done() is called. */
+static void va_server_job_send(VAServerJob *j)
+{
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_BUSY;
+    j->ops.send(j->opaque, j->tag);
+}
+
+/* User-called notification that a server job's response has been sent;
+ * releases the send slot and advances the job to SENT. */
+void va_server_job_send_done(VAManager *m, const char *tag)
+{
+    VAServerJob *job = va_server_job_by_tag(m, tag);
+
+    TRACE("called");
+    if (job == NULL) {
+        LOG("server job with tag \"%s\" not found", tag);
+        return;
+    }
+
+    job->state = VA_SERVER_JOB_STATE_SENT;
+    m->send_count--;
+    va_kick(m);
+}
+
+/* Run the (optional) completion callback and mark the job DONE so a
+ * subsequent va_kick() pass removes it from the queue. */
+static void va_server_job_callback(VAServerJob *j)
+{
+    TRACE("called");
+    j->state = VA_SERVER_JOB_STATE_BUSY;
+    if (j->ops.callback) {
+        j->ops.callback(j->opaque, j->tag);
+    }
+    j->state = VA_SERVER_JOB_STATE_DONE;
+}
+
+/* Abort a server job regardless of its current state. The completion
+ * callback still runs; cleanup happens on the following va_kick().
+ * NOTE(review): if a send or execute was in flight for this job, the
+ * corresponding counters are not decremented here (see TODO below),
+ * which can wedge the send slot. */
+void va_server_job_cancel(VAManager *m, const char *tag)
+{
+    VAServerJob *j = va_server_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("server job with tag \"%s\" not found", tag);
+        return;
+    }
+    /* TODO: need to decrement sends/execs in flight appropriately */
+    /* make callback and move to done state, kick() will handle cleanup */
+    va_server_job_callback(j);
+    va_kick(m);
+}
+
+/* client job operations */
+
+/* Look up a client job by its tag; returns NULL if no match exists. */
+static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag)
+{
+    VAClientJob *job;
+
+    QTAILQ_FOREACH(job, &m->client_jobs, next) {
+        if (!strcmp(job->tag, tag)) {
+            return job;
+        }
+    }
+
+    return NULL;
+}
+
+/* Queue up a new client job (an outgoing request) and kick the state
+ * machine.
+ *
+ * @tag: unique identifier for the job (bounded copy into the job)
+ * @opaque: user state handed back to each of @ops
+ * @ops: send/callback hooks driving the job
+ *
+ * Returns 0 (queueing cannot currently fail).
+ */
+int va_client_job_add(VAManager *m, const char *tag, void *opaque,
+                      VAClientJobOps ops)
+{
+    VAClientJob *j = qemu_mallocz(sizeof(VAClientJob));
+    TRACE("called");
+    /* set the initial state explicitly rather than relying on the
+     * zero-fill from qemu_mallocz(), matching va_server_job_add() */
+    j->state = VA_CLIENT_JOB_STATE_NEW;
+    j->ops = ops;
+    j->opaque = opaque;
+    /* buffer already zeroed by qemu_mallocz(); pstrcpy() takes the
+     * full buffer size and always NUL-terminates */
+    pstrcpy(j->tag, sizeof(j->tag), tag);
+    QTAILQ_INSERT_TAIL(&m->client_jobs, j, next);
+    va_kick(m);
+    return 0;
+}
+
+/* Begin sending a client job's request via the user send() hook;
+ * the job stays BUSY until va_client_job_send_done() is called. */
+static void va_client_job_send(VAClientJob *j)
+{
+    TRACE("called");
+    j->state = VA_CLIENT_JOB_STATE_BUSY;
+    j->ops.send(j->opaque, j->tag);
+}
+
+/* User-called notification that a client job's request has been sent;
+ * releases the send slot and advances the job to SENT, where it waits
+ * for va_client_job_read_done(). */
+void va_client_job_send_done(VAManager *m, const char *tag)
+{
+    VAClientJob *job = va_client_job_by_tag(m, tag);
+
+    TRACE("called");
+    if (job == NULL) {
+        LOG("client job with tag \"%s\" not found", tag);
+        return;
+    }
+
+    job->state = VA_CLIENT_JOB_STATE_SENT;
+    m->send_count--;
+    va_kick(m);
+}
+
+/* User-called notification that the response for a client job has been
+ * read off the channel; @resp is stashed for the completion callback
+ * and the job advances to READ. */
+void va_client_job_read_done(VAManager *m, const char *tag, void *resp)
+{
+    VAClientJob *j = va_client_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("client job with tag \"%s\" not found", tag);
+        return;
+    }
+    j->state = VA_CLIENT_JOB_STATE_READ;
+    j->resp_opaque = resp;
+    va_kick(m);
+}
+
+/* Run the (optional) completion callback with the stashed response and
+ * mark the job DONE so a subsequent va_kick() pass removes it. */
+static void va_client_job_callback(VAClientJob *j)
+{
+    TRACE("called");
+    j->state = VA_CLIENT_JOB_STATE_BUSY;
+    if (j->ops.callback) {
+        j->ops.callback(j->opaque, j->resp_opaque, j->tag);
+    }
+    j->state = VA_CLIENT_JOB_STATE_DONE;
+}
+
+/* Abort a client job regardless of its current state. The completion
+ * callback still runs (with whatever resp_opaque holds, possibly NULL);
+ * cleanup happens on the following va_kick().
+ * NOTE(review): if a send was in flight for this job, send_count is
+ * not decremented here (see TODO below), which can wedge the send
+ * slot. */
+void va_client_job_cancel(VAManager *m, const char *tag)
+{
+    VAClientJob *j = va_client_job_by_tag(m, tag);
+    TRACE("called");
+    if (!j) {
+        LOG("client job with tag \"%s\" not found", tag);
+        return;
+    }
+    /* TODO: need to decrement sends/execs in flight appropriately */
+    /* make callback and move to done state, kick() will handle cleanup */
+    va_client_job_callback(j);
+    va_kick(m);
+}
+
+/* general management functions */
+
+/* Allocate and initialize a manager with empty job queues.
+ * Caller owns the returned VAManager. */
+VAManager *va_manager_new(void)
+{
+    VAManager *mgr = qemu_mallocz(sizeof(VAManager));
+
+    QTAILQ_INIT(&mgr->server_jobs);
+    QTAILQ_INIT(&mgr->client_jobs);
+
+    return mgr;
+}
+
+/* Advance a single server job one step through its state machine:
+ *
+ *   NEW      -> start executing the request
+ *   EXECUTED -> start sending the response, if a send slot is free
+ *   SENT     -> run the completion callback (job becomes DONE)
+ *   BUSY     -> async operation in flight, nothing to do
+ *   DONE     -> unlink and free the job
+ */
+static void va_process_server_job(VAManager *m, VAServerJob *sj)
+{
+    switch (sj->state) {
+    case VA_SERVER_JOB_STATE_NEW:
+        TRACE("marker");
+        va_server_job_execute(sj);
+        break;
+    case VA_SERVER_JOB_STATE_EXECUTED:
+        TRACE("marker");
+        if (m->send_count < SEND_COUNT_MAX) {
+            TRACE("marker");
+            va_server_job_send(sj);
+            m->send_count++;
+        }
+        break;
+    case VA_SERVER_JOB_STATE_SENT:
+        TRACE("marker");
+        va_server_job_callback(sj);
+        break;
+    case VA_SERVER_JOB_STATE_BUSY:
+        TRACE("marker, server job currently busy");
+        break;
+    case VA_SERVER_JOB_STATE_DONE:
+        TRACE("marker");
+        QTAILQ_REMOVE(&m->server_jobs, sj, next);
+        /* job was allocated in va_server_job_add(); without this free
+         * every completed job leaks. Safe under QTAILQ_FOREACH_SAFE in
+         * va_kick(), which caches the next pointer before we run. */
+        qemu_free(sj);
+        break;
+    default:
+        LOG("error, unknown server job state");
+        break;
+    }
+}
+
+/* Advance a single client job one step through its state machine:
+ *
+ *   NEW  -> start sending the request, if a send slot is free
+ *   SENT -> waiting on va_client_job_read_done(), nothing to do
+ *   READ -> run the completion callback (job becomes DONE)
+ *   BUSY -> async operation in flight, nothing to do
+ *   DONE -> unlink and free the job
+ */
+static void va_process_client_job(VAManager *m, VAClientJob *cj)
+{
+    switch (cj->state) {
+    case VA_CLIENT_JOB_STATE_NEW:
+        TRACE("marker");
+        if (m->send_count < SEND_COUNT_MAX) {
+            TRACE("marker");
+            va_client_job_send(cj);
+            m->send_count++;
+        }
+        break;
+    case VA_CLIENT_JOB_STATE_SENT:
+        TRACE("marker");
+        /* nothing to do here, awaiting read_done() */
+        break;
+    case VA_CLIENT_JOB_STATE_READ:
+        TRACE("marker");
+        va_client_job_callback(cj);
+        break;
+    case VA_CLIENT_JOB_STATE_DONE:
+        TRACE("marker");
+        QTAILQ_REMOVE(&m->client_jobs, cj, next);
+        /* job was allocated in va_client_job_add(); without this free
+         * every completed job leaks. Safe under QTAILQ_FOREACH_SAFE in
+         * va_kick(), which caches the next pointer before we run. */
+        qemu_free(cj);
+        break;
+    case VA_CLIENT_JOB_STATE_BUSY:
+        TRACE("marker, client job currently busy");
+        break;
+    default:
+        LOG("error, unknown client job state");
+        break;
+    }
+}
+
+/* Drive the state machines of all pending jobs. Called whenever an
+ * event (job added, operation completed) may have unblocked work.
+ * Uses the _SAFE iterators because processing a DONE job removes it
+ * from its queue. */
+void va_kick(VAManager *m)
+{
+    VAServerJob *sj, *sj_tmp;
+    VAClientJob *cj, *cj_tmp;
+
+    TRACE("called");
+    /* the counters are ints, so use %d (not %u) to keep the format
+     * specifiers matched to the argument types */
+    TRACE("send_count: %d, execute_count: %d", m->send_count,
+          m->execute_count);
+
+    /* TODO: make sure there is no starvation of jobs/operations here */
+
+    /* look for any work to be done among pending server jobs */
+    QTAILQ_FOREACH_SAFE(sj, &m->server_jobs, next, sj_tmp) {
+        TRACE("marker, server tag: %s", sj->tag);
+        va_process_server_job(m, sj);
+    }
+
+    /* look for work to be done among pending client jobs */
+    QTAILQ_FOREACH_SAFE(cj, &m->client_jobs, next, cj_tmp) {
+        TRACE("marker, client tag: %s", cj->tag);
+        va_process_client_job(m, cj);
+    }
+}
diff --git a/virtagent-manager.h b/virtagent-manager.h
new file mode 100644
index 0000000..7b463fb
--- /dev/null
+++ b/virtagent-manager.h
@@ -0,0 +1,130 @@
+#ifndef VIRTAGENT_MANAGER_H
+#define VIRTAGENT_MANAGER_H
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+
+/*
+ * Protocol Overview:
+ *
+ * The virtagent protocol depends on a state machine to manage communication
+ * over a single connection stream, currently a virtio or isa serial channel.
+ * The basic characterization of the work being done is that clients
+ * send/handle client jobs locally, which are then read/handled remotely as
+ * server jobs. A client job consists of a request which is sent, and a
+ * response which is eventually received. A server job consists of a request
+ * which is received from the other end, and a response which is sent back.
+ *
+ * Server jobs are given priority over client jobs, i.e. if we send a client
+ * job (our request) and receive a server job (their request), rather than
+ * await a response to the client job, we immediately begin processing the
+ * server job and then send back the response. This prevents us from being
+ * deadlocked in a situation where both sides have sent a client job and are
+ * awaiting the response before handling the other side's client job.
+ *
+ * Multiple in-flight requests are supported, but high request rates can
+ * potentially starve out the other side's client jobs / requests, so
+ * well-behaved participants should periodically back off on high request
+ * rates, or limit themselves to 1 request at a time (anything more than 1 can
+ * still potentially remove any window for the other end to service its own
+ * client jobs, since we can begin sending the next request before it begins
+ * sending the response for the 2nd).
+ *
+ * On a related note, in the future, bidirectional user/session-level guest
+ * agents may also be supported via a forwarding service made available
+ * through the system-level guest agent. In this case it is up to the
+ * system-level agent to handle forwarding requests in such a way that we
+ * don't starve the host-side service out sheerly by having too many
+ * sessions/users trying to send RPCs at a constant rate. This would be
+ * supported through this job Manager via an additional "forwarder" job type.
+ *
+ * To encapsulate some of this logic, we define here a "Manager" class, which
+ * provides an abstract interface to a state machine which handles most of
+ * the above logic transparently to the transport/application-level code.
+ * This also makes it possible to utilize alternative
+ * transport/application-level protocols in the future.
+ *
+ */
+
+/*
+ * Two types of jobs are generated from various components of virtagent.
+ * Each job type has a priority, and a set of prioritized functions as well.
+ *
+ * The read handler generates new server jobs as it receives requests from
+ * the channel. Server jobs make progress through the following operations.
+ *
+ * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE
+ *
+ * EXECUTE (provided by user, manager calls)
+ * When server jobs are added, eventually (as execution slots become
+ * available) an execute() will be called to begin executing the job. An
+ * error value will be returned if there is no room in the queue for another
+ * server job.
+ *
+ * EXECUTE_DONE (provided by manager, user calls)
+ * As server jobs complete, va_server_job_execute_done() is called to update
+ * the execution status of that job (failure/success), inject the payload,
+ * and kick off the next operation.
+ *
+ * SEND (provided by user, manager calls)
+ * Eventually the send() operation is made. This will cause the send handler
+ * to begin sending the response.
+ *
+ * SEND_DONE (provided by manager, user calls)
+ * Upon completion of that send, va_server_job_send_done() will be
+ * called. This will free up the job, and kick off the next operation.
+ */
+/* Generic job operation: invoked with the job's opaque user data and
+ * its tag. NOTE(review): the manager currently ignores the return
+ * value — confirm intended error semantics with the transport layer. */
+typedef int (va_job_op)(void *opaque, const char *tag);
+/* Hooks driving a server job's lifecycle */
+typedef struct VAServerJobOps {
+    va_job_op *execute;  /* begin executing the received request */
+    va_job_op *send;     /* begin sending the response */
+    va_job_op *callback; /* completion notification (may be NULL) */
+} VAServerJobOps;
+
+/*
+ * The client component generates new client jobs as they're made by
+ * virtagent in response to monitored events or user-issued commands.
+ * Client jobs progress via the following operations.
+ *
+ * SEND->SEND_DONE->READ_DONE
+ *
+ * SEND (provided by user, called by manager)
+ * After client jobs are added, send() will eventually be called to queue
+ * the job up for xmit over the channel.
+ *
+ * SEND_DONE (provided by manager, called by user)
+ * Upon completion of the send, va_client_job_send_done() should be called
+ * with failure/success indication.
+ *
+ * READ_DONE (provided by manager, called by user)
+ * When a response for the request is read back via the transport layer,
+ * read_done() will be called by the user to indicate success/failure,
+ * inject the response, and make the associated callback.
+ */
+/* Completion callback for a client job: receives the original opaque
+ * request state, the response injected by va_client_job_read_done(),
+ * and the job's tag. */
+typedef int (va_client_job_cb)(void *opaque, void *resp_opaque,
+                               const char *tag);
+/* Hooks driving a client job's lifecycle */
+typedef struct VAClientJobOps {
+    va_job_op *send;            /* begin sending the request */
+    va_client_job_cb *callback; /* completion notification (may be NULL) */
+} VAClientJobOps;
+
+typedef struct VAManager VAManager;
+
+VAManager *va_manager_new(void);
+void va_kick(VAManager *m);
+
+/* interfaces for server jobs */
+int va_server_job_add(VAManager *m, const char *tag, void *opaque,
+ VAServerJobOps ops);
+void va_server_job_execute_done(VAManager *m, const char *tag);
+void va_server_job_send_done(VAManager *m, const char *tag);
+void va_server_job_cancel(VAManager *m, const char *tag);
+
+/* interfaces for client jobs */
+int va_client_job_add(VAManager *m, const char *tag, void *opaque,
+ VAClientJobOps ops);
+void va_client_job_cancel(VAManager *m, const char *tag);
+void va_client_job_send_done(VAManager *m, const char *tag);
+void va_client_job_read_done(VAManager *m, const char *tag, void *resp);
+
+#endif /* VIRTAGENT_MANAGER_H */
--
1.7.0.4
- [Qemu-devel] Re: [RFC][PATCH v7 06/16] virtagent: transport definitions, (continued)
- [Qemu-devel] [RFC][PATCH v7 13/16] virtagent: add va_shutdown HMP/QMP command, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 11/16] virtagent: add va_ping HMP/QMP command, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 15/16] virtagent: qemu-va, system-level virtagent guest agent, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 16/16] virtagent: add bits to build virtagent host/guest components, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 12/16] virtagent: add "shutdown" RPC to server, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 14/16] virtagent: add virtagent chardev, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 07/16] virtagent: base RPC client definitions, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 08/16] virtagnet: base RPC server definitions, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 04/16] virtagent: bi-directional RPC handling logic,
Michael Roth <=
- [Qemu-devel] [RFC][PATCH v7 05/16] virtagent: common helpers and init routines, Michael Roth, 2011/03/07
- [Qemu-devel] [RFC][PATCH v7 03/16] Make qemu timers available for tools, Michael Roth, 2011/03/07
- [Qemu-devel] Re: [RFC][PATCH v7 00/16] virtagent: host/guest communication agent, Anthony Liguori, 2011/03/07