[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets
From: |
Arun R Bharadwaj |
Subject: |
Re: [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets |
Date: |
Thu, 21 Oct 2010 18:56:25 +0530 |
User-agent: |
Mutt/1.5.20 (2009-06-14) |
* Arun R Bharadwaj <address@hidden> [2010-10-21 17:40:55]:
> From: Gautham R Shenoy <address@hidden>
>
> Add helper functions to enable virtio-9p to make use of the threadlets
> infrastructure for offloading blocking tasks, such as making posix calls, on
> to the helper threads and to handle the post_posix_operations() from the
> context of the iothread. This frees the vcpu thread to process any other guest
> operations while the processing of v9fs_io is in progress.
>
> Signed-off-by: Gautham R Shenoy <address@hidden>
(Please note this correction)
Signed-off-by: Gautham R Shenoy <address@hidden>
> Signed-off-by: Sripathi Kodi <address@hidden>
> Signed-off-by: Arun R Bharadwaj <address@hidden>
> ---
> hw/virtio-9p.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> posix-aio-compat.c | 48 ++++++---------
> qemu-threadlets.c | 21 +++++++
> qemu-threadlets.h | 1
> vl.c | 3 +
> 5 files changed, 211 insertions(+), 30 deletions(-)
>
> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
> index a871685..f9a7b7d 100644
> --- a/hw/virtio-9p.c
> +++ b/hw/virtio-9p.c
> @@ -18,6 +18,7 @@
> #include "fsdev/qemu-fsdev.h"
> #include "virtio-9p-debug.h"
> #include "virtio-9p-xattr.h"
> +#include "qemu-threadlets.h"
>
> int debug_9p_pdu;
>
> @@ -33,6 +34,149 @@ enum {
> Oappend = 0x80,
> };
>
> +struct v9fs_post_op {
> + QTAILQ_ENTRY(v9fs_post_op) node;
> + void (*func)(void *arg);
> + void *arg;
> +};
> +
> +static struct {
> + int rfd;
> + int wfd;
> + QemuMutex lock;
> + QTAILQ_HEAD(, v9fs_post_op) post_op_list;
> +} v9fs_async_struct;
> +
> +static void die2(int err, const char *what)
> +{
> + fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> + abort();
> +}
> +
> +static void die(const char *what)
> +{
> + die2(errno, what);
> +}
> +
> +#define ASYNC_MAX_PROCESS 5
> +
> +/**
> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
> + * @arg: Not used.
> + *
> + * This function serves as a callback to the iothread to be called into whenever
> + * the v9fs_async_struct.wfd is written into. This thread goes through the list
> + * of v9fs_post_posix_operations() and executes them. In the process, it might
> + * queue more job on the asynchronous thread pool.
> + */
> +static void v9fs_process_post_ops(void *arg)
> +{
> + int count = 0;
> + struct v9fs_post_op *post_op;
> + int ret;
> + char byte;
> +
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + do {
> + ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> + } while (ret >= 0 && errno != EAGAIN);
> +
> + for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
> + if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
> + break;
> + }
> + post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
> + QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
> +
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> + post_op->func(post_op->arg);
> + qemu_free(post_op);
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + }
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> +}
> +
> +/**
> + * v9fs_async_signal: Inform the io-thread of completion of async job.
> + *
> + * This function is used to inform the iothread that a particular
> + * async-operation pertaining to v9fs has been completed and that the io thread
> + * can handle the v9fs_post_posix_operation.
> + *
> + * This is based on the aio_signal_handler
> + */
> +static inline void v9fs_async_signal(void)
> +{
> + char byte = 0;
> + ssize_t ret;
> + int tries = 0;
> +
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + do {
> + assert(tries != 100);
> + ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
> + tries++;
> + } while (ret < 0 && errno == EAGAIN);
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> + if (ret < 0 && errno != EAGAIN) {
> + die("write() in v9fs");
> + }
> +
> + if (kill(getpid(), SIGUSR2)) {
> + die("kill failed");
> + }
> +}
> +
> +/**
> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
> + * the io-thread
> + * @arg: Argument to func.
> + *
> + * This function is called from the context of one of the asynchronous threads
> + * in the thread pool. This is called when the asynchronous thread has finished
> + * executing a v9fs_posix_operation. Its purpose is to initiate the process of
> + * informing the io-thread that the v9fs_posix_operation has completed.
> + */
> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
> +{
> + struct v9fs_post_op *post_op;
> +
> + post_op = qemu_mallocz(sizeof(*post_op));
> + post_op->func = func;
> + post_op->arg = arg;
> +
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> + v9fs_async_signal();
> +}
> +
> +/**
> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
> + * @vs: V9fsOPState variable for the OP operation.
> + * @posix_fn: The posix function which has to be offloaded onto async thread.
> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
> + * the posix_fn
> + * @post_fn: The post processing function corresponding to the posix_fn.
> + *
> + * This function is a helper to offload posix_operation on to the asynchronous
> + * thread pool. It sets up the associations with the post_function that needs to
> + * be invoked from the context of the iothread once the posix_fn has been
> + * executed.
> + */
> +static void v9fs_do_async_posix(ThreadletWork *work ,
> + void (*posix_fn)(ThreadletWork *work),
> + void (**post_fn_ptr)(void *arg),
> + void (*post_fn)(void *arg))
> +{
> + *post_fn_ptr = post_fn;
> + work->func = posix_fn;
> + submit_threadletwork(work);
> +}
> +
> static int omode_to_uflags(int8_t mode)
> {
> int ret = 0;
> @@ -3639,7 +3783,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
> int i, len;
> struct stat stat;
> FsTypeEntry *fse;
> -
> + int fds[2];
>
> s = (V9fsState *)virtio_common_init("virtio-9p",
> VIRTIO_ID_9P,
> @@ -3722,5 +3866,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
> s->tag_len;
> s->vdev.get_config = virtio_9p_get_config;
>
> + if (qemu_pipe(fds) == -1) {
> + fprintf(stderr, "failed to create fd's for virtio-9p\n");
> + exit(1);
> + }
> +
> + v9fs_async_struct.rfd = fds[0];
> + v9fs_async_struct.wfd = fds[1];
> +
> + printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
> + printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
> +
> + fcntl(fds[0], F_SETFL, O_NONBLOCK);
> + fcntl(fds[1], F_SETFL, O_NONBLOCK);
> +
> + qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
> + QTAILQ_INIT(&v9fs_async_struct.post_op_list);
> + qemu_mutex_init(&(v9fs_async_struct.lock));
> + /* Create async queue. */
> +
> + (void)v9fs_do_async_posix;
> + (void)v9fs_async_helper_done;
> +
> return &s->vdev;
> }
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 2e47736..cb4308a 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> +static PosixAioState *posix_aio_state;
> +
> static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> @@ -288,6 +290,16 @@ static void aio_thread(ThreadletWork *work)
>
> aiocb->ret = ret;
>
> + if (posix_aio_state) {
> + char byte = 0;
> + ssize_t ret;
> +
> + ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> + if (ret < 0 && errno != EAGAIN) {
> + die("write()");
> + }
> + }
> +
> if (kill(pid, aiocb->ev_signo)) {
> die("kill failed");
> }
> @@ -402,22 +414,6 @@ static int posix_aio_flush(void *opaque)
> return !!s->first_aio;
> }
>
> -static PosixAioState *posix_aio_state;
> -
> -static void aio_signal_handler(int signum)
> -{
> - if (posix_aio_state) {
> - char byte = 0;
> - ssize_t ret;
> -
> - ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
> - if (ret < 0 && errno != EAGAIN)
> - die("write()");
> - }
> -
> - qemu_service_io();
> -}
> -
> static void paio_remove(struct qemu_paiocb *acb)
> {
> struct qemu_paiocb **pacb;
> @@ -442,13 +438,13 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> int active = 0;
>
> - if (!acb->active) {
> - if (!deque_threadletwork(&acb->work)) {
> - acb->ret = -ECANCELED;
> - } else {
> - active = 1;
> - }
> - } else if (acb->ret == -EINPROGRESS) {
> + if (!deque_threadletwork(&acb->work)) {
> + acb->ret = -ECANCELED;
> + } else {
> + active = 1;
> + }
> +
> + if (acb->ret == -EINPROGRESS) {
> active = 1;
> }
>
> @@ -522,7 +518,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
>
> int paio_init(void)
> {
> - struct sigaction act;
> PosixAioState *s;
> int fds[2];
>
> @@ -531,11 +526,6 @@ int paio_init(void)
>
> s = qemu_malloc(sizeof(PosixAioState));
>
> - sigfillset(&act.sa_mask);
> - act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> - act.sa_handler = aio_signal_handler;
> - sigaction(SIGUSR2, &act, NULL);
> -
> s->first_aio = NULL;
> if (qemu_pipe(fds) == -1) {
> fprintf(stderr, "failed to create pipe\n");
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> index ac3b97b..2da6f1b 100644
> --- a/qemu-threadlets.c
> +++ b/qemu-threadlets.c
> @@ -15,12 +15,28 @@
>
> #include "qemu-threadlets.h"
> #include "osdep.h"
> +#include <signal.h>
>
> #define MAX_GLOBAL_THREADS 64
> #define MIN_GLOBAL_THREADS 64
> static ThreadletQueue globalqueue;
> static int globalqueue_init;
>
> +static void threadlet_io_completion_signal_handler(int signum)
> +{
> + qemu_service_io();
> +}
> +
> +static void threadlet_register_signal_handler(void)
> +{
> + struct sigaction act;
> +
> + sigfillset(&act.sa_mask);
> + act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
> + act.sa_handler = threadlet_io_completion_signal_handler;
> + sigaction(SIGUSR2, &act, NULL);
> +}
> +
> static void *threadlet_worker(void *data)
> {
> ThreadletQueue *queue = data;
> @@ -165,3 +181,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
> qemu_mutex_init(&(queue->lock));
> qemu_cond_init(&(queue->cond));
> }
> +
> +void threadlet_init(void)
> +{
> + threadlet_register_signal_handler();
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> index 6d9585b..5fd218a 100644
> --- a/qemu-threadlets.h
> +++ b/qemu-threadlets.h
> @@ -45,4 +45,5 @@ extern int deque_threadletwork_on_queue(ThreadletQueue *queue,
> extern int deque_threadletwork(ThreadletWork *work);
> extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
> int min_threads);
> +extern void threadlet_init(void);
> #endif
> diff --git a/vl.c b/vl.c
> index df414ef..7b9a425 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -148,6 +148,7 @@ int main(int argc, char **argv)
> #include "qemu-config.h"
> #include "qemu-objects.h"
> #include "qemu-options.h"
> +#include "qemu-threadlets.h"
> #ifdef CONFIG_VIRTFS
> #include "fsdev/qemu-fsdev.h"
> #endif
> @@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
> exit(1);
> }
>
> + threadlet_init();
> +
> /* init generic devices */
> if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL,
> 1) != 0)
> exit(1);
>
>