qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH 4/5] coroutine-lock: reimplement CoRwLock to fix downgrade bu


From: David Edmondson
Subject: Re: [PATCH 4/5] coroutine-lock: reimplement CoRwLock to fix downgrade bug
Date: Wed, 17 Mar 2021 10:40:42 +0000

On Tuesday, 2021-03-16 at 17:00:06 +01, Paolo Bonzini wrote:

> A feature of the current rwlock is that if multiple coroutines hold a
> reader lock, all must be runnable. The unlock implementation relies on
> this, choosing to wake a single coroutine when the final read lock
> holder exits the critical section, assuming that it will wake a
> coroutine attempting to acquire a write lock.
>
> The downgrade implementation violates this assumption by creating a
> read lock owning coroutine that is exclusively runnable - any other
> coroutines that are waiting to acquire a read lock are *not* made
> runnable when the write lock holder converts its ownership to read
> only.
>
> To fix this, keep the queue of waiters explicitly in the CoRwLock
> instead of using CoQueue, and store for each whether it is a
> potential reader or a writer.  This way, downgrade can look at the
> first queued coroutines and wake it if it is a reader, causing
> all other readers to be released in turn.
>
> Reported-by: David Edmondson <david.edmondson@oracle.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/qemu/coroutine.h   |  10 ++-
>  util/qemu-coroutine-lock.c | 150 ++++++++++++++++++++++++-------------
>  2 files changed, 106 insertions(+), 54 deletions(-)
>
> diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
> index 84eab6e3bf..ae62d4bc8d 100644
> --- a/include/qemu/coroutine.h
> +++ b/include/qemu/coroutine.h
> @@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, 
> QemuLockable *lock);
>  bool qemu_co_queue_empty(CoQueue *queue);
>  
>  
> +typedef struct CoRwTicket CoRwTicket;
>  typedef struct CoRwlock {
> -    int pending_writer;
> -    int reader;
>      CoMutex mutex;
> -    CoQueue queue;
> +
> +    /* Number of readers, of -1 if owned for writing.  */

s/, of/, or/

> +    int owner;
> +
> +    /* Waiting coroutines.  */
> +    QSIMPLEQ_HEAD(, CoRwTicket) tickets;
>  } CoRwlock;

Isn't this...

 * ... Also, @qemu_co_rwlock_upgrade
 * only overrides CoRwlock fairness if there are no concurrent readers, so
 * another writer might run while @qemu_co_rwlock_upgrade blocks.

...now incorrect?

>  /**
> diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
> index eb73cf11dc..655634d185 100644
> --- a/util/qemu-coroutine-lock.c
> +++ b/util/qemu-coroutine-lock.c
> @@ -327,11 +327,70 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
>      trace_qemu_co_mutex_unlock_return(mutex, self);
>  }
>  
> +struct CoRwTicket {
> +    bool read;
> +    Coroutine *co;
> +    QSIMPLEQ_ENTRY(CoRwTicket) next;
> +};
> +
>  void qemu_co_rwlock_init(CoRwlock *lock)
>  {
> -    memset(lock, 0, sizeof(*lock));
> -    qemu_co_queue_init(&lock->queue);
>      qemu_co_mutex_init(&lock->mutex);
> +    lock->owner = 0;
> +    QSIMPLEQ_INIT(&lock->tickets);
> +}
> +
> +/* Releases the internal CoMutex.  */
> +static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
> +{
> +    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
> +    Coroutine *co = NULL;
> +
> +    /*
> +     * Setting lock->owner here prevents rdlock and wrlock from
> +     * sneaking in between unlock and wake.
> +     */
> +
> +    if (tkt) {
> +        if (tkt->read) {
> +            if (lock->owner >= 0) {
> +                lock->owner++;
> +                co = tkt->co;
> +            }
> +        } else {
> +            if (lock->owner == 0) {
> +                lock->owner = -1;
> +                co = tkt->co;
> +            }
> +        }
> +    }
> +
> +    if (co) {
> +        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
> +        qemu_co_mutex_unlock(&lock->mutex);
> +        aio_co_wake(co);
> +    } else {
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    }
> +}
> +
> +/* Releases the internal CoMutex.  */
> +static void qemu_co_rwlock_sleep(bool read, CoRwlock *lock)
> +{
> +    CoRwTicket my_ticket = { read, qemu_coroutine_self() };
> +
> +    QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> +    qemu_co_mutex_unlock(&lock->mutex);
> +    qemu_coroutine_yield();
> +
> +    if (read) {
> +        /* Possibly wake another reader, which will wake the next in line.  
> */
> +        assert(lock->owner >= 1);
> +        qemu_co_mutex_lock(&lock->mutex);
> +        qemu_co_rwlock_maybe_wake_one(lock);
> +    } else {
> +        assert(lock->owner == -1);
> +    }
>  }
>  
>  void qemu_co_rwlock_rdlock(CoRwlock *lock)
> @@ -339,13 +398,13 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
>  
>      qemu_co_mutex_lock(&lock->mutex);
>      /* For fairness, wait if a writer is in line.  */
> -    while (lock->pending_writer) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +    if (lock->owner == 0 || (lock->owner > 0 && 
> QSIMPLEQ_EMPTY(&lock->tickets))) {
> +        lock->owner++;
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    } else {
> +        qemu_co_rwlock_sleep(true, lock);
>      }
> -    lock->reader++;
> -    qemu_co_mutex_unlock(&lock->mutex);
>  
> -    /* The rest of the read-side critical section is run without the mutex.  
> */
>      self->locks_held++;
>  }
>  
> @@ -355,69 +413,58 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
>      Coroutine *self = qemu_coroutine_self();
>  
>      assert(qemu_in_coroutine());
> -    if (!lock->reader) {
> -        /* The critical section started in qemu_co_rwlock_wrlock.  */
> -        qemu_co_queue_restart_all(&lock->queue);
> +    self->locks_held--;
> +
> +    qemu_co_mutex_lock(&lock->mutex);
> +    if (lock->owner == -1) {
> +        lock->owner = 0;
>      } else {
> -        self->locks_held--;
> +        lock->owner--;
> +    }
>  
> -        qemu_co_mutex_lock(&lock->mutex);
> -        lock->reader--;
> -        assert(lock->reader >= 0);
> -        /* Wakeup only one waiting writer */
> -        if (!lock->reader) {
> -            qemu_co_queue_next(&lock->queue);
> -        }
> +    if (lock->owner == 0) {
> +        qemu_co_rwlock_maybe_wake_one(lock);
> +    } else {
> +        qemu_co_mutex_unlock(&lock->mutex);
>      }
> -    qemu_co_mutex_unlock(&lock->mutex);
>  }
>  
>  void qemu_co_rwlock_downgrade(CoRwlock *lock)
>  {
> -    Coroutine *self = qemu_coroutine_self();
> -
> -    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
> -     * qemu_co_rwlock_upgrade.
> -     */
> -    assert(lock->reader == 0);
> -    lock->reader++;
> -    qemu_co_mutex_unlock(&lock->mutex);
> +    qemu_co_mutex_lock(&lock->mutex);
> +    assert(lock->owner == -1);
> +    lock->owner = 1;
>  
> -    /* The rest of the read-side critical section is run without the mutex.  
> */
> -    self->locks_held++;
> +    /* Possibly wake another reader, which will wake the next in line.  */
> +    qemu_co_rwlock_maybe_wake_one(lock);
>  }
>  
>  void qemu_co_rwlock_wrlock(CoRwlock *lock)
>  {
> +    Coroutine *self = qemu_coroutine_self();
> +
>      qemu_co_mutex_lock(&lock->mutex);
> -    lock->pending_writer++;
> -    while (lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +    if (lock->owner == 0) {
> +        lock->owner = -1;
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    } else {
> +        qemu_co_rwlock_sleep(false, lock);
>      }
> -    lock->pending_writer--;
>  
> -    /* The rest of the write-side critical section is run with
> -     * the mutex taken, so that lock->reader remains zero.
> -     * There is no need to update self->locks_held.
> -     */
> +    self->locks_held++;
>  }
>  
>  void qemu_co_rwlock_upgrade(CoRwlock *lock)
>  {
> -    Coroutine *self = qemu_coroutine_self();
> -
>      qemu_co_mutex_lock(&lock->mutex);
> -    assert(lock->reader > 0);
> -    lock->reader--;
> -    lock->pending_writer++;
> -    while (lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +    assert(lock->owner > 0);
> +    /* For fairness, wait if a writer is in line.  */
> +    if (lock->owner == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
> +        lock->owner = -1;
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    } else {
> +        lock->owner--;
> +        qemu_co_rwlock_sleep(false, lock);

Doesn't this need something for the case where lock->owner hits 0?

If not, how is two readers both attempting to upgrade ever resolved?

It feels like it should jump into the second half of
qemu_co_rwlock_wrlock().

>      }
> -    lock->pending_writer--;
>  
> -    /* The rest of the write-side critical section is run with
> -     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
> -     * not account for the lock twice in self->locks_held.
> -     */
> -    self->locks_held--;
>  }
> -- 
> 2.29.2

dme.
-- 
And you're standing here beside me, I love the passing of time.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]