qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer


From: Peter Xu
Subject: Re: [Qemu-devel] [PATCH 09/12] ring: introduce lockless ring buffer
Date: Wed, 20 Jun 2018 12:52:03 +0800
User-agent: Mutt/1.10.0 (2018-05-17)

On Mon, Jun 04, 2018 at 05:55:17PM +0800, address@hidden wrote:
> From: Xiao Guangrong <address@hidden>
> 
> It's the simple lockless ring buffer implement which supports both
> single producer vs. single consumer and multiple producers vs.
> single consumer.
> 
> Many lessons were learned from Linux Kernel's kfifo (1) and DPDK's
> rte_ring (2) before i wrote this implement. It corrects some bugs of
> memory barriers in kfifo and it is the simpler lockless version of
> rte_ring as currently multiple access is only allowed for producer.

Could you provide some more information about the kfifo bug?  Any
pointer would be appreciated.

> 
> If has single producer vs. single consumer, it is the traditional fifo,
> If has multiple producers, it uses the algorithm as followings:
> 
> For the producer, it uses two steps to update the ring:
>    - first step, occupy the entry in the ring:
> 
> retry:
>       in = ring->in
>       if (cmpxhg(&ring->in, in, in +1) != in)
>             goto retry;
> 
>      after that the entry pointed by ring->data[in] has been owned by
>      the producer.
> 
>      assert(ring->data[in] == NULL);
> 
>      Note, no other producer can touch this entry so that this entry
>      should always be the initialized state.
> 
>    - second step, write the data to the entry:
> 
>      ring->data[in] = data;
> 
> For the consumer, it first checks if there is available entry in the
> ring and fetches the entry from the ring:
> 
>      if (!ring_is_empty(ring))
>           entry = &ring[ring->out];
> 
>      Note: the ring->out has not been updated so that the entry pointed
>      by ring->out is completely owned by the consumer.
> 
> Then it checks if the data is ready:
> 
> retry:
>      if (*entry == NULL)
>             goto retry;
> That means, the producer has updated the index but haven't written any
> data to it.
> 
> Finally, it fetches the valid data out, set the entry to the initialized
> state and update ring->out to make the entry be usable to the producer:
> 
>       data = *entry;
>       *entry = NULL;
>       ring->out++;
> 
> Memory barrier is omitted here, please refer to the comment in the code.
> 
> (1) 
> https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
> (2) http://dpdk.org/doc/api/rte__ring_8h.html
> 
> Signed-off-by: Xiao Guangrong <address@hidden>
> ---
>  migration/ring.h | 265 
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++

If this is a very general implementation, not sure whether we can move
this to util/ directory so that it can be used even outside migration
codes.

>  1 file changed, 265 insertions(+)
>  create mode 100644 migration/ring.h
> 
> diff --git a/migration/ring.h b/migration/ring.h
> new file mode 100644
> index 0000000000..da9b8bdcbb
> --- /dev/null
> +++ b/migration/ring.h
> @@ -0,0 +1,265 @@
> +/*
> + * Ring Buffer
> + *
> + * Multiple producers and single consumer are supported with lock free.
> + *
> + * Copyright (c) 2018 Tencent Inc
> + *
> + * Authors:
> + *  Xiao Guangrong <address@hidden>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef _RING__
> +#define _RING__
> +
> +#define CACHE_LINE  64

Is this for x86_64?  Is the cache line size the same for all arch?

> +#define cache_aligned __attribute__((__aligned__(CACHE_LINE)))
> +
> +#define RING_MULTI_PRODUCER 0x1
> +
> +struct Ring {
> +    unsigned int flags;
> +    unsigned int size;
> +    unsigned int mask;
> +
> +    unsigned int in cache_aligned;
> +
> +    unsigned int out cache_aligned;
> +
> +    void *data[0] cache_aligned;
> +};
> +typedef struct Ring Ring;
> +
> +/*
> + * allocate and initialize the ring
> + *
> + * @size: the number of element, it should be power of 2
> + * @flags: set to RING_MULTI_PRODUCER if the ring has multiple producer,
> + *         otherwise set it to 0, i,e. single producer and single consumer.
> + *
> + * return the ring.
> + */
> +static inline Ring *ring_alloc(unsigned int size, unsigned int flags)
> +{
> +    Ring *ring;
> +
> +    assert(is_power_of_2(size));
> +
> +    ring = g_malloc0(sizeof(*ring) + size * sizeof(void *));
> +    ring->size = size;
> +    ring->mask = ring->size - 1;
> +    ring->flags = flags;
> +    return ring;
> +}
> +
> +static inline void ring_free(Ring *ring)
> +{
> +    g_free(ring);
> +}
> +
> +static inline bool __ring_is_empty(unsigned int in, unsigned int out)
> +{
> +    return in == out;
> +}

(some of the helpers are a bit confusing to me like this one; I would
 prefer some of the helpers be directly squashed into code, but it's a
 personal preference only)

> +
> +static inline bool ring_is_empty(Ring *ring)
> +{
> +    return ring->in == ring->out;
> +}
> +
> +static inline unsigned int ring_len(unsigned int in, unsigned int out)
> +{
> +    return in - out;
> +}

(this too)

> +
> +static inline bool
> +__ring_is_full(Ring *ring, unsigned int in, unsigned int out)
> +{
> +    return ring_len(in, out) > ring->mask;
> +}
> +
> +static inline bool ring_is_full(Ring *ring)
> +{
> +    return __ring_is_full(ring, ring->in, ring->out);
> +}
> +
> +static inline unsigned int ring_index(Ring *ring, unsigned int pos)
> +{
> +    return pos & ring->mask;
> +}
> +
> +static inline int __ring_put(Ring *ring, void *data)
> +{
> +    unsigned int index, out;
> +
> +    out = atomic_load_acquire(&ring->out);
> +    /*
> +     * smp_mb()
> +     *
> +     * should read ring->out before updating the entry, see the comments in
> +     * __ring_get().

Nit: here I think it means the comment in [1] below.  Maybe:

  "see the comments in __ring_get() when calling
   atomic_store_release()"

?

> +     */
> +
> +    if (__ring_is_full(ring, ring->in, out)) {
> +        return -ENOBUFS;
> +    }
> +
> +    index = ring_index(ring, ring->in);
> +
> +    atomic_set(&ring->data[index], data);
> +
> +    /*
> +     * should make sure the entry is updated before increasing ring->in
> +     * otherwise the consumer will get a entry but its content is useless.
> +     */
> +    smp_wmb();
> +    atomic_set(&ring->in, ring->in + 1);

Pure question: could we use store_release() instead of a mixture of
store/release and raw memory barriers in the function?  Or is there
any performance consideration behind?

It'll be nice to mention the performance considerations if there is.

> +    return 0;
> +}
> +
> +static inline void *__ring_get(Ring *ring)
> +{
> +    unsigned int index, in;
> +    void *data;
> +
> +    in = atomic_read(&ring->in);
> +
> +    /*
> +     * should read ring->in first to make sure the entry pointed by this
> +     * index is available, see the comments in __ring_put().
> +     */

Nit: similar to above, maybe mention about which comment would be a
bit nicer.

> +    smp_rmb();
> +    if (__ring_is_empty(in, ring->out)) {
> +        return NULL;
> +    }
> +
> +    index = ring_index(ring, ring->out);
> +
> +    data = atomic_read(&ring->data[index]);
> +
> +    /*
> +     * smp_mb()
> +     *
> +     * once the ring->out is updated the entry originally indicated by the
> +     * the index is visible and usable to the producer so that we should
> +     * make sure reading the entry out before updating ring->out to avoid
> +     * the entry being overwritten by the producer.
> +     */
> +    atomic_store_release(&ring->out, ring->out + 1);

[1]

> +
> +    return data;
> +}

Regards,

-- 
Peter Xu



reply via email to

[Prev in Thread] Current Thread [Next in Thread]