
add downgrade to queue implementation

This commit adds the `downgrade` method onto the inner `RwLock` queue
implementation.

There are also a few other style patches included in this commit.

Co-authored-by: Jonas Böttiger <jonasboettiger@icloud.com>
Connor Tsui 2024-10-25 15:01:57 -04:00
parent 31e35c2131
commit 26b5a1485e

@@ -40,16 +40,16 @@
//!
//! ## State
//!
//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used to indicate the
//! A single [`AtomicPtr`] is used as state variable. The lowest four bits are used to indicate the
//! meaning of the remaining bits:
//!
//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | |
//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | [`DOWNGRADED`] | Remaining | |
//! |------------|:-----------|:-----------------|:---------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
//! | 0 | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
//! | 1 | 0 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
//! | 1 | 0 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
//! | 0 | 1 | * | 0 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
//! | 1 | 1 | * | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
//!
//! ## Waiter Queue
//!
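As a quick orientation, the new state table can be restated over a plain usize. This is an illustrative sketch, not part of the patch; the constants mirror the ones defined further down in this file.

const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const DOWNGRADED: usize = 1 << 3;

/// Classify a state word according to the table in the module docs.
fn describe(state: usize) -> &'static str {
    match (state & LOCKED != 0, state & QUEUED != 0) {
        (false, false) if state == 0 => "unlocked, no waiters",
        (true, false) if state >> 4 == 0 => "write-locked, no waiters",
        (true, false) => "read-locked; the upper bits hold the reader count",
        (false, true) => "unlocked with queued waiters; the upper bits hold a *mut Node",
        (true, true) => "locked with queued waiters",
        _ => "not a state the table allows",
    }
}

fn main() {
    assert_eq!(describe(0), "unlocked, no waiters");
    assert_eq!(describe(LOCKED), "write-locked, no waiters");
    assert_eq!(describe((2 << 4) | LOCKED), "read-locked; the upper bits hold the reader count");
    assert_eq!(describe(0x100 | QUEUED | QUEUE_LOCKED), "unlocked with queued waiters; the upper bits hold a *mut Node");
    assert_eq!(describe(0x100 | QUEUED | QUEUE_LOCKED | DOWNGRADED | LOCKED), "locked with queued waiters");
}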
@@ -100,9 +100,9 @@
//! wake up the next waiter(s).
//!
//! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The
//! thread releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up waiters as
//! appropriate. This guarantees forward progress even if the unlocking thread could not acquire the
//! queue lock.
//! thread releasing the `QUEUE_LOCKED` bit will check the state of the lock (in particular, whether
//! a downgrade was requested using the [`DOWNGRADED`] bit) and wake up waiters as appropriate. This
//! guarantees forward progress even if the unlocking thread could not acquire the queue lock.
//!
//! ## Memory Orderings
//!
@@ -129,8 +129,10 @@ const UNLOCKED: State = without_provenance_mut(0);
const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const SINGLE: usize = 1 << 3;
const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
const DOWNGRADED: usize = 1 << 3;
const SINGLE: usize = 1 << 4;
const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED;
const NODE_MASK: usize = !STATE;
/// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation
/// will be retried.
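With the widened flag mask, the node pointer is now recovered by clearing the low four bits rather than three. Here is a self-contained sketch of that split, assuming `State = *mut ()` as in this module; the real code does the pointer half in `to_node`.

const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const DOWNGRADED: usize = 1 << 3;
const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED;
const NODE_MASK: usize = !STATE;

#[repr(align(16))]
struct Slot(u8); // stand-in for a queue node; only its alignment matters

/// Split a state word into its flag bits and the queue-head pointer.
fn split(state: *mut ()) -> (usize, *mut ()) {
    // `map_addr` clears the flag bits while keeping the pointer's provenance.
    (state.addr() & STATE, state.map_addr(|addr| addr & NODE_MASK))
}

fn main() {
    let slot = Slot(0);
    let node = &slot as *const Slot as *mut ();
    let state = node.map_addr(|addr| addr | QUEUED | LOCKED);
    let (flags, head) = split(state);
    assert_eq!(flags, QUEUED | LOCKED);
    assert_eq!(head, node);
}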
@@ -141,8 +143,7 @@ const SPIN_COUNT: usize = 7;
/// Marks the state as write-locked, if possible.
#[inline]
fn write_lock(state: State) -> Option<State> {
let state = state.wrapping_byte_add(LOCKED);
if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
if state.addr() & LOCKED == 0 { Some(state.map_addr(|addr| addr | LOCKED)) } else { None }
}
/// Marks the state as read-locked, if possible.
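The rewritten `write_lock` above trades the byte-add trick (which relied on the add carrying out of the `LOCKED` bit when the lock was already held) for an explicit check. Restated over a plain usize as an illustrative sketch:

const LOCKED: usize = 1 << 0;

/// Succeeds only when the `LOCKED` bit is clear, then sets it.
fn write_lock(state: usize) -> Option<usize> {
    if state & LOCKED == 0 { Some(state | LOCKED) } else { None }
}

fn main() {
    assert_eq!(write_lock(0), Some(LOCKED)); // unlocked -> write-locked
    assert_eq!(write_lock(LOCKED), None);    // already held -> caller must queue
}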
@@ -169,7 +170,11 @@ unsafe fn to_node(state: State) -> NonNull<Node> {
/// The representation of a thread waiting on the lock queue.
///
/// We initialize these `Node`s on thread execution stacks to avoid allocation.
#[repr(align(8))]
///
/// Note that we need an alignment of 16 to ensure that the last 4 bits of any
/// pointers to `Node`s are always zeroed (for the bit flags described in the
/// module-level documentation).
#[repr(align(16))]
struct Node {
next: AtomicLink,
prev: AtomicLink,
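A quick way to convince yourself of the alignment comment above: with `align(16)`, any pointer to a `Node` has its low four bits zeroed, exactly the bits claimed by `LOCKED`, `QUEUED`, `QUEUE_LOCKED`, and `DOWNGRADED`. An illustrative stand-in, not the real struct:

#[repr(align(16))]
struct Node {
    _next: usize, // real fields elided; only the alignment matters here
}

fn main() {
    let node = Node { _next: 0 };
    let addr = &node as *const Node as usize;
    assert_eq!(std::mem::align_of::<Node>(), 16);
    assert_eq!(addr & 0b1111, 0); // four low bits are free for the flags
}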
@@ -255,7 +260,7 @@ impl Node {
/// # Safety
///
/// * `head` must point to a node in a valid queue.
/// * `head` must be in front of the head of the queue at the time of the last removal.
/// * `head` must be in front of the previous head node that was used to perform the last removal.
/// * The part of the queue starting with `head` must not be modified during this call.
unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
let mut current = head;
@@ -282,6 +287,28 @@ unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
}
}
/// [`complete`](Node::complete)s all threads in the queue ending with `tail`.
///
/// # Safety
///
/// * `tail` must be a valid tail of a fully linked queue.
/// * The current thread must have exclusive access to that queue.
unsafe fn complete_all(tail: NonNull<Node>) {
let mut current = tail;
// Traverse backwards through the queue (FIFO) and `complete` all of the nodes.
loop {
let prev = unsafe { current.as_ref().prev.get() };
unsafe {
Node::complete(current);
}
match prev {
Some(prev) => current = prev,
None => return,
}
}
}
/// A type to guard against the unwinds of stacks that nodes are located on due to panics.
struct PanicGuard;
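Since new nodes are pushed at the head, the tail found by `find_tail_and_add_backlinks` is the oldest waiter, and following `prev` from the tail (as `complete_all` above does) visits waiters oldest-first. A toy model of that traversal, with indices standing in for the real `NonNull<Node>` links:

struct ToyNode {
    id: u32,
    prev: Option<usize>, // backlink toward the head (newer waiters)
}

/// Stand-in for `complete_all`: wake every node from `tail` back to the head.
fn complete_all(nodes: &[ToyNode], tail: usize, woken: &mut Vec<u32>) {
    let mut current = tail;
    loop {
        woken.push(nodes[current].id); // stands in for `Node::complete`
        match nodes[current].prev {
            Some(prev) => current = prev,
            None => return,
        }
    }
}

fn main() {
    // Index 0 is the tail (oldest waiter), index 2 the head (newest).
    let nodes = [
        ToyNode { id: 1, prev: Some(1) },
        ToyNode { id: 2, prev: Some(2) },
        ToyNode { id: 3, prev: None },
    ];
    let mut woken = Vec::new();
    complete_all(&nodes, 0, &mut woken);
    assert_eq!(woken, [1, 2, 3]); // FIFO: oldest waiter completed first
}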
@@ -332,10 +359,11 @@ impl RwLock {
#[cold]
fn lock_contended(&self, write: bool) {
let update_fn = if write { write_lock } else { read_lock };
let mut node = Node::new(write);
let mut state = self.state.load(Relaxed);
let mut count = 0;
let update_fn = if write { write_lock } else { read_lock };
loop {
// Optimistically update the state.
if let Some(next) = update_fn(state) {
@@ -372,6 +400,7 @@ impl RwLock {
.map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
as State;
let mut is_queue_locked = false;
if state.addr() & QUEUED == 0 {
// If this is the first node in the queue, set the `tail` field to the node itself
// to ensure there is a valid `tail` field in the queue (Invariants 1 & 2).
@@ -383,6 +412,9 @@ impl RwLock {
// Try locking the queue to eagerly add backlinks.
next = next.map_addr(|addr| addr | QUEUE_LOCKED);
// Track if we changed the `QUEUE_LOCKED` bit from off to on.
is_queue_locked = state.addr() & QUEUE_LOCKED == 0;
}
// Register the node, using release ordering to propagate our changes to the waking
@@ -398,8 +430,9 @@ impl RwLock {
// Guard against unwinds using a `PanicGuard` that aborts when dropped.
let guard = PanicGuard;
// If the current thread locked the queue, unlock it to eagerly add backlinks.
if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
// If the current thread locked the queue, unlock it to eagerly add backlinks.
if is_queue_locked {
// SAFETY: This thread set the `QUEUE_LOCKED` bit above.
unsafe {
self.unlock_queue(next);
}
@@ -427,6 +460,12 @@ impl RwLock {
// If there are no threads queued, simply decrement the reader count.
let count = state.addr() - (SINGLE | LOCKED);
Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED })
} else if state.addr() & DOWNGRADED != 0 {
// This thread used to have exclusive access, but requested a downgrade. This has
// not been completed yet, so we still have exclusive access.
// Retract the downgrade request and unlock, but leave waking up new threads to the
// thread that already holds the queue lock.
Some(state.mask(!(DOWNGRADED | LOCKED)))
} else {
None
}
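The new `DOWNGRADED` branch above covers the case where this "reader" is really a writer whose downgrade was delegated but never completed. The fast path, restated over a plain usize (illustrative; constants as in the module):

const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const DOWNGRADED: usize = 1 << 3;
const SINGLE: usize = 1 << 4;

fn read_unlock_fast(state: usize) -> Option<usize> {
    if state & QUEUED == 0 {
        // No waiters: just decrement the reader count.
        let count = state - (SINGLE | LOCKED);
        Some(if count > 0 { count | LOCKED } else { 0 })
    } else if state & DOWNGRADED != 0 {
        // An uncompleted downgrade means we still hold exclusive access:
        // retract the request and unlock.
        Some(state & !(DOWNGRADED | LOCKED))
    } else {
        None // waiters are queued and no downgrade is pending: take the slow path
    }
}

fn main() {
    assert_eq!(read_unlock_fast(2 * SINGLE | LOCKED), Some(SINGLE | LOCKED)); // two readers -> one
    assert_eq!(read_unlock_fast(SINGLE | LOCKED), Some(0)); // last reader -> unlocked
    let queued = 0x100 | QUEUED | DOWNGRADED | LOCKED;
    assert_eq!(read_unlock_fast(queued), Some(0x100 | QUEUED));
}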
@@ -476,40 +515,127 @@ impl RwLock {
///
/// * The lock must be exclusively owned by this thread.
/// * There must be threads queued on the lock.
/// * There cannot be a `downgrade` in progress.
#[cold]
unsafe fn unlock_contended(&self, mut state: State) {
unsafe fn unlock_contended(&self, state: State) {
debug_assert!(state.addr() & STATE == (QUEUED | LOCKED));
let mut current = state;
// We want to atomically release the lock and try to acquire the queue lock.
loop {
// Atomically release the lock and try to acquire the queue lock.
let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
// The queue lock was acquired. Release it, waking up the next
// waiter in the process.
Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
return self.unlock_queue(next);
},
// Another thread already holds the queue lock, leave waking up
// waiters to it.
// First check if the queue lock is already held.
if current.addr() & QUEUE_LOCKED != 0 {
// Another thread holds the queue lock, so let them wake up waiters for us.
let next = current.mask(!LOCKED);
match self.state.compare_exchange_weak(current, next, Release, Relaxed) {
Ok(_) => return,
Err(new) => state = new,
Err(new) => {
current = new;
continue;
}
}
}
/// Unlocks the queue. If the lock is unlocked, wakes up the next eligible
/// thread(s).
// Atomically release the lock and try to acquire the queue lock.
let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED);
match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) {
Ok(_) => {
// Now that we have the queue lock, we can wake up the next waiter.
// SAFETY: This thread just acquired the queue lock, and the safety contract
// of this function guarantees that there are threads queued on the lock.
unsafe { self.unlock_queue(next) };
return;
}
Err(new) => current = new,
}
}
}
/// # Safety
///
/// * The lock must be write-locked by this thread.
#[inline]
pub unsafe fn downgrade(&self) {
// Optimistically change the state from write-locked with a single writer and no waiters to
// read-locked with a single reader and no waiters.
if let Err(state) = self.state.compare_exchange(
without_provenance_mut(LOCKED),
without_provenance_mut(SINGLE | LOCKED),
Release,
Relaxed,
) {
// SAFETY: The only way the state can have changed is if there are threads queued.
// Wake all of them up.
unsafe { self.downgrade_slow(state) }
}
}
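For context, this is the caller-facing behavior that the queue-level `downgrade` ultimately backs. The sketch below uses the public `RwLockWriteGuard::downgrade` API, which at the time of this commit is unstable (`rwlock_downgrade`) and needs a nightly toolchain:

#![feature(rwlock_downgrade)]
use std::sync::{RwLock, RwLockWriteGuard};

fn main() {
    let lock = RwLock::new(1);
    let mut guard = lock.write().unwrap();
    *guard += 1;
    // Atomically trade exclusive access for shared access; no other writer
    // can slip in between the write and read phases.
    let read_guard = RwLockWriteGuard::downgrade(guard);
    assert_eq!(*read_guard, 2);
}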
/// Downgrades the lock from write-locked to read-locked in the case that there are threads
/// waiting on the wait queue.
///
/// This function will either wake up all of the waiters on the wait queue or designate the
/// current holder of the queue lock to wake up all of the waiters instead. Once the waiters
/// wake up, they will continue in the execution loop of `lock_contended`.
///
/// # Safety
///
/// The queue lock must be held by the current thread.
/// * The lock must be write-locked by this thread.
/// * There must be threads queued on the lock.
#[cold]
unsafe fn downgrade_slow(&self, mut state: State) {
debug_assert!(state.addr() & (DOWNGRADED | QUEUED | LOCKED) == (QUEUED | LOCKED));
// Attempt to wake up all waiters by taking ownership of the entire waiter queue.
loop {
if state.addr() & QUEUE_LOCKED != 0 {
// Another thread already holds the queue lock. Tell it to wake up all waiters.
// If the other thread succeeds in waking up waiters before we release our lock, the
// effect will be just the same as if we had changed the state below.
// Otherwise, the `DOWNGRADED` bit will still be set, meaning that when this thread
// calls `read_unlock` later (because it holds a read lock and must unlock
// eventually), it will realize that the lock is still exclusively locked and act
// accordingly.
let next = state.map_addr(|addr| addr | DOWNGRADED);
match self.state.compare_exchange_weak(state, next, Release, Relaxed) {
Ok(_) => return,
Err(new) => state = new,
}
} else {
// Grab the entire queue by swapping the `state` with a single reader.
let next = ptr::without_provenance_mut(SINGLE | LOCKED);
if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
state = new;
continue;
}
// SAFETY: We have full ownership of this queue now, so nobody else can modify it.
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
// Wake up all waiters.
// SAFETY: `tail` was just computed, meaning the whole queue is linked.
unsafe { complete_all(tail) };
return;
}
}
}
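The two arms of `downgrade_slow` can be summarized as a decision over the current state word. An illustrative usize model, with constants as in the module:

const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const DOWNGRADED: usize = 1 << 3;
const SINGLE: usize = 1 << 4;

enum DowngradeStep {
    /// Set `DOWNGRADED` and return; the queue-lock holder wakes all waiters.
    Delegate { next: usize },
    /// Swap in a single-reader state and wake the whole queue ourselves.
    TakeQueue { next: usize },
}

fn downgrade_step(state: usize) -> DowngradeStep {
    if state & QUEUE_LOCKED != 0 {
        DowngradeStep::Delegate { next: state | DOWNGRADED }
    } else {
        DowngradeStep::TakeQueue { next: SINGLE | LOCKED }
    }
}

fn main() {
    let held = 0x100 | QUEUED | QUEUE_LOCKED | LOCKED; // hypothetical queued state
    match downgrade_step(held) {
        DowngradeStep::Delegate { next } => assert_eq!(next & DOWNGRADED, DOWNGRADED),
        DowngradeStep::TakeQueue { .. } => unreachable!(),
    }
    match downgrade_step(0x100 | QUEUED | LOCKED) {
        DowngradeStep::TakeQueue { next } => assert_eq!(next, SINGLE | LOCKED),
        DowngradeStep::Delegate { .. } => unreachable!(),
    }
}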
/// Unlocks the queue. Wakes up all threads if a downgrade was requested, otherwise wakes up the
/// next eligible thread(s) if the lock is unlocked.
///
/// # Safety
///
/// * The queue lock must be held by the current thread.
/// * There must be threads queued on the lock.
unsafe fn unlock_queue(&self, mut state: State) {
debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
loop {
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
if state.addr() & LOCKED == LOCKED {
// Another thread has locked the lock. Leave waking up waiters
// to them by releasing the queue lock.
if state.addr() & (DOWNGRADED | LOCKED) == LOCKED {
// Another thread has locked the lock and no downgrade was requested.
// Leave waking up waiters to them by releasing the queue lock.
match self.state.compare_exchange_weak(
state,
state.mask(!QUEUE_LOCKED),
@@ -524,53 +650,63 @@
}
}
let is_writer = unsafe { tail.as_ref().write };
if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
// `tail` is a writer and there is a node before `tail`.
// Split off `tail`.
// Since we hold the queue lock and downgrades cannot be requested if the lock is
// already read-locked, we have exclusive control over the queue here and can make
// modifications.
// There are no set `tail` links before the node pointed to by
// `state`, so the first non-null tail field will be current
// (invariant 2). Invariant 4 is fulfilled since `find_tail`
// was called on this node, which ensures all backlinks are set.
let downgrade = state.addr() & DOWNGRADED != 0;
let is_writer = unsafe { tail.as_ref().write };
if !downgrade
&& is_writer
&& let Some(prev) = unsafe { tail.as_ref().prev.get() }
{
// If we are not downgrading and the next thread is a writer, only wake up that
// writing thread.
// Split off `tail`.
// There are no set `tail` links before the node pointed to by `state`, so the first
// non-null tail field will be current (Invariant 2).
// We also fulfill Invariant 4 since `find_tail` was called on this node, which
// ensures all backlinks are set.
unsafe {
to_node(state).as_ref().tail.set(Some(prev));
}
// Release the queue lock. Doing this by subtraction is more
// efficient on modern processors since it is a single instruction
// instead of an update loop, which will fail if new threads are
// added to the list.
self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
// The tail was split off and the lock released. Mark the node as
// completed.
// Try to release the queue lock. We need to check the state again since another
// thread might have acquired the lock and requested a downgrade.
let next = state.mask(!QUEUE_LOCKED);
if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
// Undo the tail modification above, so that we can find the tail again at
// the top of the loop.
// As mentioned above, we have exclusive control over the queue, so no other
// thread could have noticed the change.
unsafe {
return Node::complete(tail);
to_node(state).as_ref().tail.set(Some(tail));
}
} else {
// The next waiter is a reader or the queue only consists of one
// waiter. Just wake all threads.
// The lock cannot be locked (checked above), so mark it as
// unlocked to reset the queue.
if let Err(new) =
self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire)
{
state = new;
continue;
}
let mut current = tail;
loop {
let prev = unsafe { current.as_ref().prev.get() };
// The tail was split off and the lock was released. Mark the node as completed.
unsafe {
Node::complete(current);
}
match prev {
Some(prev) => current = prev,
None => return,
return Node::complete(tail);
}
} else {
// We are either downgrading, the next waiter is a reader, or the queue only
// consists of one waiter. In any case, just wake all threads.
// Clear the queue.
let next =
if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED };
if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
state = new;
continue;
}
// SAFETY: we computed `tail` above, and no new nodes can have been added since
// (otherwise the CAS above would have failed).
// Thus we have complete control over the whole queue.
unsafe {
return complete_all(tail);
}
}
}
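Putting the branches of `unlock_queue` side by side, the wakeup policy reduces to three cases. An illustrative usize model, with constants as in the module:

const LOCKED: usize = 1 << 0;
const DOWNGRADED: usize = 1 << 3;
const SINGLE: usize = 1 << 4;

enum WakePlan {
    /// The lock is held and no downgrade is pending: drop `QUEUE_LOCKED` and
    /// let the lock holder wake waiters when it unlocks.
    LeaveToLockHolder,
    /// Split off a writer at the tail and wake only that thread.
    WakeTailWriter,
    /// Wake every waiter; the lock ends up unlocked, or held by a single
    /// reader if a downgrade was requested.
    WakeAll { next_state: usize },
}

fn plan(state: usize, tail_is_writer: bool, tail_has_prev: bool) -> WakePlan {
    let downgrade = state & DOWNGRADED != 0;
    if state & (DOWNGRADED | LOCKED) == LOCKED {
        WakePlan::LeaveToLockHolder
    } else if !downgrade && tail_is_writer && tail_has_prev {
        WakePlan::WakeTailWriter
    } else {
        WakePlan::WakeAll { next_state: if downgrade { SINGLE | LOCKED } else { 0 } }
    }
}

fn main() {
    assert!(matches!(plan(0x100 | LOCKED, true, true), WakePlan::LeaveToLockHolder));
    assert!(matches!(plan(0x100, true, true), WakePlan::WakeTailWriter));
    match plan(0x100 | DOWNGRADED | LOCKED, true, true) {
        WakePlan::WakeAll { next_state } => assert_eq!(next_state, SINGLE | LOCKED),
        _ => unreachable!(),
    }
}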