From: Emilio G. Cota
Subject: [Qemu-devel] [PATCH v5 13/18] qht: support parallel writes
Date: Fri, 13 May 2016 23:34:28 -0400

The appended patch increases write scalability by allowing concurrent writes
to separate buckets. It also removes the need for an external lock when
operating on the hash table.
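
As a minimal sketch of the resulting usage (not part of the patch; the
table, size hint and wrapper names are illustrative, and the hash is
assumed to be computed by the caller as before):

    #include "qemu/osdep.h"
    #include "qemu/qht.h"

    static struct qht htable;

    static void table_setup(void)
    {
        /* size hint of 1024 elements; the table resizes itself under load */
        qht_init(&htable, 1024, QHT_MODE_AUTO_RESIZE);
    }

    /*
     * Callable from any thread with no external lock: writers that hash
     * to different buckets proceed in parallel; writers that hash to the
     * same bucket serialize on that bucket's spinlock.
     */
    static bool table_add(void *p, uint32_t hash)
    {
        return qht_insert(&htable, p, hash);
    }

    static bool table_del(void *p, uint32_t hash)
    {
        return qht_remove(&htable, p, hash);
    }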

Lookup code remains the same; insert and remove fast paths get an extra
smp_rmb() before reading the map pointer and a likely() check after
acquiring the lock of the bucket they operate on.
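
For reference, the read side left untouched by this patch looks roughly
like the sketch below. It is simplified and assumes the existing
qht_lookup_func_t from qht.h and the bucket-scan helper qht_do_lookup
from util/qht.c, neither of which is shown in this diff:

    /*
     * Sketch of the unchanged lookup fast path: readers never take the
     * bucket spinlock; they sample the bucket's seqlock and retry the
     * scan if a writer raced with them.
     */
    static void *lookup_sketch(struct qht_bucket *b, qht_lookup_func_t func,
                               const void *userp, uint32_t hash)
    {
        unsigned int version;
        void *ret;

        do {
            version = seqlock_read_begin(&b->sequence);
            ret = qht_do_lookup(b, func, userp, hash);
        } while (seqlock_read_retry(&b->sequence, version));
        return ret;
    }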

Signed-off-by: Emilio G. Cota <address@hidden>
---
 include/qemu/qht.h |  10 +-
 util/qht.c         | 276 ++++++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 215 insertions(+), 71 deletions(-)
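
A caller's view of the new iterator contract (a sketch, not part of the
patch; the callback and wrapper names are illustrative): qht_iter now
takes every bucket lock itself, so the caller needs no external lock and
the walk is serialized wrt writers.

    static void count_entry(struct qht *ht, void *p, uint32_t hash,
                            void *userp)
    {
        size_t *count = userp;

        (*count)++;
    }

    static size_t table_count(struct qht *ht)
    {
        size_t count = 0;

        /* qht_iter holds all bucket locks for the duration of the walk */
        qht_iter(ht, count_entry, &count);
        return count;
    }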

diff --git a/include/qemu/qht.h b/include/qemu/qht.h
index c2ab8b8..f0d68fb 100644
--- a/include/qemu/qht.h
+++ b/include/qemu/qht.h
@@ -10,11 +10,13 @@
 #include "qemu/osdep.h"
 #include "qemu-common.h"
 #include "qemu/seqlock.h"
+#include "qemu/thread.h"
 #include "qemu/qdist.h"
 #include "qemu/rcu.h"
 
 struct qht {
     struct qht_map *map;
+    QemuSpin lock; /* serializes setters of ht->map */
     unsigned int mode;
 };
 
@@ -33,25 +35,19 @@ typedef void (*qht_iter_func_t)(struct qht *ht, void *p, uint32_t h, void *up);
 
 void qht_init(struct qht *ht, size_t n_elems, unsigned int mode);
 
-/* call only when there are no readers left */
+/* call only when there are no readers/writers left */
 void qht_destroy(struct qht *ht);
 
-/* call with an external lock held */
 void qht_reset(struct qht *ht);
 
-/* call with an external lock held */
 bool qht_reset_size(struct qht *ht, size_t n_elems);
 
-/* call with an external lock held */
 bool qht_insert(struct qht *ht, void *p, uint32_t hash);
 
-/* call with an external lock held */
 bool qht_remove(struct qht *ht, const void *p, uint32_t hash);
 
-/* call with an external lock held */
 void qht_iter(struct qht *ht, qht_iter_func_t func, void *userp);
 
-/* call with an external lock held */
 bool qht_resize(struct qht *ht, size_t n_elems);
 
 /* if @func is NULL, then pointer comparison is used */
diff --git a/util/qht.c b/util/qht.c
index 112f32d..8a9b66a 100644
--- a/util/qht.c
+++ b/util/qht.c
@@ -7,13 +7,19 @@
  *   See the COPYING file in the top-level directory.
  *
  * Assumptions:
- * - Writers and iterators must take an external lock.
  * - NULL cannot be inserted as a pointer value.
  * - Duplicate pointer values cannot be inserted.
  *
  * Features:
+ * - Reads (i.e. lookups and iterators) can be concurrent with other reads.
+ *   Lookups that are concurrent with writes to the same bucket will retry
+ *   via a seqlock; iterators acquire all bucket locks and therefore can be
+ *   concurrent with lookups and are serialized wrt writers.
+ * - Writes (i.e. insertions/removals) can be concurrent with writes to
+ *   different buckets; writes to the same bucket are serialized through a lock.
  * - Optional auto-resizing: the hash table resizes up if the load surpasses
- *   a certain threshold. Resizing is done concurrently with readers.
+ *   a certain threshold. Resizing is done concurrently with readers; writes
+ *   are serialized with the resize operation.
  *
  * The key structure is the bucket, which is cacheline-sized. Buckets
  * contain a few hash values and pointers; the u32 hash values are stored in
@@ -33,10 +39,11 @@
  * just-removed entry. This makes lookups slightly faster, since the moment an
  * invalid entry is found, the (failed) lookup is over.
  *
- * Resizing is done by taking all spinlocks (so that no readers-turned-writers
- * can race with us) and then placing all elements into a new hash table. Last,
- * the ht->map pointer is set, and the old map is freed once no RCU readers can
- * see it anymore.
+ * Resizing is done by taking all bucket spinlocks (so that no other writers can
+ * race with us) and then copying all entries into a new hash map. The old
+ * hash map is marked as stale, so that no further insertions/removals will
+ * be performed on it. Last, the ht->map pointer is set, and the old map is
+ * freed once no RCU readers can see it anymore.
  *
  * Related Work:
  * - Idea of cacheline-sized buckets with full hashes taken from:
@@ -92,8 +99,14 @@ QEMU_BUILD_BUG_ON(sizeof(struct qht_bucket) > QHT_BUCKET_ALIGN);
  * @n_added_buckets: number of added (i.e. "non-head") buckets
  * @n_added_buckets_threshold: threshold to trigger an upward resize once the
  *                             number of added buckets surpasses it.
+ * @stale: marks that the map is stale
  *
  * Buckets are tracked in what we call a "map", i.e. this structure.
+ *
+ * The @stale flag is protected by the locks embedded in map->buckets: _all_ of
+ * these locks have to be acquired (in order from 0 to n-1) before setting
+ * the flag. Reading the flag is cheaper: it can be read after acquiring any
+ * of the bucket locks.
  */
 struct qht_map {
     struct rcu_head rcu;
@@ -101,12 +114,14 @@ struct qht_map {
     size_t n;
     size_t n_added_buckets;
     size_t n_added_buckets_threshold;
+    bool stale;
 };
 
 /* trigger a resize when n_added_buckets > n_buckets / div */
 #define QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV 8
 
-static void qht_do_resize(struct qht *ht, size_t n);
+static void qht_do_resize(struct qht *ht, struct qht_map *new);
+static void qht_grow_maybe(struct qht *ht);
 
 static inline struct qht_map *qht_map__atomic_mb(const struct qht *ht)
 {
@@ -134,7 +149,7 @@ struct qht_bucket *bucket_next__atomic_mb(const struct qht_bucket *b)
 }
 
 #ifdef QHT_DEBUG
-static void qht_bucket_debug(struct qht_bucket *b)
+static void qht_bucket_debug__locked(struct qht_bucket *b)
 {
     bool seen_empty = false;
     bool corrupt = false;
@@ -157,19 +172,19 @@ static void qht_bucket_debug(struct qht_bucket *b)
     assert(!corrupt);
 }
 
-static void qht_map_debug(struct qht_map *map)
+static void qht_map_debug__all_locked(struct qht_map *map)
 {
     int i;
 
     for (i = 0; i < map->n; i++) {
-        qht_bucket_debug(&map->buckets[i]);
+        qht_bucket_debug__locked(&map->buckets[i]);
     }
 }
 #else
-static inline void qht_bucket_debug(struct qht_bucket *b)
+static inline void qht_bucket_debug__locked(struct qht_bucket *b)
 { }
 
-static inline void qht_map_debug(struct qht_map *map)
+static inline void qht_map_debug__all_locked(struct qht_map *map)
 { }
 #endif /* QHT_DEBUG */
 
@@ -214,6 +229,63 @@ static void qht_map_unlock_buckets(struct qht_map *map)
     }
 }
 
+/*
+ * Grab all bucket locks, and set @pmap after making sure the map isn't stale.
+ *
+ * Pairs with qht_map_unlock_buckets(), hence the pass-by-reference.
+ */
+static inline
+void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
+{
+    struct qht_map *map;
+
+    for (;;) {
+        map = qht_map__atomic_mb(ht);
+        qht_map_lock_buckets(map);
+        if (likely(!map->stale)) {
+            *pmap = map;
+            return;
+        }
+        qht_map_unlock_buckets(map);
+
+        /* resize in progress; wait until it completes */
+        while (qemu_spin_locked(&ht->lock)) {
+            cpu_relax();
+        }
+    }
+}
+
+/*
+ * Get a head bucket and lock it, making sure its parent map is not stale.
+ * @pmap is filled with a pointer to the bucket's parent map.
+ *
+ * Unlock with qemu_spin_unlock(&b->lock).
+ */
+static inline
+struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
+                                             struct qht_map **pmap)
+{
+    struct qht_bucket *b;
+    struct qht_map *map;
+
+    for (;;) {
+        map = qht_map__atomic_mb(ht);
+        b = qht_map_to_bucket(map, hash);
+
+        qemu_spin_lock(&b->lock);
+        if (likely(!map->stale)) {
+            *pmap = map;
+            return b;
+        }
+        qemu_spin_unlock(&b->lock);
+
+        /* resize in progress; wait until it completes */
+        while (qemu_spin_locked(&ht->lock)) {
+            cpu_relax();
+        }
+    }
+}
+
 static inline bool qht_map_needs_resize(struct qht_map *map)
 {
     return atomic_read(&map->n_added_buckets) > map->n_added_buckets_threshold;
@@ -266,6 +338,7 @@ static struct qht_map *qht_map_create(size_t n)
         map->n_added_buckets_threshold = 1;
     }
 
+    map->stale = false;
     map->buckets = qemu_memalign(QHT_BUCKET_ALIGN, sizeof(*map->buckets) * n);
     for (i = 0; i < n; i++) {
         qht_head_init(&map->buckets[i]);
@@ -273,7 +346,8 @@ static struct qht_map *qht_map_create(size_t n)
     return map;
 }
 
-static inline void qht_publish(struct qht *ht, struct qht_map *new)
+/* call with ht->lock held */
+static inline void qht_publish__htlocked(struct qht *ht, struct qht_map *new)
 {
     /* Readers should see a properly initialized map; pair with smp_rmb() */
     smp_wmb();
@@ -286,23 +360,24 @@ void qht_init(struct qht *ht, size_t n_elems, unsigned int mode)
     size_t n = qht_elems_to_buckets(n_elems);
 
     ht->mode = mode;
+    qemu_spin_init(&ht->lock);
     map = qht_map_create(n);
-    qht_publish(ht, map);
+    /* no need to take ht->lock since we just initialized it */
+    qht_publish__htlocked(ht, map);
 }
 
-/* call only when there are no readers left */
+/* call only when there are no readers/writers left */
 void qht_destroy(struct qht *ht)
 {
     qht_map_destroy(ht->map);
     memset(ht, 0, sizeof(*ht));
 }
 
-static void qht_bucket_reset(struct qht_bucket *head)
+static void qht_bucket_reset__locked(struct qht_bucket *head)
 {
     struct qht_bucket *b = head;
     int i;
 
-    qemu_spin_lock(&head->lock);
     seqlock_write_begin(&head->sequence);
     do {
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
@@ -316,33 +391,56 @@ static void qht_bucket_reset(struct qht_bucket *head)
     } while (b);
  done:
     seqlock_write_end(&head->sequence);
-    qemu_spin_unlock(&head->lock);
 }
 
-/* call with an external lock held */
-void qht_reset(struct qht *ht)
+/* call with all bucket locks held */
+static void qht_map_reset__all_locked(struct qht_map *map)
 {
-    struct qht_map *map = ht->map;
     size_t i;
 
     for (i = 0; i < map->n; i++) {
-        qht_bucket_reset(&map->buckets[i]);
+        qht_bucket_reset__locked(&map->buckets[i]);
     }
-    qht_map_debug(map);
+    qht_map_debug__all_locked(map);
+}
+
+void qht_reset(struct qht *ht)
+{
+    struct qht_map *map;
+
+    qht_map_lock_buckets__no_stale(ht, &map);
+    qht_map_reset__all_locked(map);
+    qht_map_unlock_buckets(map);
 }
 
-/* call with an external lock held */
 bool qht_reset_size(struct qht *ht, size_t n_elems)
 {
-    struct qht_map *old = ht->map;
+    struct qht_map *new;
+    struct qht_map *map;
+    bool resize;
+    size_t n;
 
-    qht_reset(ht);
-    if (old->n == qht_elems_to_buckets(n_elems)) {
-        return false;
+    n = qht_elems_to_buckets(n_elems);
+    /* allocate the new map first so that we don't hold up ht->lock */
+    new = qht_map_create(n);
+    resize = false;
+
+    qemu_spin_lock(&ht->lock);
+    map = ht->map;
+    qht_map_lock_buckets(map);
+    qht_map_reset__all_locked(map);
+    if (n != map->n) {
+        qht_do_resize(ht, new);
+        resize = true;
     }
-    qht_init(ht, n_elems, ht->mode);
-    call_rcu1(&old->rcu, qht_map_reclaim);
-    return true;
+    qht_map_unlock_buckets(map);
+    qemu_spin_unlock(&ht->lock);
+
+    if (unlikely(!resize)) {
+        qht_map_destroy(new);
+    }
+
+    return resize;
 }
 
 static inline
@@ -452,24 +550,23 @@ static bool qht_insert__locked(struct qht *ht, struct qht_map *map,
     }
 }
 
-/* call with an external lock held */
 bool qht_insert(struct qht *ht, void *p, uint32_t hash)
 {
-    struct qht_map *map = ht->map;
-    struct qht_bucket *b = qht_map_to_bucket(map, hash);
+    struct qht_bucket *b;
+    struct qht_map *map;
     bool needs_resize = false;
     bool ret;
 
     /* NULL pointers are not supported */
     assert(p);
 
-    qemu_spin_lock(&b->lock);
+    b = qht_bucket_lock__no_stale(ht, hash, &map);
     ret = qht_insert__locked(ht, map, b, p, hash, &needs_resize);
-    qht_bucket_debug(b);
+    qht_bucket_debug__locked(b);
     qemu_spin_unlock(&b->lock);
 
     if (unlikely(needs_resize) && ht->mode & QHT_MODE_AUTO_RESIZE) {
-        qht_do_resize(ht, map->n * 2);
+        qht_grow_maybe(ht);
     }
     return ret;
 }
@@ -560,16 +657,15 @@ bool qht_remove__locked(struct qht_map *map, struct qht_bucket *head,
     return false;
 }
 
-/* call with an external lock held */
 bool qht_remove(struct qht *ht, const void *p, uint32_t hash)
 {
-    struct qht_map *map = ht->map;
-    struct qht_bucket *b = qht_map_to_bucket(map, hash);
+    struct qht_bucket *b;
+    struct qht_map *map;
     bool ret;
 
-    qemu_spin_lock(&b->lock);
+    b = qht_bucket_lock__no_stale(ht, hash, &map);
     ret = qht_remove__locked(map, b, p, hash);
-    qht_bucket_debug(b);
+    qht_bucket_debug__locked(b);
     qemu_spin_unlock(&b->lock);
     return ret;
 }
@@ -590,9 +686,9 @@ static inline void qht_bucket_iter(struct qht *ht, struct qht_bucket *b,
     } while (b);
 }
 
-/* external lock + all of the map's locks held */
-static inline void qht_map_iter__locked(struct qht *ht, struct qht_map *map,
-                                        qht_iter_func_t func, void *userp)
+/* call with all of the map's locks held */
+static inline void qht_map_iter__all_locked(struct qht *ht, struct qht_map *map,
+                                            qht_iter_func_t func, void *userp)
 {
     size_t i;
 
@@ -601,12 +697,15 @@ static inline void qht_map_iter__locked(struct qht *ht, struct qht_map *map,
     }
 }
 
-/* call with an external lock held */
 void qht_iter(struct qht *ht, qht_iter_func_t func, void *userp)
 {
-    qht_map_lock_buckets(ht->map);
-    qht_map_iter__locked(ht, ht->map, func, userp);
-    qht_map_unlock_buckets(ht->map);
+    struct qht_map *map;
+
+    map = qht_map__atomic_mb(ht);
+    qht_map_lock_buckets(map);
+    /* Note: ht here is merely for carrying ht->mode; ht->map won't be read */
+    qht_map_iter__all_locked(ht, map, func, userp);
+    qht_map_unlock_buckets(map);
 }
 
 static void qht_map_copy(struct qht *ht, void *p, uint32_t hash, void *userp)
@@ -618,31 +717,80 @@ static void qht_map_copy(struct qht *ht, void *p, uint32_t hash, void *userp)
     qht_insert__locked(ht, new, b, p, hash, NULL);
 }
 
-/* call with an external lock held */
-static void qht_do_resize(struct qht *ht, size_t n)
+/*
+ * Call with ht->lock and all bucket locks held.
+ *
+ * Creating the @new map here would add unnecessary delay while all the locks
+ * are held--holding up the bucket locks is particularly bad, since no writes
+ * can occur while these are held. Thus, we let callers create the new map,
+ * hopefully without the bucket locks held.
+ */
+static void qht_do_resize(struct qht *ht, struct qht_map *new)
 {
-    struct qht_map *old = ht->map;
-    struct qht_map *new;
+    struct qht_map *old;
 
-    g_assert_cmpuint(n, !=, old->n);
-    new = qht_map_create(n);
-    qht_iter(ht, qht_map_copy, new);
-    qht_map_debug(new);
+    old = ht->map;
+    g_assert_cmpuint(new->n, !=, old->n);
 
-    qht_publish(ht, new);
+    assert(!old->stale);
+    old->stale = true;
+    qht_map_iter__all_locked(ht, old, qht_map_copy, new);
+
+    qht_map_debug__all_locked(new);
+
+    qht_publish__htlocked(ht, new);
     call_rcu1(&old->rcu, qht_map_reclaim);
 }
 
-/* call with an external lock held */
 bool qht_resize(struct qht *ht, size_t n_elems)
 {
-    size_t n = qht_elems_to_buckets(n_elems);
+    struct qht_map *new;
+    bool resized;
+    size_t n;
+
+    n = qht_elems_to_buckets(n_elems);
+    /* allocate the new map first so that we don't hold up ht->lock */
+    new = qht_map_create(n);
+    resized = false;
+
+    qemu_spin_lock(&ht->lock);
+    if (n != ht->map->n) {
+        struct qht_map *old = ht->map;
+
+        qht_map_lock_buckets(old);
+        qht_do_resize(ht, new);
+        qht_map_unlock_buckets(old);
+        resized = true;
+    }
+    qemu_spin_unlock(&ht->lock);
 
-    if (n == ht->map->n) {
-        return false;
+    if (unlikely(!resized)) {
+        qht_map_destroy(new);
+    }
+    return resized;
+}
+
+static __attribute__((noinline)) void qht_grow_maybe(struct qht *ht)
+{
+    struct qht_map *map;
+
+    /*
+     * If the lock is taken it probably means there's an ongoing resize,
+     * so bail out.
+     */
+    if (qemu_spin_trylock(&ht->lock)) {
+        return;
+    }
+    map = ht->map;
+    /* another thread might have just performed the resize we were after */
+    if (qht_map_needs_resize(map)) {
+        struct qht_map *new = qht_map_create(map->n * 2);
+
+        qht_map_lock_buckets(map);
+        qht_do_resize(ht, new);
+        qht_map_unlock_buckets(map);
     }
-    qht_do_resize(ht, n);
-    return true;
+    qemu_spin_unlock(&ht->lock);
 }
 
 /* pass @stats to qht_statistics_destroy() when done */
-- 
2.5.0



