Use more precise atomic orderings
This commit is contained in:
parent
88c70edef6
commit
c11a44ab6c
1 changed files with 41 additions and 12 deletions
|
@ -51,6 +51,38 @@
|
||||||
//
|
//
|
||||||
// You'll find a few more details in the implementation, but that's the gist of
|
// You'll find a few more details in the implementation, but that's the gist of
|
||||||
// it!
|
// it!
|
||||||
|
//
|
||||||
|
// Atomic orderings:
|
||||||
|
// When running `Once` we deal with multiple atomics:
|
||||||
|
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
|
||||||
|
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
|
||||||
|
// result of the `Once`, and (3) for synchronizing `Waiter` nodes.
|
||||||
|
// - At the end of the `call_inner` function we have to make sure the result
|
||||||
|
// of the `Once` is acquired. So every load which can be the only one to
|
||||||
|
// load COMPLETED must have at least Acquire ordering, which means all
|
||||||
|
// three of them.
|
||||||
|
// - `WaiterQueue::Drop` is the only place that may store COMPLETED, and
|
||||||
|
// must do so with Release ordering to make the result available.
|
||||||
|
// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and
|
||||||
|
// needs to make the nodes available with Release ordering. The load in
|
||||||
|
// its `compare_and_swap` can be Relaxed because it only has to compare
|
||||||
|
// the atomic, not to read other data.
|
||||||
|
// - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load
|
||||||
|
// `state_and_queue` with Acquire ordering.
|
||||||
|
// - There is just one store where `state_and_queue` is used only as a
|
||||||
|
// state flag, without having to synchronize data: switching the state
|
||||||
|
// from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed,
|
||||||
|
// but the read has to be Acquire because of the requirements mentioned
|
||||||
|
// above.
|
||||||
|
// * `Waiter.signaled` is both used as a flag, and to protect a field with
|
||||||
|
// interior mutability in `Waiter`. `Waiter.thread` is changed in
|
||||||
|
// `WaiterQueue::Drop` which then sets `signaled` with Release ordering.
|
||||||
|
// After `wait` loads `signaled` with Acquire and sees it is true, it needs to
|
||||||
|
// see the changes to drop the `Waiter` struct correctly.
|
||||||
|
// * There is one place where the two atomics `Once.state_and_queue` and
|
||||||
|
// `Waiter.signaled` come together, and might be reordered by the compiler or
|
||||||
|
// processor. Because both use Acquire ordering such a reordering is not
|
||||||
|
// allowed, so no need for SeqCst.
|
||||||
|
|
||||||
use crate::cell::Cell;
|
use crate::cell::Cell;
|
||||||
use crate::fmt;
|
use crate::fmt;
|
||||||
|
@ -337,7 +369,7 @@ impl Once {
|
||||||
// An `Acquire` load is enough because that makes all the initialization
|
// An `Acquire` load is enough because that makes all the initialization
|
||||||
// operations visible to us, and, this being a fast path, weaker
|
// operations visible to us, and, this being a fast path, weaker
|
||||||
// ordering helps with performance. This `Acquire` synchronizes with
|
// ordering helps with performance. This `Acquire` synchronizes with
|
||||||
// `SeqCst` operations on the slow path.
|
// `Release` operations on the slow path.
|
||||||
self.state_and_queue.load(Ordering::Acquire) == COMPLETE
|
self.state_and_queue.load(Ordering::Acquire) == COMPLETE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,12 +387,9 @@ impl Once {
|
||||||
#[cold]
|
#[cold]
|
||||||
fn call_inner(&self,
|
fn call_inner(&self,
|
||||||
ignore_poisoning: bool,
|
ignore_poisoning: bool,
|
||||||
init: &mut dyn FnMut(bool)) {
|
init: &mut dyn FnMut(bool))
|
||||||
|
{
|
||||||
// This cold path uses SeqCst consistently because the
|
let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire);
|
||||||
// performance difference really does not matter there, and
|
|
||||||
// SeqCst minimizes the chances of something going wrong.
|
|
||||||
let mut state_and_queue = self.state_and_queue.load(Ordering::SeqCst);
|
|
||||||
loop {
|
loop {
|
||||||
match state_and_queue {
|
match state_and_queue {
|
||||||
COMPLETE => break,
|
COMPLETE => break,
|
||||||
|
@ -373,7 +402,7 @@ impl Once {
|
||||||
// Try to register this thread as the one RUNNING.
|
// Try to register this thread as the one RUNNING.
|
||||||
let old = self.state_and_queue.compare_and_swap(state_and_queue,
|
let old = self.state_and_queue.compare_and_swap(state_and_queue,
|
||||||
RUNNING,
|
RUNNING,
|
||||||
Ordering::SeqCst);
|
Ordering::Acquire);
|
||||||
if old != state_and_queue {
|
if old != state_and_queue {
|
||||||
state_and_queue = old;
|
state_and_queue = old;
|
||||||
continue
|
continue
|
||||||
|
@ -395,7 +424,7 @@ impl Once {
|
||||||
// pointer to the waiter queue in the more significant bits.
|
// pointer to the waiter queue in the more significant bits.
|
||||||
assert!(state_and_queue & STATE_MASK == RUNNING);
|
assert!(state_and_queue & STATE_MASK == RUNNING);
|
||||||
wait(&self.state_and_queue, state_and_queue);
|
wait(&self.state_and_queue, state_and_queue);
|
||||||
state_and_queue = self.state_and_queue.load(Ordering::SeqCst);
|
state_and_queue = self.state_and_queue.load(Ordering::Acquire);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -438,7 +467,7 @@ fn wait(state_and_queue: &AtomicUsize, current_state: usize) {
|
||||||
// drop our `Waiter` node and leave a hole in the linked list (and a
|
// drop our `Waiter` node and leave a hole in the linked list (and a
|
||||||
// dangling reference). Guard against spurious wakeups by reparking
|
// dangling reference). Guard against spurious wakeups by reparking
|
||||||
// ourselves until we are signaled.
|
// ourselves until we are signaled.
|
||||||
while !node.signaled.load(Ordering::SeqCst) {
|
while !node.signaled.load(Ordering::Acquire) {
|
||||||
thread::park();
|
thread::park();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -454,7 +483,7 @@ impl Drop for WaiterQueue<'_> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Swap out our state with however we finished.
|
// Swap out our state with however we finished.
|
||||||
let state_and_queue = self.state_and_queue.swap(self.set_state_on_drop_to,
|
let state_and_queue = self.state_and_queue.swap(self.set_state_on_drop_to,
|
||||||
Ordering::SeqCst);
|
Ordering::AcqRel);
|
||||||
|
|
||||||
// We should only ever see an old state which was RUNNING.
|
// We should only ever see an old state which was RUNNING.
|
||||||
assert_eq!(state_and_queue & STATE_MASK, RUNNING);
|
assert_eq!(state_and_queue & STATE_MASK, RUNNING);
|
||||||
|
@ -470,7 +499,7 @@ impl Drop for WaiterQueue<'_> {
|
||||||
while !queue.is_null() {
|
while !queue.is_null() {
|
||||||
let next = (*queue).next;
|
let next = (*queue).next;
|
||||||
let thread = (*queue).thread.replace(None).unwrap();
|
let thread = (*queue).thread.replace(None).unwrap();
|
||||||
(*queue).signaled.store(true, Ordering::SeqCst);
|
(*queue).signaled.store(true, Ordering::Release);
|
||||||
// ^- FIXME (maybe): This is another case of issue #55005
|
// ^- FIXME (maybe): This is another case of issue #55005
|
||||||
// `store()` has a potentially dangling ref to `signaled`.
|
// `store()` has a potentially dangling ref to `signaled`.
|
||||||
queue = next;
|
queue = next;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue