aboutsummaryrefslogtreecommitdiff
path: root/util/rcu.c
diff options
context:
space:
mode:
Diffstat (limited to 'util/rcu.c')
-rw-r--r--util/rcu.c69
1 files changed, 43 insertions, 26 deletions
diff --git a/util/rcu.c b/util/rcu.c
index b6d6c71..30a7e22 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -83,12 +83,6 @@ static void wait_for_readers(void)
*/
qemu_event_reset(&rcu_gp_event);
- /* Instead of using qatomic_mb_set for index->waiting, and
- * qatomic_mb_read for index->ctr, memory barriers are placed
- * manually since writes to different threads are independent.
- * qemu_event_reset has acquire semantics, so no memory barrier
- * is needed here.
- */
QLIST_FOREACH(index, &registry, node) {
qatomic_set(&index->waiting, true);
}
@@ -96,6 +90,10 @@ static void wait_for_readers(void)
/* Here, order the stores to index->waiting before the loads of
* index->ctr. Pairs with smp_mb_placeholder() in rcu_read_unlock(),
* ensuring that the loads of index->ctr are sequentially consistent.
+ *
+ * If this is the last iteration, this barrier also prevents
+ * frees from seeping upwards, and orders the two wait phases
+ * on architectures with 32-bit longs; see synchronize_rcu().
*/
smp_mb_global();
@@ -104,7 +102,7 @@ static void wait_for_readers(void)
QLIST_REMOVE(index, node);
QLIST_INSERT_HEAD(&qsreaders, index, node);
- /* No need for mb_set here, worst of all we
+ /* No need for memory barriers here, worst of all we
* get some extra futex wakeups.
*/
qatomic_set(&index->waiting, false);
@@ -149,26 +147,26 @@ void synchronize_rcu(void)
/* Write RCU-protected pointers before reading p_rcu_reader->ctr.
* Pairs with smp_mb_placeholder() in rcu_read_lock().
+ *
+ * Also orders write to RCU-protected pointers before
+ * write to rcu_gp_ctr.
*/
smp_mb_global();
QEMU_LOCK_GUARD(&rcu_registry_lock);
if (!QLIST_EMPTY(&registry)) {
- /* In either case, the qatomic_mb_set below blocks stores that free
- * old RCU-protected pointers.
- */
if (sizeof(rcu_gp_ctr) < 8) {
/* For architectures with 32-bit longs, a two-subphases algorithm
* ensures we do not encounter overflow bugs.
*
* Switch parity: 0 -> 1, 1 -> 0.
*/
- qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
wait_for_readers();
- qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
} else {
/* Increment current grace period. */
- qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+ qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
}
wait_for_readers();
@@ -191,8 +189,22 @@ static void enqueue(struct rcu_head *node)
struct rcu_head **old_tail;
node->next = NULL;
+
+ /*
+ * Make this node the tail of the list. The node will be
+ * used by further enqueue operations, but it will not
+ * be dequeued yet...
+ */
old_tail = qatomic_xchg(&tail, &node->next);
- qatomic_mb_set(old_tail, node);
+
+ /*
+ * ... until it is pointed to from another item in the list.
+ * In the meantime, try_dequeue() will find a NULL next pointer
+ * and loop.
+ *
+ * Synchronizes with qatomic_load_acquire() in try_dequeue().
+ */
+ qatomic_store_release(old_tail, node);
}
static struct rcu_head *try_dequeue(void)
@@ -200,26 +212,31 @@ static struct rcu_head *try_dequeue(void)
struct rcu_head *node, *next;
retry:
- /* Test for an empty list, which we do not expect. Note that for
+ /* Head is only written by this thread, so no need for barriers. */
+ node = head;
+
+ /*
+ * If the head node has NULL in its next pointer, the value is
+ * wrong and we need to wait until its enqueuer finishes the update.
+ */
+ next = qatomic_load_acquire(&node->next);
+ if (!next) {
+ return NULL;
+ }
+
+ /*
+ * Test for an empty list, which we do not expect. Note that for
* the consumer head and tail are always consistent. The head
* is consistent because only the consumer reads/writes it.
* The tail, because it is the first step in the enqueuing.
* It is only the next pointers that might be inconsistent.
*/
- if (head == &dummy && qatomic_mb_read(&tail) == &dummy.next) {
+ if (head == &dummy && qatomic_read(&tail) == &dummy.next) {
abort();
}
- /* If the head node has NULL in its next pointer, the value is
- * wrong and we need to wait until its enqueuer finishes the update.
- */
- node = head;
- next = qatomic_mb_read(&head->next);
- if (!next) {
- return NULL;
- }
-
- /* Since we are the sole consumer, and we excluded the empty case
+ /*
+ * Since we are the sole consumer, and we excluded the empty case
* above, the queue will always have at least two nodes: the
* dummy node, and the one being removed. So we do not need to update
* the tail pointer.