qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets


From: Arun R Bharadwaj
Subject: Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
Date: Thu, 21 Oct 2010 14:10:25 +0530
User-agent: Mutt/1.5.20 (2009-06-14)

* Stefan Hajnoczi <address@hidden> [2010-10-20 10:30:38]:

> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
> <address@hidden> wrote:
> > From: Gautham R Shenoy <address@hidden>
> >
> > This patch makes the paio subsystem use the threadlet framework thereby
> > decoupling asynchronous threading framework portion out of
> > posix-aio-compat.c
> >
> > The patch has been tested with fstress.
> >
> > Signed-off-by: Gautham R Shenoy <address@hidden>
> > Signed-off-by: Sripathi Kodi <address@hidden>
> > ---
> > >  posix-aio-compat.c |  166 +++++++++-------------------------------------------
> >  1 files changed, 30 insertions(+), 136 deletions(-)
> >
> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> > index 7b862b5..6977c18 100644
> > --- a/posix-aio-compat.c
> > +++ b/posix-aio-compat.c
> > @@ -29,6 +29,7 @@
> >  #include "block_int.h"
> >
> >  #include "block/raw-posix-aio.h"
> > +#include "qemu-threadlets.h"
> >
> >
> >  struct qemu_paiocb {
> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
> >     struct qemu_paiocb *next;
> >
> >     int async_context_id;
> > +    ThreadletWork work;
> 
> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
> 
> >  };
> >
> >  typedef struct PosixAioState {
> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
> >  } PosixAioState;
> >
> >
> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> > -static pthread_t thread_id;
> > -static pthread_attr_t attr;
> > -static int max_threads = 64;
> > -static int cur_threads = 0;
> > -static int idle_threads = 0;
> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> > -
> >  #ifdef CONFIG_PREADV
> >  static int preadv_present = 1;
> >  #else
> > @@ -85,39 +78,6 @@ static void die(const char *what)
> >     die2(errno, what);
> >  }
> >
> > -static void mutex_lock(pthread_mutex_t *mutex)
> > -{
> > -    int ret = pthread_mutex_lock(mutex);
> > -    if (ret) die2(ret, "pthread_mutex_lock");
> > -}
> > -
> > -static void mutex_unlock(pthread_mutex_t *mutex)
> > -{
> > -    int ret = pthread_mutex_unlock(mutex);
> > -    if (ret) die2(ret, "pthread_mutex_unlock");
> > -}
> > -
> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> > -                           struct timespec *ts)
> > -{
> > -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> > -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> > -    return ret;
> > -}
> > -
> > -static void cond_signal(pthread_cond_t *cond)
> > -{
> > -    int ret = pthread_cond_signal(cond);
> > -    if (ret) die2(ret, "pthread_cond_signal");
> > -}
> > -
> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> > -                          void *(*start_routine)(void*), void *arg)
> > -{
> > -    int ret = pthread_create(thread, attr, start_routine, arg);
> > -    if (ret) die2(ret, "pthread_create");
> > -}
> > -
> >  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> >  {
> >     int ret;
> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb 
> > *aiocb)
> >     return nbytes;
> >  }
> >
> > -static void *aio_thread(void *unused)
> > +static void aio_thread(ThreadletWork *work)
> >  {
> >     pid_t pid;
> > > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> > +    ssize_t ret = 0;
> >
> >     pid = getpid();
> > +    aiocb->active = 1;
> >
> > -    while (1) {
> > -        struct qemu_paiocb *aiocb;
> > -        ssize_t ret = 0;
> > -        qemu_timeval tv;
> > -        struct timespec ts;
> > -
> > -        qemu_gettimeofday(&tv);
> > -        ts.tv_sec = tv.tv_sec + 10;
> > -        ts.tv_nsec = 0;
> > -
> > -        mutex_lock(&lock);
> > -
> > -        while (QTAILQ_EMPTY(&request_list) &&
> > -               !(ret == ETIMEDOUT)) {
> > -            ret = cond_timedwait(&cond, &lock, &ts);
> > -        }
> > -
> > -        if (QTAILQ_EMPTY(&request_list))
> > -            break;
> > -
> > -        aiocb = QTAILQ_FIRST(&request_list);
> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
> > -        aiocb->active = 1;
> > -        idle_threads--;
> > -        mutex_unlock(&lock);
> > -
> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > -        case QEMU_AIO_READ:
> > -        case QEMU_AIO_WRITE:
> > -            ret = handle_aiocb_rw(aiocb);
> > -            break;
> > -        case QEMU_AIO_FLUSH:
> > -            ret = handle_aiocb_flush(aiocb);
> > -            break;
> > -        case QEMU_AIO_IOCTL:
> > -            ret = handle_aiocb_ioctl(aiocb);
> > -            break;
> > -        default:
> > > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > -            ret = -EINVAL;
> > -            break;
> > -        }
> > -
> > -        mutex_lock(&lock);
> > -        aiocb->ret = ret;
> > -        idle_threads++;
> > -        mutex_unlock(&lock);
> > -
> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > +    case QEMU_AIO_READ:
> > +    case QEMU_AIO_WRITE:
> > +        ret = handle_aiocb_rw(aiocb);
> > +        break;
> > +    case QEMU_AIO_FLUSH:
> > +        ret = handle_aiocb_flush(aiocb);
> > +        break;
> > +    case QEMU_AIO_IOCTL:
> > +        ret = handle_aiocb_ioctl(aiocb);
> > +        break;
> > +    default:
> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > +        ret = -EINVAL;
> > +        break;
> >     }
> >
> > -    idle_threads--;
> > -    cur_threads--;
> > -    mutex_unlock(&lock);
> > +    aiocb->ret = ret;
> >
> > -    return NULL;
> > -}
> > -
> > -static void spawn_thread(void)
> > -{
> > -    sigset_t set, oldset;
> > -
> > -    cur_threads++;
> > -    idle_threads++;
> > -
> > -    /* block all signals */
> > -    if (sigfillset(&set)) die("sigfillset");
> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> > -
> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
> > -
> > > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
> >  }
> >
> >  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> >  {
> >     aiocb->ret = -EINPROGRESS;
> >     aiocb->active = 0;
> > -    mutex_lock(&lock);
> > -    if (idle_threads == 0 && cur_threads < max_threads)
> > -        spawn_thread();
> > -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> > -    mutex_unlock(&lock);
> > -    cond_signal(&cond);
> > +
> > +    aiocb->work.func = aio_thread;
> > +    submit_threadletwork(&aiocb->work);
> >  }
> >
> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> >  {
> >     ssize_t ret;
> >
> > -    mutex_lock(&lock);
> >     ret = aiocb->ret;
> > -    mutex_unlock(&lock);
> > -
> >     return ret;
> >  }
> >
> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> >     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> >     int active = 0;
> >
> > -    mutex_lock(&lock);
> >     if (!acb->active) {
> 
> I'm not sure the active field serves any purpose.  No memory barriers
> are used so the value of active is 0 before the work is executed and 0
> *or* 1 while the work is executed.
> 
> The cancel_threadletwork() function already indicates whether
> cancellation succeeded.  Why not just try to cancel instead of using
> the active field?
> 

This series does not touch the active field anywhere, so I feel we can
implement this as a separate patch instead of bundling it with this one.

-arun
> > -        QTAILQ_REMOVE(&request_list, acb, node);
> > -        acb->ret = -ECANCELED;
> > +        if (!cancel_threadletwork(&acb->work))
> > +            acb->ret = -ECANCELED;
> > +         else
> > +            active = 1;
> 
> The 0 and 1 return values from cancel_threadletwork() are inverted.  See
> also my comment on patch 1/3 in this series.
> 
> >     } else if (acb->ret == -EINPROGRESS) {
> >         active = 1;
> >     }
> > -    mutex_unlock(&lock);
> >
> >     if (active) {
> >         /* fail safe: if the aio could not be canceled, we wait for
> 
> while (qemu_paio_error(acb) == EINPROGRESS)
>     ;
> 
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread.  We shouldn't communicate between threads
> without barriers.
> 
> Stefan
> 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]