
modify queue implementation documentation

This commit only contains documentation changes and a few items moved around
within the file. The very few code changes are cosmetic, such as turning a
`match` statement into an `if let` statement or reducing the indentation of
long `if` statements.
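As an illustration of the kind of cosmetic rewrite described above, here is a minimal, hypothetical example (made-up names, not taken from the diff below):

```rust
// Hypothetical example of the cosmetic `match` -> `if let` rewrite.
fn double_first_even(xs: &[i32]) -> Option<i32> {
    // Before: a `match` with an explicit fall-through arm.
    match xs.iter().copied().find(|&x| x % 2 == 0) {
        Some(x) => Some(x * 2),
        None => None,
    }
}

fn double_first_even_if_let(xs: &[i32]) -> Option<i32> {
    // After: `if let` keeps the interesting arm and drops one level of indentation.
    if let Some(x) = xs.iter().copied().find(|&x| x % 2 == 0) {
        return Some(x * 2);
    }
    None
}

fn main() {
    assert_eq!(double_first_even(&[1, 4, 5]), Some(8));
    assert_eq!(double_first_even_if_let(&[1, 4, 5]), Some(8));
}
```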

This commit also adds several safety comments on top of `unsafe` blocks whose
correctness might not be immediately obvious to a first-time reader.

Code "changes" are in:
- `add_backlinks_and_find_tail`
- `lock_contended`

The majority of the changes simply reflow the comments from 80 columns to
100 columns.
Connor Tsui 2024-10-25 14:54:43 -04:00
parent fa9f04af5d
commit 31e35c2131


@@ -1,37 +1,38 @@
 //! Efficient read-write locking without `pthread_rwlock_t`.
 //!
-//! The readers-writer lock provided by the `pthread` library has a number of
-//! problems which make it a suboptimal choice for `std`:
+//! The readers-writer lock provided by the `pthread` library has a number of problems which make it
+//! a suboptimal choice for `std`:
 //!
-//! * It is non-movable, so it needs to be allocated (lazily, to make the
-//!   constructor `const`).
-//! * `pthread` is an external library, meaning the fast path of acquiring an
-//!   uncontended lock cannot be inlined.
-//! * Some platforms (at least glibc before version 2.25) have buggy implementations
-//!   that can easily lead to undefined behavior in safe Rust code when not properly
-//!   guarded against.
+//! * It is non-movable, so it needs to be allocated (lazily, to make the constructor `const`).
+//! * `pthread` is an external library, meaning the fast path of acquiring an uncontended lock
+//!   cannot be inlined.
+//! * Some platforms (at least glibc before version 2.25) have buggy implementations that can easily
+//!   lead to undefined behaviour in safe Rust code when not properly guarded against.
 //! * On some platforms (e.g. macOS), the lock is very slow.
 //!
-//! Therefore, we implement our own `RwLock`! Naively, one might reach for a
-//! spinlock, but those [can be quite problematic] when the lock is contended.
-//! Instead, this readers-writer lock copies its implementation strategy from
-//! the Windows [SRWLOCK] and the [usync] library. Spinning is still used for the
-//! fast path, but it is bounded: after spinning fails, threads will locklessly
-//! add an information structure containing a [`Thread`] handle into a queue of
-//! waiters associated with the lock. The lock owner, upon releasing the lock,
-//! will scan through the queue and wake up threads as appropriate, which will
-//! then again try to acquire the lock. The resulting [`RwLock`] is:
+//! Therefore, we implement our own [`RwLock`]! Naively, one might reach for a spinlock, but those
+//! can be quite [problematic] when the lock is contended.
+//!
+//! Instead, this [`RwLock`] copies its implementation strategy from the Windows [SRWLOCK] and the
+//! [usync] library implementations.
+//!
+//! Spinning is still used for the fast path, but it is bounded: after spinning fails, threads will
+//! locklessly add an information structure ([`Node`]) containing a [`Thread`] handle into a queue
+//! of waiters associated with the lock. The lock owner, upon releasing the lock, will scan through
+//! the queue and wake up threads as appropriate, and the newly-awoken threads will then try to
+//! acquire the lock themselves.
+//!
+//! The resulting [`RwLock`] is:
 //!
-//! * adaptive, since it spins before doing any heavywheight parking operations
-//! * allocation-free, modulo the per-thread [`Thread`] handle, which is
-//!   allocated regardless when using threads created by `std`
+//! * adaptive, since it spins before doing any heavyweight parking operations
+//! * allocation-free, modulo the per-thread [`Thread`] handle, which is allocated anyways when
+//!   using threads created by `std`
 //! * writer-preferring, even if some readers may still slip through
-//! * unfair, which reduces context-switching and thus drastically improves
-//!   performance
+//! * unfair, which reduces context-switching and thus drastically improves performance
 //!
 //! and also quite fast in most cases.
 //!
-//! [can be quite problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
+//! [problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
 //! [SRWLOCK]: https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks
 //! [usync]: https://crates.io/crates/usync
 //!
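As a rough illustration of the parking mechanism described above, the following sketch uses plain `std::thread` park/unpark between two threads; it is not the lock's actual queue code, just the same wait/wake primitive the `Thread` handles are used for:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    let ready = Arc::new(AtomicBool::new(false));
    let ready2 = Arc::clone(&ready);

    let waiter = thread::spawn(move || {
        // A waiting thread parks itself until the "lock owner" wakes it up.
        // `park` may wake spuriously, so the condition is re-checked in a loop.
        while !ready2.load(Ordering::Acquire) {
            thread::park();
        }
    });

    // The "lock owner" publishes its state and unparks the waiter, mirroring how
    // the queue owner wakes the tail node after releasing the lock.
    ready.store(true, Ordering::Release);
    waiter.thread().unpark();
    waiter.join().unwrap();
}
```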
@@ -39,8 +40,8 @@
 //!
 //! ## State
 //!
-//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used
-//! to indicate the meaning of the remaining bits:
+//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used to indicate the
+//! meaning of the remaining bits:
 //!
 //! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | |
 //! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
@@ -50,22 +51,26 @@
 //! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
 //! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
 //!
-//! ## Waiter queue
+//! ## Waiter Queue
 //!
-//! When threads are waiting on the lock (`QUEUE` is set), the lock state
-//! points to a queue of waiters, which is implemented as a linked list of
-//! nodes stored on the stack to avoid memory allocation. To enable lockless
-//! enqueuing of new nodes to the queue, the linked list is single-linked upon
-//! creation. Since when the lock is read-locked, the lock count is stored in
-//! the last link of the queue, threads have to traverse the queue to find the
-//! last element upon releasing the lock. To avoid having to traverse the whole
-//! list again and again, a pointer to the found tail is cached in the (current)
-//! first element of the queue.
+//! When threads are waiting on the lock (the `QUEUE` bit is set), the lock state points to a queue
+//! of waiters, which is implemented as a linked list of nodes stored on the stack to avoid memory
+//! allocation.
+//!
+//! To enable lock-free enqueuing of new nodes to the queue, the linked list is singly-linked upon
+//! creation.
+//!
+//! When the lock is read-locked, the lock count (number of readers) is stored in the last link of
+//! the queue. Threads have to traverse the queue to find the last element upon releasing the lock.
+//! To avoid having to traverse the entire list every time we want to access the reader count, a
+//! pointer to the found tail is cached in the (current) first element of the queue.
 //!
-//! Also, while the lock is unfair for performance reasons, it is still best to
-//! wake the tail node first, which requires backlinks to previous nodes to be
-//! created. This is done at the same time as finding the tail, and thus a set
-//! tail field indicates the remaining portion of the queue is initialized.
+//! Also, while the lock is unfair for performance reasons, it is still best to wake the tail node
+//! first (FIFO ordering). Since we always pop nodes off the tail of the queue, we must store
+//! backlinks to previous nodes so that we can update the `tail` field of the (current) first
+//! element of the queue. Adding backlinks is done at the same time as finding the tail (via the
+//! function [`find_tail_and_add_backlinks`]), and thus encountering a set tail field on a node
+//! indicates that all following nodes in the queue are initialized.
 //!
 //! TLDR: Here's a diagram of what the queue looks like:
 //!
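The following is a minimal sketch of how such a packed state word can be decoded. It uses plain `usize` arithmetic and a made-up `describe` helper; the real implementation stores the state in an `AtomicPtr` and uses strict-provenance operations:

```rust
const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const SINGLE: usize = 1 << 3; // one reader, in the "no queue" encoding
const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);

fn describe(state: usize) -> String {
    let locked = state & LOCKED != 0;
    if state & QUEUED == 0 {
        // No queue: the remaining bits are a reader count (0 means unlocked or write-locked).
        format!("locked: {locked}, readers: {}", (state & MASK) / SINGLE)
    } else {
        // Queue present: the remaining bits are the address of the first queue node.
        format!("locked: {locked}, queue head at {:#x}", state & MASK)
    }
}

fn main() {
    println!("{}", describe(0));                   // unlocked, no readers
    println!("{}", describe(LOCKED));              // write-locked
    println!("{}", describe(3 * SINGLE | LOCKED)); // read-locked by 3 readers
}
```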
@@ -89,21 +94,21 @@
 //! 3. All nodes preceding this node must have a correct, non-null `next` field.
 //! 4. All nodes following this node must have a correct, non-null `prev` field.
 //!
-//! Access to the queue is controlled by the `QUEUE_LOCKED` bit, which threads
-//! try to set both after enqueuing themselves to eagerly add backlinks to the
-//! queue, which drastically improves performance, and after unlocking the lock
-//! to wake the next waiter(s). This is done atomically at the same time as the
-//! enqueuing/unlocking operation. The thread releasing the `QUEUE_LOCK` bit
-//! will check the state of the lock and wake up waiters as appropriate. This
-//! guarantees forward-progress even if the unlocking thread could not acquire
-//! the queue lock.
+//! Access to the queue is controlled by the `QUEUE_LOCKED` bit. Threads will try to set this bit
+//! in two cases: one is when a thread enqueues itself and eagerly adds backlinks to the queue
+//! (which drastically improves performance), and the other is after a thread unlocks the lock to
+//! wake up the next waiter(s).
 //!
-//! ## Memory orderings
-//!
-//! To properly synchronize changes to the data protected by the lock, the lock
-//! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively.
-//! To propagate the initialization of nodes, changes to the queue lock are also
-//! performed using these orderings.
+//! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The
+//! thread releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up waiters as
+//! appropriate. This guarantees forward progress even if the unlocking thread could not acquire the
+//! queue lock.
+//!
+//! ## Memory Orderings
+//!
+//! To properly synchronize changes to the data protected by the lock, the lock is acquired and
+//! released with [`Acquire`] and [`Release`] ordering, respectively. To propagate the
+//! initialization of nodes, changes to the queue lock are also performed using these orderings.
 
 #![forbid(unsafe_op_in_unsafe_fn)]
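A minimal sketch of the `Acquire`/`Release` pairing the docs describe, using a simple flag instead of the queued lock itself (names are illustrative):

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::{Acquire, Relaxed, Release}};
use std::thread;

static DATA: AtomicUsize = AtomicUsize::new(0);
static UNLOCKED_FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let owner = thread::spawn(|| {
        DATA.store(42, Relaxed);            // write the protected data...
        UNLOCKED_FLAG.store(true, Release); // ...then "release the lock" with Release
    });

    // "Acquire the lock": once the Acquire load observes the flag, the write to DATA
    // made before the Release store is guaranteed to be visible too. This is the same
    // Acquire/Release pairing the lock (and the queue lock) relies on.
    while !UNLOCKED_FLAG.load(Acquire) {
        std::hint::spin_loop();
    }
    assert_eq!(DATA.load(Relaxed), 42);

    owner.join().unwrap();
}
```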
@@ -115,20 +120,23 @@ use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
 use crate::sync::atomic::{AtomicBool, AtomicPtr};
 use crate::thread::{self, Thread, ThreadId};
 
-// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the
-// locking operation will be retried.
-// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
-const SPIN_COUNT: usize = 7;
-
-type State = *mut ();
+/// The atomic lock state.
 type AtomicState = AtomicPtr<()>;
+/// The inner lock state.
+type State = *mut ();
 
 const UNLOCKED: State = without_provenance_mut(0);
-const LOCKED: usize = 1;
-const QUEUED: usize = 2;
-const QUEUE_LOCKED: usize = 4;
-const SINGLE: usize = 8;
-const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
+const LOCKED: usize = 1 << 0;
+const QUEUED: usize = 1 << 1;
+const QUEUE_LOCKED: usize = 1 << 2;
+const SINGLE: usize = 1 << 3;
+const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
+
+/// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation
+/// will be retried.
+///
+/// In other words, `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
+const SPIN_COUNT: usize = 7;
 
 /// Marks the state as write-locked, if possible.
 #[inline]
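A small sketch of the bounded exponential backoff that `SPIN_COUNT` describes; `bounded_spin` is a made-up helper, but the inner loop mirrors the `2.pow(SPIN_COUNT) - 1` spin budget:

```rust
use std::hint::spin_loop;

const SPIN_COUNT: usize = 7;

/// Spin with exponential backoff. If `ready` never returns true, `spin_loop` is
/// called 1 + 2 + 4 + ... + 64 = 2^SPIN_COUNT - 1 times before giving up.
fn bounded_spin(mut ready: impl FnMut() -> bool) -> bool {
    for count in 0..SPIN_COUNT {
        if ready() {
            return true;
        }
        for _ in 0..(1 << count) {
            spin_loop();
        }
    }
    ready()
}

fn main() {
    // With a condition that never becomes true, the bounded spin gives up.
    assert!(!bounded_spin(|| false));
}
```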
@@ -147,13 +155,28 @@ fn read_lock(state: State) -> Option<State> {
     }
 }
 
-/// Masks the state, assuming it points to a queue node.
+/// Converts a `State` into a `Node` by masking out the bottom bits of the state, assuming that the
+/// state points to a queue node.
 ///
 /// # Safety
+///
 /// The state must contain a valid pointer to a queue node.
 #[inline]
 unsafe fn to_node(state: State) -> NonNull<Node> {
-    unsafe { NonNull::new_unchecked(state.mask(MASK)).cast() }
+    unsafe { NonNull::new_unchecked(state.mask(NODE_MASK)).cast() }
+}
+
+/// The representation of a thread waiting on the lock queue.
+///
+/// We initialize these `Node`s on thread execution stacks to avoid allocation.
+#[repr(align(8))]
+struct Node {
+    next: AtomicLink,
+    prev: AtomicLink,
+    tail: AtomicLink,
+    write: bool,
+    thread: OnceCell<Thread>,
+    completed: AtomicBool,
 }
 
 /// An atomic node pointer with relaxed operations.
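The `#[repr(align(8))]` on `Node` is what guarantees the three low bits of a node's address are zero, so they are free to carry the `LOCKED`/`QUEUED`/`QUEUE_LOCKED` flags. A self-contained sketch of that tagged-pointer trick, using a raw `usize` round-trip rather than the strict-provenance `mask` the real `to_node` uses:

```rust
const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);

// 8-byte alignment guarantees the three low bits of any `DemoNode` address are zero.
#[repr(align(8))]
struct DemoNode {
    value: u32,
}

fn main() {
    let node = DemoNode { value: 7 };
    let addr = &node as *const DemoNode as usize;
    assert_eq!(addr & !NODE_MASK, 0); // the low bits start out clear

    // Pack flags next to the address, then recover the address by masking,
    // which is what `to_node` does on the packed lock state.
    let tagged = addr | QUEUED | LOCKED;
    assert_eq!(tagged & NODE_MASK, addr);
    assert_eq!(node.value, 7);
}
```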
@@ -173,16 +196,6 @@ impl AtomicLink {
     }
 }
 
-#[repr(align(8))]
-struct Node {
-    next: AtomicLink,
-    prev: AtomicLink,
-    tail: AtomicLink,
-    write: bool,
-    thread: OnceCell<Thread>,
-    completed: AtomicBool,
-}
-
 impl Node {
     /// Creates a new queue node.
     fn new(write: bool) -> Node {
@@ -198,17 +211,17 @@ impl Node {
     /// Prepare this node for waiting.
     fn prepare(&mut self) {
-        // Fall back to creating an unnamed `Thread` handle to allow locking in
-        // TLS destructors.
+        // Fall back to creating an unnamed `Thread` handle to allow locking in TLS destructors.
         self.thread.get_or_init(|| {
             thread::try_current().unwrap_or_else(|| Thread::new_unnamed(ThreadId::new()))
         });
         self.completed = AtomicBool::new(false);
     }
 
-    /// Wait until this node is marked as completed.
+    /// Wait until this node is marked as [`complete`](Node::complete)d by another thread.
     ///
     /// # Safety
+    ///
     /// May only be called from the thread that created the node.
     unsafe fn wait(&self) {
         while !self.completed.load(Acquire) {
@@ -218,51 +231,48 @@ impl Node {
         }
     }
 
-    /// Atomically mark this node as completed. The node may not outlive this call.
-    unsafe fn complete(this: NonNull<Node>) {
-        // Since the node may be destroyed immediately after the completed flag
-        // is set, clone the thread handle before that.
-        let thread = unsafe { this.as_ref().thread.get().unwrap().clone() };
+    /// Atomically mark this node as completed.
+    ///
+    /// # Safety
+    ///
+    /// `node` must point to a valid `Node`, and the node may not outlive this call.
+    unsafe fn complete(node: NonNull<Node>) {
+        // Since the node may be destroyed immediately after the completed flag is set, clone the
+        // thread handle before that.
+        let thread = unsafe { node.as_ref().thread.get().unwrap().clone() };
         unsafe {
-            this.as_ref().completed.store(true, Release);
+            node.as_ref().completed.store(true, Release);
         }
         thread.unpark();
     }
 }
 
-struct PanicGuard;
-
-impl Drop for PanicGuard {
-    fn drop(&mut self) {
-        rtabort!("tried to drop node in intrusive list.");
-    }
-}
-
-/// Add backlinks to the queue, returning the tail.
+/// Traverse the queue and find the tail, adding backlinks to the queue while traversing.
 ///
-/// May be called from multiple threads at the same time, while the queue is not
+/// This may be called from multiple threads at the same time as long as the queue is not being
 /// modified (this happens when unlocking multiple readers).
 ///
 /// # Safety
+///
 /// * `head` must point to a node in a valid queue.
-/// * `head` must be or be in front of the head of the queue at the time of the
-///   last removal.
-/// * The part of the queue starting with `head` must not be modified during this
-///   call.
-unsafe fn add_backlinks_and_find_tail(head: NonNull<Node>) -> NonNull<Node> {
+/// * `head` must be in front of the head of the queue at the time of the last removal.
+/// * The part of the queue starting with `head` must not be modified during this call.
+unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
     let mut current = head;
+
+    // Traverse the queue until we find a node that has a set `tail`.
     let tail = loop {
         let c = unsafe { current.as_ref() };
-        match c.tail.get() {
-            Some(tail) => break tail,
-            // SAFETY:
-            // All `next` fields before the first node with a `set` tail are
-            // non-null and valid (invariant 3).
-            None => unsafe {
-                let next = c.next.get().unwrap_unchecked();
-                next.as_ref().prev.set(Some(current));
-                current = next;
-            },
-        }
+        if let Some(tail) = c.tail.get() {
+            break tail;
+        }
+
+        // SAFETY: All `next` fields before the first node with a set `tail` are non-null and valid
+        // (by Invariant 3).
+        unsafe {
+            let next = c.next.get().unwrap_unchecked();
+            next.as_ref().prev.set(Some(current));
+            current = next;
+        }
     };
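A simplified, index-based sketch of what `find_tail_and_add_backlinks` does: walk the `next` links from `head` until a node with a cached `tail` is found, filling in the backlinks along the way and caching the result on the head. The real code does this over raw pointers with relaxed atomics; the types here are made up:

```rust
struct SimpleNode {
    next: Option<usize>,
    prev: Option<usize>,
    tail: Option<usize>,
}

fn find_tail_and_add_backlinks(nodes: &mut [SimpleNode], head: usize) -> usize {
    let mut current = head;
    let tail = loop {
        if let Some(tail) = nodes[current].tail {
            break tail;
        }
        // Every node before the first one with a set `tail` must have a valid `next`.
        let next = nodes[current].next.expect("invariant 3: `next` is set");
        nodes[next].prev = Some(current); // add the backlink
        current = next;
    };
    // Cache the result on the head so later traversals can stop immediately.
    nodes[head].tail = Some(tail);
    tail
}

fn main() {
    // Queue: 0 -> 1 -> 2, where only the last node knows it is the tail.
    let mut nodes = vec![
        SimpleNode { next: Some(1), prev: None, tail: None },
        SimpleNode { next: Some(2), prev: None, tail: None },
        SimpleNode { next: None, prev: None, tail: Some(2) },
    ];
    assert_eq!(find_tail_and_add_backlinks(&mut nodes, 0), 2);
    assert_eq!(nodes[1].prev, Some(0));
    assert_eq!(nodes[2].prev, Some(1));
    assert_eq!(nodes[0].tail, Some(2));
}
```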
@@ -272,6 +282,16 @@ unsafe fn add_backlinks_and_find_tail(head: NonNull<Node>) -> NonNull<Node> {
     }
 }
 
+/// A type to guard against the unwinds of stacks that nodes are located on due to panics.
+struct PanicGuard;
+
+impl Drop for PanicGuard {
+    fn drop(&mut self) {
+        rtabort!("tried to drop node in intrusive list.");
+    }
+}
+
+/// The public inner `RwLock` type.
 pub struct RwLock {
     state: AtomicState,
 }
@@ -296,11 +316,10 @@ impl RwLock {
 
     #[inline]
     pub fn try_write(&self) -> bool {
-        // Atomically set the `LOCKED` bit. This is lowered to a single atomic
-        // instruction on most modern processors (e.g. "lock bts" on x86 and
-        // "ldseta" on modern AArch64), and therefore is more efficient than
-        // `fetch_update(lock(true))`, which can spuriously fail if a new node
-        // is appended to the queue.
+        // Atomically set the `LOCKED` bit. This is lowered to a single atomic instruction on most
+        // modern processors (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore
+        // is more efficient than `fetch_update(lock(true))`, which can spuriously fail if a new
+        // node is appended to the queue.
         self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0
     }
 
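A sketch of the trade-off the comment above describes, on a plain `AtomicUsize`: the `fetch_or` version is a single unconditional RMW, while a CAS-based `fetch_update` has to re-run whenever any bit of the state changes concurrently (for example, when a node is appended to the queue). Function names are illustrative:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Relaxed}};

const LOCKED: usize = 1;

/// One unconditional atomic RMW: succeeds iff the bit was previously clear, no
/// matter what the other (queue) bits are doing concurrently.
fn try_write_fetch_or(state: &AtomicUsize) -> bool {
    state.fetch_or(LOCKED, Acquire) & LOCKED == 0
}

/// The CAS-based alternative: the closure must be re-run whenever *any* bit of the
/// state changed between the load and the compare-exchange, even if the lock bit
/// itself is still clear.
fn try_write_cas(state: &AtomicUsize) -> bool {
    state
        .fetch_update(Acquire, Relaxed, |s| if s & LOCKED == 0 { Some(s | LOCKED) } else { None })
        .is_ok()
}

fn main() {
    let state = AtomicUsize::new(0);
    assert!(try_write_fetch_or(&state)); // takes the lock
    assert!(!try_write_cas(&state));     // already locked, so both variants now fail
}
```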
@@ -313,88 +332,91 @@
     #[cold]
     fn lock_contended(&self, write: bool) {
-        let update = if write { write_lock } else { read_lock };
+        let update_fn = if write { write_lock } else { read_lock };
         let mut node = Node::new(write);
         let mut state = self.state.load(Relaxed);
         let mut count = 0;
         loop {
-            if let Some(next) = update(state) {
+            // Optimistically update the state.
+            if let Some(next) = update_fn(state) {
                 // The lock is available, try locking it.
                 match self.state.compare_exchange_weak(state, next, Acquire, Relaxed) {
                     Ok(_) => return,
                     Err(new) => state = new,
                 }
+
+                continue;
             } else if state.addr() & QUEUED == 0 && count < SPIN_COUNT {
-                // If the lock is not available and no threads are queued, spin
-                // for a while, using exponential backoff to decrease cache
-                // contention.
+                // If the lock is not available and no threads are queued, optimistically spin for a
+                // while, using exponential backoff to decrease cache contention.
                 for _ in 0..(1 << count) {
                     spin_loop();
                 }
                 state = self.state.load(Relaxed);
                 count += 1;
-            } else {
-                // Fall back to parking. First, prepare the node.
-                node.prepare();
-
-                // If there are threads queued, set the `next` field to a
-                // pointer to the next node in the queue. Otherwise set it to
-                // the lock count if the state is read-locked or to zero if it
-                // is write-locked.
-                node.next.0 = AtomicPtr::new(state.mask(MASK).cast());
-                node.prev = AtomicLink::new(None);
-                let mut next = ptr::from_ref(&node)
-                    .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
-                    as State;
-
-                if state.addr() & QUEUED == 0 {
-                    // If this is the first node in the queue, set the tail field to
-                    // the node itself to ensure there is a current `tail` field in
-                    // the queue (invariants 1 and 2). This needs to use `set` to
-                    // avoid invalidating the new pointer.
-                    node.tail.set(Some(NonNull::from(&node)));
-                } else {
-                    // Otherwise, the tail of the queue is not known.
-                    node.tail.set(None);
-                    // Try locking the queue to eagerly add backlinks.
-                    next = next.map_addr(|addr| addr | QUEUE_LOCKED);
-                }
-
-                // Register the node, using release ordering to propagate our
-                // changes to the waking thread.
-                if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
-                    // The state has changed, just try again.
-                    state = new;
-                    continue;
-                }
-
-                // The node is registered, so the structure must not be
-                // mutably accessed or destroyed while other threads may
-                // be accessing it. Guard against unwinds using a panic
-                // guard that aborts when dropped.
-                let guard = PanicGuard;
-
-                // If the current thread locked the queue, unlock it again,
-                // linking it in the process.
-                if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
-                    unsafe {
-                        self.unlock_queue(next);
-                    }
-                }
-
-                // Wait until the node is removed from the queue.
-                // SAFETY: the node was created by the current thread.
-                unsafe {
-                    node.wait();
-                }
-
-                // The node was removed from the queue, disarm the guard.
-                mem::forget(guard);
-
-                // Reload the state and try again.
-                state = self.state.load(Relaxed);
-                count = 0;
-            }
+
+                continue;
+            }
+
+            // The optimistic paths did not succeed, so fall back to parking the thread.
+            // First, prepare the node.
+            node.prepare();
+
+            // If there are threads queued, this will set the `next` field to be a pointer to the
+            // first node in the queue.
+            // If the state is read-locked, this will set `next` to the lock count.
+            // If it is write-locked, it will set `next` to zero.
+            node.next.0 = AtomicPtr::new(state.mask(NODE_MASK).cast());
+            node.prev = AtomicLink::new(None);
+
+            // Set the `QUEUED` bit and maintain the `LOCKED` bit.
+            let mut next = ptr::from_ref(&node)
+                .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
+                as State;
+
+            if state.addr() & QUEUED == 0 {
+                // If this is the first node in the queue, set the `tail` field to the node itself
+                // to ensure there is a valid `tail` field in the queue (Invariants 1 & 2).
+                // This needs to use `set` to avoid invalidating the new pointer.
+                node.tail.set(Some(NonNull::from(&node)));
+            } else {
+                // Otherwise, the tail of the queue is not known.
+                node.tail.set(None);
+
+                // Try locking the queue to eagerly add backlinks.
+                next = next.map_addr(|addr| addr | QUEUE_LOCKED);
+            }
+
+            // Register the node, using release ordering to propagate our changes to the waking
+            // thread.
+            if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                // The state has changed, just try again.
+                state = new;
+                continue;
+            }
+
+            // The node has been registered, so the structure must not be mutably accessed or
+            // destroyed while other threads may be accessing it.
+            // Guard against unwinds using a `PanicGuard` that aborts when dropped.
+            let guard = PanicGuard;
+
+            // If the current thread locked the queue, unlock it to eagerly add backlinks.
+            if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
+                unsafe {
+                    self.unlock_queue(next);
+                }
+            }
+
+            // Wait until the node is removed from the queue.
+            // SAFETY: the node was created by the current thread.
+            unsafe {
+                node.wait();
+            }
+
+            // The node was removed from the queue, disarm the guard.
+            mem::forget(guard);
+
+            // Reload the state and try again.
+            state = self.state.load(Relaxed);
+            count = 0;
         }
     }
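The optimistic-update shape used throughout `lock_contended` boils down to the usual weak-CAS retry loop: compute a candidate state from the last observed one, try to publish it, and on failure remember what was observed and retry. A standalone sketch on an `AtomicUsize` (the helper name is made up):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Relaxed}};

/// Compute a new state from the last observed one and publish it with a weak CAS,
/// re-reading the state on failure; returns false if `update` declines.
fn fetch_update_by(state: &AtomicUsize, update: impl Fn(usize) -> Option<usize>) -> bool {
    let mut current = state.load(Relaxed);
    loop {
        let Some(next) = update(current) else { return false };
        match state.compare_exchange_weak(current, next, Acquire, Relaxed) {
            Ok(_) => return true,
            // Someone else changed the state (or the CAS failed spuriously):
            // remember the observed value and try again.
            Err(observed) => current = observed,
        }
    }
}

fn main() {
    let state = AtomicUsize::new(0);
    // "write_lock": only succeeds if the low bit is clear.
    assert!(fetch_update_by(&state, |s| (s & 1 == 0).then_some(s | 1)));
    assert!(!fetch_update_by(&state, |s| (s & 1 == 0).then_some(s | 1)));
}
```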
@@ -402,6 +424,7 @@ impl RwLock {
     pub unsafe fn read_unlock(&self) {
         match self.state.fetch_update(Release, Acquire, |state| {
             if state.addr() & QUEUED == 0 {
+                // If there are no threads queued, simply decrement the reader count.
                 let count = state.addr() - (SINGLE | LOCKED);
                 Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED })
             } else {
@@ -409,8 +432,7 @@ impl RwLock {
             }
         }) {
             Ok(_) => {}
-            // There are waiters queued and the lock count was moved to the
-            // tail of the queue.
+            // There are waiters queued and the lock count was moved to the tail of the queue.
             Err(state) => unsafe { self.read_unlock_contended(state) },
         }
     }
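A worked sketch of the uncontended `read_unlock` arithmetic shown above: subtracting `SINGLE | LOCKED` drops one reader and clears the lock bit, and the lock bit is put back only if readers remain (constants duplicated here for illustration, on plain `usize` rather than the pointer-based state):

```rust
const LOCKED: usize = 1 << 0;
const SINGLE: usize = 1 << 3; // one reader

/// The closure body of the uncontended read-unlock path, as plain arithmetic.
fn read_unlock_uncontended(state: usize) -> usize {
    let count = state - (SINGLE | LOCKED);
    if count > 0 { count | LOCKED } else { 0 }
}

fn main() {
    let two_readers = 2 * SINGLE | LOCKED;
    let one_reader = read_unlock_uncontended(two_readers);
    assert_eq!(one_reader, SINGLE | LOCKED);
    assert_eq!(read_unlock_uncontended(one_reader), 0); // fully unlocked
}
```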
@@ -420,21 +442,21 @@
         // The state was observed with acquire ordering above, so the current
         // thread will observe all node initializations.
 
-        // SAFETY:
-        // Because new read-locks cannot be acquired while threads are queued,
-        // all queue-lock owners will observe the set `LOCKED` bit. Because they
-        // do not modify the queue while there is a lock owner, the queue will
-        // not be removed from here.
-        let tail = unsafe { add_backlinks_and_find_tail(to_node(state)).as_ref() };
+        // FIXME this is a bit confusing
+        // SAFETY: Because new read-locks cannot be acquired while threads are queued, all
+        // queue-lock owners will observe the set `LOCKED` bit. And because no downgrade can be in
+        // progress (we checked above), they hence do not modify the queue, so the queue will not be
+        // removed from here.
+        let tail = unsafe { find_tail_and_add_backlinks(to_node(state)).as_ref() };
+
         // The lock count is stored in the `next` field of `tail`.
-        // Decrement it, making sure to observe all changes made to the queue
-        // by the other lock owners by using acquire-release ordering.
+        // Decrement it, making sure to observe all changes made to the queue by the other lock
+        // owners by using acquire-release ordering.
         let was_last = tail.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0;
         if was_last {
-            // SAFETY:
-            // Other threads cannot read-lock while threads are queued. Also,
-            // the `LOCKED` bit is still set, so there are no writers. Therefore,
-            // the current thread exclusively owns the lock.
+            // SAFETY: Other threads cannot read-lock while threads are queued. Also, the `LOCKED`
+            // bit is still set, so there are no writers. Thus the current thread exclusively owns
+            // this lock, even though it is a reader.
             unsafe { self.unlock_contended(state) }
         }
     }
@@ -444,14 +466,14 @@ impl RwLock {
         if let Err(state) =
             self.state.compare_exchange(without_provenance_mut(LOCKED), UNLOCKED, Release, Relaxed)
         {
-            // SAFETY:
-            // Since other threads cannot acquire the lock, the state can only
-            // have changed because there are threads queued on the lock.
+            // SAFETY: Since other threads cannot acquire the lock, the state can only have changed
+            // because there are threads queued on the lock.
             unsafe { self.unlock_contended(state) }
         }
     }
 
     /// # Safety
+    ///
     /// * The lock must be exclusively owned by this thread.
     /// * There must be threads queued on the lock.
     #[cold]
@@ -477,12 +499,13 @@
     /// thread(s).
     ///
     /// # Safety
+    ///
     /// The queue lock must be held by the current thread.
     unsafe fn unlock_queue(&self, mut state: State) {
         debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
 
         loop {
-            let tail = unsafe { add_backlinks_and_find_tail(to_node(state)) };
+            let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
 
             if state.addr() & LOCKED == LOCKED {
                 // Another thread has locked the lock. Leave waking up waiters