1
Fork 0

initial port of crossbeam-channel

This commit is contained in:
Ibraheem Ahmed 2022-10-17 19:09:54 -04:00
parent 34115d040b
commit a43da5a097
11 changed files with 2694 additions and 0 deletions

View file

@ -182,6 +182,7 @@ pub mod mpsc;
mod barrier;
mod condvar;
mod lazy_lock;
mod mpmc;
mod mutex;
mod once;
mod once_lock;

View file

@ -0,0 +1,523 @@
//! Bounded channel based on a preallocated array.
//!
//! This flavor has a fixed, positive capacity.
//!
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::{Backoff, CachePadded};
use super::waker::SyncWaker;
use crate::cell::UnsafeCell;
use crate::mem::MaybeUninit;
use crate::ptr;
use crate::sync::atomic::{self, AtomicUsize, Ordering};
use crate::time::Instant;
/// A slot in a channel.
struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
/// The message in this slot.
msg: UnsafeCell<MaybeUninit<T>>,
}
/// The token type for the array flavor.
#[derive(Debug)]
pub(crate) struct ArrayToken {
/// Slot to read from or write to.
slot: *const u8,
/// Stamp to store into the slot after reading or writing.
stamp: usize,
}
impl Default for ArrayToken {
#[inline]
fn default() -> Self {
ArrayToken { slot: ptr::null(), stamp: 0 }
}
}
/// Bounded channel based on a preallocated array.
pub(crate) struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit in the head is always zero.
///
/// Messages are popped from the head of the channel.
head: CachePadded<AtomicUsize>,
/// The tail of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit indicates that the channel is disconnected.
///
/// Messages are pushed into the tail of the channel.
tail: CachePadded<AtomicUsize>,
/// The buffer holding slots.
buffer: Box<[Slot<T>]>,
/// The channel capacity.
cap: usize,
/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
one_lap: usize,
/// If this bit is set in the tail, that means the channel is disconnected.
mark_bit: usize,
/// Senders waiting while the channel is full.
senders: SyncWaker,
/// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
}
impl<T> Channel<T> {
/// Creates a bounded channel of capacity `cap`.
pub(crate) fn with_capacity(cap: usize) -> Self {
assert!(cap > 0, "capacity must be positive");
// Compute constants `mark_bit` and `one_lap`.
let mark_bit = (cap + 1).next_power_of_two();
let one_lap = mark_bit * 2;
// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let head = 0;
// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let tail = 0;
// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer: Box<[Slot<T>]> = (0..cap)
.map(|i| {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) }
})
.collect();
Channel {
buffer,
cap,
one_lap,
mark_bit,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
senders: SyncWaker::new(),
receivers: SyncWaker::new(),
}
}
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
// Check if the channel is disconnected.
if tail & self.mark_bit != 0 {
token.array.slot = ptr::null();
token.array.stamp = 0;
return true;
}
// Deconstruct the tail.
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);
// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Prepare the token for the follow-up call to `write`.
token.array.slot = slot as *const Slot<T> as *const u8;
token.array.stamp = tail + 1;
return true;
}
Err(t) => {
tail = t;
backoff.spin();
}
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the channel is full.
return false;
}
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}
}
/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.array.slot.is_null() {
return Err(msg);
}
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
// Write the message into the slot and update the stamp.
slot.msg.get().write(MaybeUninit::new(msg));
slot.stamp.store(token.array.stamp, Ordering::Release);
// Wake a sleeping receiver.
self.receivers.notify();
Ok(())
}
/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);
// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the the stamp is ahead of the head by 1, we may attempt to pop.
if head + 1 == stamp {
let new = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the head.
match self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Prepare the token for the follow-up call to `read`.
token.array.slot = slot as *const Slot<T> as *const u8;
token.array.stamp = head.wrapping_add(self.one_lap);
return true;
}
Err(h) => {
head = h;
backoff.spin();
}
}
} else if stamp == head {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.load(Ordering::Relaxed);
// If the tail equals the head, that means the channel is empty.
if (tail & !self.mark_bit) == head {
// If the channel is disconnected...
if tail & self.mark_bit != 0 {
// ...then receive an error.
token.array.slot = ptr::null();
token.array.stamp = 0;
return true;
} else {
// Otherwise, the receive operation is not ready.
return false;
}
}
backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}
/// Reads a message from the channel.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.array.slot.is_null() {
// The channel is disconnected.
return Err(());
}
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
// Read the message from the slot and update the stamp.
let msg = slot.msg.get().read().assume_init();
slot.stamp.store(token.array.stamp, Ordering::Release);
// Wake a sleeping sender.
self.senders.notify();
Ok(msg)
}
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
if self.start_send(token) {
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
} else {
Err(TrySendError::Full(msg))
}
}
/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
loop {
// Try sending a message several times.
let backoff = Backoff::new();
loop {
if self.start_send(token) {
let res = unsafe { self.write(token, msg) };
return res.map_err(SendTimeoutError::Disconnected);
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
if let Some(d) = deadline {
if Instant::now() >= d {
return Err(SendTimeoutError::Timeout(msg));
}
}
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
self.senders.register(oper, cx);
// Has the channel become ready just now?
if !self.is_full() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.senders.unregister(oper).unwrap();
}
Selected::Operation(_) => {}
}
});
}
}
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else {
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
let backoff = Backoff::new();
loop {
if self.start_recv(token) {
let res = unsafe { self.read(token) };
return res.map_err(|_| RecvTimeoutError::Disconnected);
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
if let Some(d) = deadline {
if Instant::now() >= d {
return Err(RecvTimeoutError::Timeout);
}
}
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
self.receivers.register(oper, cx);
// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.receivers.unregister(oper).unwrap();
// If the channel was disconnected, we still have to check for remaining
// messages.
}
Selected::Operation(_) => {}
}
});
}
}
/// Returns the current number of messages inside the channel.
pub(crate) fn len(&self) -> usize {
loop {
// Load the tail, then load the head.
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// If the tail didn't change, we've got consistent values to work with.
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap
};
}
}
}
/// Returns the capacity of the channel.
#[allow(clippy::unnecessary_wraps)] // This is intentional.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(self.cap)
}
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
if tail & self.mark_bit == 0 {
self.senders.disconnect();
self.receivers.disconnect();
true
} else {
false
}
}
/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
}
/// Returns `true` if the channel is empty.
pub(crate) fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
// Is the tail equal to the head?
//
// Note: If the head changes just before we load the tail, that means there was a moment
// when the channel was not empty, so it is safe to just return `false`.
(tail & !self.mark_bit) == head
}
/// Returns `true` if the channel is full.
pub(crate) fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// Is the head lagging one lap behind tail?
//
// Note: If the tail changes just before we load the head, that means there was a moment
// when the channel was not full, so it is safe to just return `false`.
head.wrapping_add(self.one_lap) == tail & !self.mark_bit
}
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
// Get the index of the head.
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
// Loop over all slots that hold a message and drop them.
for i in 0..self.len() {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap };
unsafe {
debug_assert!(index < self.buffer.len());
let slot = self.buffer.get_unchecked_mut(index);
let msg = &mut *slot.msg.get();
msg.as_mut_ptr().drop_in_place();
}
}
}
}

View file

@ -0,0 +1,170 @@
//! Thread-local channel context.
use super::select::Selected;
use super::utils::Backoff;
use crate::cell::Cell;
use crate::ptr;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::thread::{self, Thread, ThreadId};
use crate::time::Instant;
/// Thread-local context.
#[derive(Debug, Clone)]
pub struct Context {
inner: Arc<Inner>,
}
/// Inner representation of `Context`.
#[derive(Debug)]
struct Inner {
/// Selected operation.
select: AtomicUsize,
/// A slot into which another thread may store a pointer to its `Packet`.
packet: AtomicPtr<()>,
/// Thread handle.
thread: Thread,
/// Thread id.
thread_id: ThreadId,
}
impl Context {
/// Creates a new context for the duration of the closure.
#[inline]
pub fn with<F, R>(f: F) -> R
where
F: FnOnce(&Context) -> R,
{
thread_local! {
/// Cached thread-local context.
static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
}
let mut f = Some(f);
let mut f = |cx: &Context| -> R {
let f = f.take().unwrap();
f(cx)
};
CONTEXT
.try_with(|cell| match cell.take() {
None => f(&Context::new()),
Some(cx) => {
cx.reset();
let res = f(&cx);
cell.set(Some(cx));
res
}
})
.unwrap_or_else(|_| f(&Context::new()))
}
/// Creates a new `Context`.
#[cold]
fn new() -> Context {
Context {
inner: Arc::new(Inner {
select: AtomicUsize::new(Selected::Waiting.into()),
packet: AtomicPtr::new(ptr::null_mut()),
thread: thread::current(),
thread_id: thread::current().id(),
}),
}
}
/// Resets `select` and `packet`.
#[inline]
fn reset(&self) {
self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
self.inner.packet.store(ptr::null_mut(), Ordering::Release);
}
/// Attempts to select an operation.
///
/// On failure, the previously selected operation is returned.
#[inline]
pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
self.inner
.select
.compare_exchange(
Selected::Waiting.into(),
select.into(),
Ordering::AcqRel,
Ordering::Acquire,
)
.map(|_| ())
.map_err(|e| e.into())
}
/// Stores a packet.
///
/// This method must be called after `try_select` succeeds and there is a packet to provide.
#[inline]
pub fn store_packet(&self, packet: *mut ()) {
if !packet.is_null() {
self.inner.packet.store(packet, Ordering::Release);
}
}
/// Waits until an operation is selected and returns it.
///
/// If the deadline is reached, `Selected::Aborted` will be selected.
#[inline]
pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
// Spin for a short time, waiting until an operation is selected.
let backoff = Backoff::new();
loop {
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
if sel != Selected::Waiting {
return sel;
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
loop {
// Check whether an operation has been selected.
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
if sel != Selected::Waiting {
return sel;
}
// If there's a deadline, park the current thread until the deadline is reached.
if let Some(end) = deadline {
let now = Instant::now();
if now < end {
thread::park_timeout(end - now);
} else {
// The deadline has been reached. Try aborting select.
return match self.try_select(Selected::Aborted) {
Ok(()) => Selected::Aborted,
Err(s) => s,
};
}
} else {
thread::park();
}
}
}
/// Unparks the thread this context belongs to.
#[inline]
pub fn unpark(&self) {
self.inner.thread.unpark();
}
/// Returns the id of the thread this context belongs to.
#[inline]
pub fn thread_id(&self) -> ThreadId {
self.inner.thread_id
}
}

View file

@ -0,0 +1,137 @@
use crate::ops;
use crate::process;
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
/// Reference counter internals.
struct Counter<C> {
/// The number of senders associated with the channel.
senders: AtomicUsize,
/// The number of receivers associated with the channel.
receivers: AtomicUsize,
/// Set to `true` if the last sender or the last receiver reference deallocates the channel.
destroy: AtomicBool,
/// The internal channel.
chan: C,
}
/// Wraps a channel into the reference counter.
pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
let counter = Box::into_raw(Box::new(Counter {
senders: AtomicUsize::new(1),
receivers: AtomicUsize::new(1),
destroy: AtomicBool::new(false),
chan,
}));
let s = Sender { counter };
let r = Receiver { counter };
(s, r)
}
/// The sending side.
pub(crate) struct Sender<C> {
counter: *mut Counter<C>,
}
impl<C> Sender<C> {
/// Returns the internal `Counter`.
fn counter(&self) -> &Counter<C> {
unsafe { &*self.counter }
}
/// Acquires another sender reference.
pub(crate) fn acquire(&self) -> Sender<C> {
let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
// Cloning senders and calling `mem::forget` on the clones could potentially overflow the
// counter. It's very difficult to recover sensibly from such degenerate scenarios so we
// just abort when the count becomes very large.
if count > isize::MAX as usize {
process::abort();
}
Sender { counter: self.counter }
}
/// Releases the sender reference.
///
/// Function `disconnect` will be called if this is the last sender reference.
pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
disconnect(&self.counter().chan);
if self.counter().destroy.swap(true, Ordering::AcqRel) {
drop(Box::from_raw(self.counter));
}
}
}
}
impl<C> ops::Deref for Sender<C> {
type Target = C;
fn deref(&self) -> &C {
&self.counter().chan
}
}
impl<C> PartialEq for Sender<C> {
fn eq(&self, other: &Sender<C>) -> bool {
self.counter == other.counter
}
}
/// The receiving side.
pub(crate) struct Receiver<C> {
counter: *mut Counter<C>,
}
impl<C> Receiver<C> {
/// Returns the internal `Counter`.
fn counter(&self) -> &Counter<C> {
unsafe { &*self.counter }
}
/// Acquires another receiver reference.
pub(crate) fn acquire(&self) -> Receiver<C> {
let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
// Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
// counter. It's very difficult to recover sensibly from such degenerate scenarios so we
// just abort when the count becomes very large.
if count > isize::MAX as usize {
process::abort();
}
Receiver { counter: self.counter }
}
/// Releases the receiver reference.
///
/// Function `disconnect` will be called if this is the last receiver reference.
pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
disconnect(&self.counter().chan);
if self.counter().destroy.swap(true, Ordering::AcqRel) {
drop(Box::from_raw(self.counter));
}
}
}
}
impl<C> ops::Deref for Receiver<C> {
type Target = C;
fn deref(&self) -> &C {
&self.counter().chan
}
}
impl<C> PartialEq for Receiver<C> {
fn eq(&self, other: &Receiver<C>) -> bool {
self.counter == other.counter
}
}

View file

@ -0,0 +1,46 @@
use crate::error;
use crate::fmt;
pub use crate::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError};
/// An error returned from the [`send_timeout`] method.
///
/// The error contains the message being sent so it can be recovered.
///
/// [`send_timeout`]: super::Sender::send_timeout
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum SendTimeoutError<T> {
/// The message could not be sent because the channel is full and the operation timed out.
///
/// If this is a zero-capacity channel, then the error indicates that there was no receiver
/// available to receive the message and the operation timed out.
Timeout(T),
/// The message could not be sent because the channel is disconnected.
Disconnected(T),
}
impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
}
}
impl<T> fmt::Display for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SendTimeoutError::Timeout(..) => "timed out waiting on send operation".fmt(f),
SendTimeoutError::Disconnected(..) => "sending on a disconnected channel".fmt(f),
}
}
}
impl<T: Send> error::Error for SendTimeoutError<T> {}
impl<T> From<SendError<T>> for SendTimeoutError<T> {
fn from(err: SendError<T>) -> SendTimeoutError<T> {
match err {
SendError(e) => SendTimeoutError::Disconnected(e),
}
}
}

View file

@ -0,0 +1,648 @@
//! Unbounded channel implemented as a linked list.
use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::{Backoff, CachePadded};
use super::waker::SyncWaker;
use crate::cell::UnsafeCell;
use crate::marker::PhantomData;
use crate::mem::MaybeUninit;
use crate::ptr;
use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use crate::time::Instant;
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
/// The message.
msg: UnsafeCell<MaybeUninit<T>>,
/// The state of the slot.
state: AtomicUsize,
}
impl<T> Slot<T> {
/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
while self.state.load(Ordering::Acquire) & WRITE == 0 {
backoff.snooze();
}
}
}
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
/// The next block in the linked list.
next: AtomicPtr<Block<T>>,
/// Slots for messages.
slots: [Slot<T>; BLOCK_CAP],
}
impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Block<T> {
// SAFETY: This is safe because:
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
// [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
// holds a MaybeUninit.
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
unsafe { MaybeUninit::zeroed().assume_init() }
}
/// Waits until the next pointer is set.
fn wait_next(&self) -> *mut Block<T> {
let backoff = Backoff::new();
loop {
let next = self.next.load(Ordering::Acquire);
if !next.is_null() {
return next;
}
backoff.snooze();
}
}
/// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
unsafe fn destroy(this: *mut Block<T>, start: usize) {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
let slot = (*this).slots.get_unchecked(i);
// Mark the `DESTROY` bit if a thread is still using the slot.
if slot.state.load(Ordering::Acquire) & READ == 0
&& slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
{
// If a thread is still using the slot, it will continue destruction of the block.
return;
}
}
// No thread is using the block, now it is safe to destroy it.
drop(Box::from_raw(this));
}
}
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
/// The index in the channel.
index: AtomicUsize,
/// The block in the linked list.
block: AtomicPtr<Block<T>>,
}
/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
/// The block of slots.
block: *const u8,
/// The offset into the block.
offset: usize,
}
impl Default for ListToken {
#[inline]
fn default() -> Self {
ListToken { block: ptr::null(), offset: 0 }
}
}
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
/// The head of the channel.
head: CachePadded<Position<T>>,
/// The tail of the channel.
tail: CachePadded<Position<T>>,
/// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
/// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
/// Creates a new unbounded channel.
pub(crate) fn new() -> Self {
Channel {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
receivers: SyncWaker::new(),
_marker: PhantomData,
}
}
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
let mut block = self.tail.block.load(Ordering::Acquire);
let mut next_block = None;
loop {
// Check if the channel is disconnected.
if tail & MARK_BIT != 0 {
token.list.block = ptr::null();
return true;
}
// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
}
// If this is the first message to be sent into the channel, we need to allocate the
// first block and install it.
if block.is_null() {
let new = Box::into_raw(Box::new(Block::<T>::new()));
if self
.tail
.block
.compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
self.head.block.store(new, Ordering::Release);
block = new;
} else {
next_block = unsafe { Some(Box::from_raw(new)) };
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
}
let new_tail = tail + (1 << SHIFT);
// Try advancing the tail forward.
match self.tail.index.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, install the next one.
if offset + 1 == BLOCK_CAP {
let next_block = Box::into_raw(next_block.unwrap());
self.tail.block.store(next_block, Ordering::Release);
self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
(*block).next.store(next_block, Ordering::Release);
}
token.list.block = block as *const u8;
token.list.offset = offset;
return true;
},
Err(t) => {
tail = t;
block = self.tail.block.load(Ordering::Acquire);
backoff.spin();
}
}
}
}
/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
return Err(msg);
}
// Write the message into the slot.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.msg.get().write(MaybeUninit::new(msg));
slot.state.fetch_or(WRITE, Ordering::Release);
// Wake a sleeping receiver.
self.receivers.notify();
Ok(())
}
/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
loop {
// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}
let mut new_head = head + (1 << SHIFT);
if new_head & MARK_BIT == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);
// If the tail equals the head, that means the channel is empty.
if head >> SHIFT == tail >> SHIFT {
// If the channel is disconnected...
if tail & MARK_BIT != 0 {
// ...then receive an error.
token.list.block = ptr::null();
return true;
} else {
// Otherwise, the receive operation is not ready.
return false;
}
}
// If head and tail are not in the same block, set `MARK_BIT` in head.
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= MARK_BIT;
}
}
// The block can be null here only if the first message is being sent into the channel.
// In that case, just wait until it gets initialized.
if block.is_null() {
backoff.snooze();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}
// Try moving the head index forward.
match self.head.index.compare_exchange_weak(
head,
new_head,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
}
self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
token.list.block = block as *const u8;
token.list.offset = offset;
return true;
},
Err(h) => {
head = h;
block = self.head.block.load(Ordering::Acquire);
backoff.spin();
}
}
}
}
/// Reads a message from the channel.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.list.block.is_null() {
// The channel is disconnected.
return Err(());
}
// Read the message.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let msg = slot.msg.get().read().assume_init();
// Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
}
Ok(msg)
}
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.send(msg, None).map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
}
/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
_deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
}
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else {
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
let backoff = Backoff::new();
loop {
if self.start_recv(token) {
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
}
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
if let Some(d) = deadline {
if Instant::now() >= d {
return Err(RecvTimeoutError::Timeout);
}
}
// Prepare for blocking until a sender wakes us up.
Context::with(|cx| {
let oper = Operation::hook(token);
self.receivers.register(oper, cx);
// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.receivers.unregister(oper).unwrap();
// If the channel was disconnected, we still have to check for remaining
// messages.
}
Selected::Operation(_) => {}
}
});
}
}
/// Returns the current number of messages inside the channel.
pub(crate) fn len(&self) -> usize {
loop {
// Load the tail index, then load the head index.
let mut tail = self.tail.index.load(Ordering::SeqCst);
let mut head = self.head.index.load(Ordering::SeqCst);
// If the tail index didn't change, we've got consistent indices to work with.
if self.tail.index.load(Ordering::SeqCst) == tail {
// Erase the lower bits.
tail &= !((1 << SHIFT) - 1);
head &= !((1 << SHIFT) - 1);
// Fix up indices if they fall onto block ends.
if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
tail = tail.wrapping_add(1 << SHIFT);
}
if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
head = head.wrapping_add(1 << SHIFT);
}
// Rotate indices so that head falls into the first block.
let lap = (head >> SHIFT) / LAP;
tail = tail.wrapping_sub((lap * LAP) << SHIFT);
head = head.wrapping_sub((lap * LAP) << SHIFT);
// Remove the lower bits.
tail >>= SHIFT;
head >>= SHIFT;
// Return the difference minus the number of blocks between tail and head.
return tail - head - tail / LAP;
}
}
}
/// Returns the capacity of the channel.
pub(crate) fn capacity(&self) -> Option<usize> {
None
}
/// Disconnects senders and wakes up all blocked receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
if tail & MARK_BIT == 0 {
self.receivers.disconnect();
true
} else {
false
}
}
/// Disconnects receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect_receivers(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
if tail & MARK_BIT == 0 {
// If receivers are dropped first, discard all messages to free
// memory eagerly.
self.discard_all_messages();
true
} else {
false
}
}
/// Discards all messages.
///
/// This method should only be called when all receivers are dropped.
fn discard_all_messages(&self) {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
loop {
let offset = (tail >> SHIFT) % LAP;
if offset != BLOCK_CAP {
break;
}
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
// at boundary. We need to wait for the updates take affect otherwise there
// can be memory leaks.
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
}
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let p = &mut *slot.msg.get();
p.as_mut_ptr().drop_in_place();
} else {
(*block).wait_next();
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Acquire);
drop(Box::from_raw(block));
block = next;
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
if !block.is_null() {
drop(Box::from_raw(block));
}
}
head &= !MARK_BIT;
self.head.block.store(ptr::null_mut(), Ordering::Release);
self.head.index.store(head, Ordering::Release);
}
/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
}
/// Returns `true` if the channel is empty.
pub(crate) fn is_empty(&self) -> bool {
let head = self.head.index.load(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::SeqCst);
head >> SHIFT == tail >> SHIFT
}
/// Returns `true` if the channel is full.
pub(crate) fn is_full(&self) -> bool {
false
}
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
let mut head = self.head.index.load(Ordering::Relaxed);
let mut tail = self.tail.index.load(Ordering::Relaxed);
let mut block = self.head.block.load(Ordering::Relaxed);
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
tail &= !((1 << SHIFT) - 1);
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
let p = &mut *slot.msg.get();
p.as_mut_ptr().drop_in_place();
} else {
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Relaxed);
drop(Box::from_raw(block));
block = next;
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
if !block.is_null() {
drop(Box::from_raw(block));
}
}
}
}

View file

@ -0,0 +1,430 @@
//! Multi-producer multi-consumer channels.
// This module is not currently exposed publicly, but is used
// as the implementation for the channels in `sync::mpsc`. The
// implementation comes from the crossbeam-channel crate:
//
// Copyright (c) 2019 The Crossbeam Project Developers
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod array;
mod context;
mod counter;
mod error;
mod list;
mod select;
mod utils;
mod waker;
mod zero;
use crate::fmt;
use crate::panic::{RefUnwindSafe, UnwindSafe};
use crate::time::{Duration, Instant};
use error::*;
/// Creates a channel of unbounded capacity.
///
/// This channel has a growable buffer that can hold any number of messages at a time.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (s, r) = counter::new(list::Channel::new());
let s = Sender { flavor: SenderFlavor::List(s) };
let r = Receiver { flavor: ReceiverFlavor::List(r) };
(s, r)
}
/// Creates a channel of bounded capacity.
///
/// This channel has a buffer that can hold at most `cap` messages at a time.
///
/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
/// receive operations must appear at the same time in order to pair up and pass the message over.
pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
if cap == 0 {
let (s, r) = counter::new(zero::Channel::new());
let s = Sender { flavor: SenderFlavor::Zero(s) };
let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
(s, r)
} else {
let (s, r) = counter::new(array::Channel::with_capacity(cap));
let s = Sender { flavor: SenderFlavor::Array(s) };
let r = Receiver { flavor: ReceiverFlavor::Array(r) };
(s, r)
}
}
/// The sending side of a channel.
pub struct Sender<T> {
flavor: SenderFlavor<T>,
}
/// Sender flavors.
enum SenderFlavor<T> {
/// Bounded channel based on a preallocated array.
Array(counter::Sender<array::Channel<T>>),
/// Unbounded channel implemented as a linked list.
List(counter::Sender<list::Channel<T>>),
/// Zero-capacity channel.
Zero(counter::Sender<zero::Channel<T>>),
}
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
impl<T> UnwindSafe for Sender<T> {}
impl<T> RefUnwindSafe for Sender<T> {}
impl<T> Sender<T> {
/// Attempts to send a message into the channel without blocking.
///
/// This method will either send a message into the channel immediately or return an error if
/// the channel is full or disconnected. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a receive operation on the other side of the channel at the same time.
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.try_send(msg),
SenderFlavor::List(chan) => chan.try_send(msg),
SenderFlavor::Zero(chan) => chan.try_send(msg),
}
}
/// Blocks the current thread until a message is sent or the channel is disconnected.
///
/// If the channel is full and not disconnected, this call will block until the send operation
/// can proceed. If the channel becomes disconnected, this call will wake up and return an
/// error. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel.
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, None),
SenderFlavor::List(chan) => chan.send(msg, None),
SenderFlavor::Zero(chan) => chan.send(msg, None),
}
.map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => SendError(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
}
}
// The methods below are not used by `sync::mpsc`, but
// are useful and we'll likely want to expose them
// eventually
#[allow(unused)]
impl<T> Sender<T> {
/// Waits for a message to be sent into the channel, but only for a limited time.
///
/// If the channel is full and not disconnected, this call will block until the send operation
/// can proceed or the operation times out. If the channel becomes disconnected, this call will
/// wake up and return an error. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel.
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.send_deadline(msg, deadline),
// So far in the future that it's practically the same as waiting indefinitely.
None => self.send(msg).map_err(SendTimeoutError::from),
}
}
/// Waits for a message to be sent into the channel, but only until a given deadline.
///
/// If the channel is full and not disconnected, this call will block until the send operation
/// can proceed or the operation times out. If the channel becomes disconnected, this call will
/// wake up and return an error. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will wait for a receive operation to
/// appear on the other side of the channel.
pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
}
}
/// Returns `true` if the channel is empty.
///
/// Note: Zero-capacity channels are always empty.
pub fn is_empty(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.is_empty(),
SenderFlavor::List(chan) => chan.is_empty(),
SenderFlavor::Zero(chan) => chan.is_empty(),
}
}
/// Returns `true` if the channel is full.
///
/// Note: Zero-capacity channels are always full.
pub fn is_full(&self) -> bool {
match &self.flavor {
SenderFlavor::Array(chan) => chan.is_full(),
SenderFlavor::List(chan) => chan.is_full(),
SenderFlavor::Zero(chan) => chan.is_full(),
}
}
/// Returns the number of messages in the channel.
pub fn len(&self) -> usize {
match &self.flavor {
SenderFlavor::Array(chan) => chan.len(),
SenderFlavor::List(chan) => chan.len(),
SenderFlavor::Zero(chan) => chan.len(),
}
}
/// If the channel is bounded, returns its capacity.
pub fn capacity(&self) -> Option<usize> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.capacity(),
SenderFlavor::List(chan) => chan.capacity(),
SenderFlavor::Zero(chan) => chan.capacity(),
}
}
/// Returns `true` if senders belong to the same channel.
pub fn same_channel(&self, other: &Sender<T>) -> bool {
match (&self.flavor, &other.flavor) {
(SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
(SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
(SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
_ => false,
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
};
Sender { flavor }
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sender { .. }")
}
}
/// The receiving side of a channel.
pub struct Receiver<T> {
flavor: ReceiverFlavor<T>,
}
/// Receiver flavors.
enum ReceiverFlavor<T> {
/// Bounded channel based on a preallocated array.
Array(counter::Receiver<array::Channel<T>>),
/// Unbounded channel implemented as a linked list.
List(counter::Receiver<list::Channel<T>>),
/// Zero-capacity channel.
Zero(counter::Receiver<zero::Channel<T>>),
}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> UnwindSafe for Receiver<T> {}
impl<T> RefUnwindSafe for Receiver<T> {}
impl<T> Receiver<T> {
/// Attempts to receive a message from the channel without blocking.
///
/// This method will either receive a message from the channel immediately or return an error
/// if the channel is empty.
///
/// If called on a zero-capacity channel, this method will receive a message only if there
/// happens to be a send operation on the other side of the channel at the same time.
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.try_recv(),
ReceiverFlavor::List(chan) => chan.try_recv(),
ReceiverFlavor::Zero(chan) => chan.try_recv(),
}
}
/// Blocks the current thread until a message is received or the channel is empty and
/// disconnected.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed. If the channel is empty and becomes disconnected, this call will
/// wake up and return an error.
///
/// If called on a zero-capacity channel, this method will wait for a send operation to appear
/// on the other side of the channel.
pub fn recv(&self) -> Result<T, RecvError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.recv(None),
ReceiverFlavor::List(chan) => chan.recv(None),
ReceiverFlavor::Zero(chan) => chan.recv(None),
}
.map_err(|_| RecvError)
}
/// Waits for a message to be received from the channel, but only for a limited time.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed or the operation times out. If the channel is empty and becomes
/// disconnected, this call will wake up and return an error.
///
/// If called on a zero-capacity channel, this method will wait for a send operation to appear
/// on the other side of the channel.
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.recv_deadline(deadline),
// So far in the future that it's practically the same as waiting indefinitely.
None => self.recv().map_err(RecvTimeoutError::from),
}
}
/// Waits for a message to be received from the channel, but only for a limited time.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed or the operation times out. If the channel is empty and becomes
/// disconnected, this call will wake up and return an error.
///
/// If called on a zero-capacity channel, this method will wait for a send operation to appear
/// on the other side of the channel.
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
}
}
}
// The methods below are not used by `sync::mpsc`, but
// are useful and we'll likely want to expose them
// eventually
#[allow(unused)]
impl<T> Receiver<T> {
/// Returns `true` if the channel is empty.
///
/// Note: Zero-capacity channels are always empty.
pub fn is_empty(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.is_empty(),
ReceiverFlavor::List(chan) => chan.is_empty(),
ReceiverFlavor::Zero(chan) => chan.is_empty(),
}
}
/// Returns `true` if the channel is full.
///
/// Note: Zero-capacity channels are always full.
pub fn is_full(&self) -> bool {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.is_full(),
ReceiverFlavor::List(chan) => chan.is_full(),
ReceiverFlavor::Zero(chan) => chan.is_full(),
}
}
/// Returns the number of messages in the channel.
pub fn len(&self) -> usize {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.len(),
ReceiverFlavor::List(chan) => chan.len(),
ReceiverFlavor::Zero(chan) => chan.len(),
}
}
/// If the channel is bounded, returns its capacity.
pub fn capacity(&self) -> Option<usize> {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.capacity(),
ReceiverFlavor::List(chan) => chan.capacity(),
ReceiverFlavor::Zero(chan) => chan.capacity(),
}
}
/// Returns `true` if receivers belong to the same channel.
pub fn same_channel(&self, other: &Receiver<T>) -> bool {
match (&self.flavor, &other.flavor) {
(ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
(ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
(ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
_ => false,
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
};
Receiver { flavor }
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Receiver { .. }")
}
}

View file

@ -0,0 +1,71 @@
/// Temporary data that gets initialized during a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
#[derive(Debug, Default)]
pub struct Token {
pub(crate) array: super::array::ArrayToken,
pub(crate) list: super::list::ListToken,
#[allow(dead_code)]
pub(crate) zero: super::zero::ZeroToken,
}
/// Identifier associated with an operation by a specific thread on a specific channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);
impl Operation {
/// Creates an operation identifier from a mutable reference.
///
/// This function essentially just turns the address of the reference into a number. The
/// reference should point to a variable that is specific to the thread and the operation,
/// and is alive for the entire duration of a blocking operation.
#[inline]
pub fn hook<T>(r: &mut T) -> Operation {
let val = r as *mut T as usize;
// Make sure that the pointer address doesn't equal the numerical representation of
// `Selected::{Waiting, Aborted, Disconnected}`.
assert!(val > 2);
Operation(val)
}
}
/// Current state of a blocking operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
/// Still waiting for an operation.
Waiting,
/// The attempt to block the current thread has been aborted.
Aborted,
/// An operation became ready because a channel is disconnected.
Disconnected,
/// An operation became ready because a message can be sent or received.
Operation(Operation),
}
impl From<usize> for Selected {
#[inline]
fn from(val: usize) -> Selected {
match val {
0 => Selected::Waiting,
1 => Selected::Aborted,
2 => Selected::Disconnected,
oper => Selected::Operation(Operation(oper)),
}
}
}
impl Into<usize> for Selected {
#[inline]
fn into(self) -> usize {
match self {
Selected::Waiting => 0,
Selected::Aborted => 1,
Selected::Disconnected => 2,
Selected::Operation(Operation(val)) => val,
}
}
}

View file

@ -0,0 +1,143 @@
use crate::cell::Cell;
use crate::ops::{Deref, DerefMut};
/// Pads and aligns a value to the length of a cache line.
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",),
repr(align(128))
)]
// arm, mips, mips64, and riscv64 have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
#[cfg_attr(
any(
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "riscv64",
),
repr(align(32))
)]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86 and wasm have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
not(any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
target_arch = "arm",
target_arch = "mips",
target_arch = "mips64",
target_arch = "riscv64",
target_arch = "s390x",
)),
repr(align(64))
)]
pub struct CachePadded<T> {
value: T,
}
impl<T> CachePadded<T> {
/// Pads and aligns a value to the length of a cache line.
pub fn new(value: T) -> CachePadded<T> {
CachePadded::<T> { value }
}
}
impl<T> Deref for CachePadded<T> {
type Target = T;
fn deref(&self) -> &T {
&self.value
}
}
impl<T> DerefMut for CachePadded<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.value
}
}
const SPIN_LIMIT: u32 = 6;
const YIELD_LIMIT: u32 = 10;
/// Performs exponential backoff in spin loops.
pub struct Backoff {
step: Cell<u32>,
}
impl Backoff {
/// Creates a new `Backoff`.
pub fn new() -> Self {
Backoff { step: Cell::new(0) }
}
/// Backs off in a lock-free loop.
///
/// This method should be used when we need to retry an operation because another thread made
/// progress.
#[inline]
pub fn spin(&self) {
for _ in 0..1 << self.step.get().min(SPIN_LIMIT) {
crate::hint::spin_loop();
}
if self.step.get() <= SPIN_LIMIT {
self.step.set(self.step.get() + 1);
}
}
/// Backs off in a blocking loop.
#[inline]
pub fn snooze(&self) {
if self.step.get() <= SPIN_LIMIT {
for _ in 0..1 << self.step.get() {
crate::hint::spin_loop()
}
} else {
crate::thread::yield_now();
}
if self.step.get() <= YIELD_LIMIT {
self.step.set(self.step.get() + 1);
}
}
/// Returns `true` if exponential backoff has completed and blocking the thread is advised.
#[inline]
pub fn is_completed(&self) -> bool {
self.step.get() > YIELD_LIMIT
}
}

View file

@ -0,0 +1,207 @@
//! Waking mechanism for threads blocked on channel operations.
use super::context::Context;
use super::select::{Operation, Selected};
use crate::ptr;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Mutex;
use crate::thread::{self, ThreadId};
/// Represents a thread blocked on a specific channel operation.
pub(crate) struct Entry {
/// The operation.
pub(crate) oper: Operation,
/// Optional packet.
pub(crate) packet: *mut (),
/// Context associated with the thread owning this operation.
pub(crate) cx: Context,
}
/// A queue of threads blocked on channel operations.
///
/// This data structure is used by threads to register blocking operations and get woken up once
/// an operation becomes ready.
pub(crate) struct Waker {
/// A list of select operations.
selectors: Vec<Entry>,
/// A list of operations waiting to be ready.
observers: Vec<Entry>,
}
impl Waker {
/// Creates a new `Waker`.
#[inline]
pub(crate) fn new() -> Self {
Waker { selectors: Vec::new(), observers: Vec::new() }
}
/// Registers a select operation.
#[inline]
pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
self.register_with_packet(oper, ptr::null_mut(), cx);
}
/// Registers a select operation and a packet.
#[inline]
pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
self.selectors.push(Entry { oper, packet, cx: cx.clone() });
}
/// Unregisters a select operation.
#[inline]
pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
if let Some((i, _)) =
self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
{
let entry = self.selectors.remove(i);
Some(entry)
} else {
None
}
}
/// Attempts to find another thread's entry, select the operation, and wake it up.
#[inline]
pub(crate) fn try_select(&mut self) -> Option<Entry> {
self.selectors
.iter()
.position(|selector| {
// Does the entry belong to a different thread?
selector.cx.thread_id() != current_thread_id()
&& selector // Try selecting this operation.
.cx
.try_select(Selected::Operation(selector.oper))
.is_ok()
&& {
// Provide the packet.
selector.cx.store_packet(selector.packet);
// Wake the thread up.
selector.cx.unpark();
true
}
})
// Remove the entry from the queue to keep it clean and improve
// performance.
.map(|pos| self.selectors.remove(pos))
}
/// Notifies all operations waiting to be ready.
#[inline]
pub(crate) fn notify(&mut self) {
for entry in self.observers.drain(..) {
if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
entry.cx.unpark();
}
}
}
/// Notifies all registered operations that the channel is disconnected.
#[inline]
pub(crate) fn disconnect(&mut self) {
for entry in self.selectors.iter() {
if entry.cx.try_select(Selected::Disconnected).is_ok() {
// Wake the thread up.
//
// Here we don't remove the entry from the queue. Registered threads must
// unregister from the waker by themselves. They might also want to recover the
// packet value and destroy it, if necessary.
entry.cx.unpark();
}
}
self.notify();
}
}
impl Drop for Waker {
#[inline]
fn drop(&mut self) {
debug_assert_eq!(self.selectors.len(), 0);
debug_assert_eq!(self.observers.len(), 0);
}
}
/// A waker that can be shared among threads without locking.
///
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
pub(crate) struct SyncWaker {
/// The inner `Waker`.
inner: Mutex<Waker>,
/// `true` if the waker is empty.
is_empty: AtomicBool,
}
impl SyncWaker {
/// Creates a new `SyncWaker`.
#[inline]
pub(crate) fn new() -> Self {
SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
}
/// Registers the current thread with an operation.
#[inline]
pub(crate) fn register(&self, oper: Operation, cx: &Context) {
let mut inner = self.inner.lock().unwrap();
inner.register(oper, cx);
self.is_empty
.store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
}
/// Unregisters an operation previously registered by the current thread.
#[inline]
pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
let mut inner = self.inner.lock().unwrap();
let entry = inner.unregister(oper);
self.is_empty
.store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
entry
}
/// Attempts to find one thread (not the current one), select its operation, and wake it up.
#[inline]
pub(crate) fn notify(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
let mut inner = self.inner.lock().unwrap();
if !self.is_empty.load(Ordering::SeqCst) {
inner.try_select();
inner.notify();
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Ordering::SeqCst,
);
}
}
}
/// Notifies all threads that the channel is disconnected.
#[inline]
pub(crate) fn disconnect(&self) {
let mut inner = self.inner.lock().unwrap();
inner.disconnect();
self.is_empty
.store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
}
}
impl Drop for SyncWaker {
#[inline]
fn drop(&mut self) {
debug_assert!(self.is_empty.load(Ordering::SeqCst));
}
}
/// Returns the id of the current thread.
#[inline]
fn current_thread_id() -> ThreadId {
thread_local! {
/// Cached thread-local id.
static THREAD_ID: ThreadId = thread::current().id();
}
THREAD_ID.try_with(|id| *id).unwrap_or_else(|_| thread::current().id())
}

View file

@ -0,0 +1,318 @@
//! Zero-capacity channel.
//!
//! This kind of channel is also known as *rendezvous* channel.
use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::Backoff;
use super::waker::Waker;
use crate::cell::UnsafeCell;
use crate::marker::PhantomData;
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::Mutex;
use crate::time::Instant;
use crate::{fmt, ptr};
/// A pointer to a packet.
pub(crate) struct ZeroToken(*mut ());
impl Default for ZeroToken {
fn default() -> Self {
Self(ptr::null_mut())
}
}
impl fmt::Debug for ZeroToken {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&(self.0 as usize), f)
}
}
/// A slot for passing one message from a sender to a receiver.
struct Packet<T> {
/// Equals `true` if the packet is allocated on the stack.
on_stack: bool,
/// Equals `true` once the packet is ready for reading or writing.
ready: AtomicBool,
/// The message.
msg: UnsafeCell<Option<T>>,
}
impl<T> Packet<T> {
/// Creates an empty packet on the stack.
fn empty_on_stack() -> Packet<T> {
Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
}
/// Creates a packet on the stack, containing a message.
fn message_on_stack(msg: T) -> Packet<T> {
Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
}
/// Waits until the packet becomes ready for reading or writing.
fn wait_ready(&self) {
let backoff = Backoff::new();
while !self.ready.load(Ordering::Acquire) {
backoff.snooze();
}
}
}
/// Inner representation of a zero-capacity channel.
struct Inner {
/// Senders waiting to pair up with a receive operation.
senders: Waker,
/// Receivers waiting to pair up with a send operation.
receivers: Waker,
/// Equals `true` when the channel is disconnected.
is_disconnected: bool,
}
/// Zero-capacity channel.
pub(crate) struct Channel<T> {
/// Inner representation of the channel.
inner: Mutex<Inner>,
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
pub(crate) fn new() -> Self {
Channel {
inner: Mutex::new(Inner {
senders: Waker::new(),
receivers: Waker::new(),
is_disconnected: false,
}),
_marker: PhantomData,
}
}
/// Writes a message into the packet.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no packet, the channel is disconnected.
if token.zero.0.is_null() {
return Err(msg);
}
let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
Ok(())
}
/// Reads a message from the packet.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
// If there is no packet, the channel is disconnected.
if token.zero.0.is_null() {
return Err(());
}
let packet = &*(token.zero.0 as *const Packet<T>);
if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
// for it. However, after reading the message, we need to set `ready` to `true` in
// order to signal that the packet can be destroyed.
let msg = packet.msg.get().replace(None).unwrap();
packet.ready.store(true, Ordering::Release);
Ok(msg)
} else {
// Wait until the message becomes available, then read it and destroy the
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
}
}
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
Ok(())
} else if inner.is_disconnected {
Err(TrySendError::Disconnected(msg))
} else {
Err(TrySendError::Full(msg))
}
}
/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
return Ok(());
}
if inner.is_disconnected {
return Err(SendTimeoutError::Disconnected(msg));
}
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
let mut packet = Packet::<T>::message_on_stack(msg);
inner.senders.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
inner.receivers.notify();
drop(inner);
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Timeout(msg))
}
Selected::Disconnected => {
self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Disconnected(msg))
}
Selected::Operation(_) => {
// Wait until the message is read, then drop the packet.
packet.wait_ready();
Ok(())
}
}
})
}
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else if inner.is_disconnected {
Err(TryRecvError::Disconnected)
} else {
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
}
}
if inner.is_disconnected {
return Err(RecvTimeoutError::Disconnected);
}
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
let mut packet = Packet::<T>::empty_on_stack();
inner.receivers.register_with_packet(
oper,
&mut packet as *mut Packet<T> as *mut (),
cx,
);
inner.senders.notify();
drop(inner);
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
Err(RecvTimeoutError::Timeout)
}
Selected::Disconnected => {
self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
Err(RecvTimeoutError::Disconnected)
}
Selected::Operation(_) => {
// Wait until the message is provided, then read it.
packet.wait_ready();
unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
}
}
})
}
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
let mut inner = self.inner.lock().unwrap();
if !inner.is_disconnected {
inner.is_disconnected = true;
inner.senders.disconnect();
inner.receivers.disconnect();
true
} else {
false
}
}
/// Returns the current number of messages inside the channel.
pub(crate) fn len(&self) -> usize {
0
}
/// Returns the capacity of the channel.
#[allow(clippy::unnecessary_wraps)] // This is intentional.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
/// Returns `true` if the channel is empty.
pub(crate) fn is_empty(&self) -> bool {
true
}
/// Returns `true` if the channel is full.
pub(crate) fn is_full(&self) -> bool {
true
}
}