std: unify id-based thread parking implementations
This commit is contained in:
parent
0c0b403f19
commit
a9e5c1a309
19 changed files with 208 additions and 231 deletions
|
@ -34,7 +34,7 @@ pub mod process;
|
||||||
pub mod stdio;
|
pub mod stdio;
|
||||||
pub mod thread;
|
pub mod thread;
|
||||||
pub mod thread_local_key;
|
pub mod thread_local_key;
|
||||||
pub mod thread_parker;
|
pub mod thread_parking;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
|
||||||
mod condvar;
|
mod condvar;
|
||||||
|
|
|
@ -65,9 +65,10 @@ mod task_queue {
|
||||||
/// execution. The signal is sent once all TLS destructors have finished at
|
/// execution. The signal is sent once all TLS destructors have finished at
|
||||||
/// which point no new thread locals should be created.
|
/// which point no new thread locals should be created.
|
||||||
pub mod wait_notify {
|
pub mod wait_notify {
|
||||||
use super::super::thread_parker::Parker;
|
use crate::mem::MaybeUninit;
|
||||||
use crate::pin::Pin;
|
use crate::pin::Pin;
|
||||||
use crate::sync::Arc;
|
use crate::sync::Arc;
|
||||||
|
use crate::sys_common::thread_parking::Parker;
|
||||||
|
|
||||||
pub struct Notifier(Arc<Parker>);
|
pub struct Notifier(Arc<Parker>);
|
||||||
|
|
||||||
|
@ -94,7 +95,18 @@ pub mod wait_notify {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new() -> (Notifier, Waiter) {
|
pub fn new() -> (Notifier, Waiter) {
|
||||||
let inner = Arc::new(Parker::new_internal());
|
// Safety:
|
||||||
|
// Some other platforms (looking at you, UNIX!) require that the thread
|
||||||
|
// parker is constructed in-place. This is just a noisy way of writing:
|
||||||
|
// ```rust
|
||||||
|
// let parker = Parker::new();
|
||||||
|
// ```
|
||||||
|
let parker = unsafe {
|
||||||
|
let mut place = MaybeUninit::uninit();
|
||||||
|
Parker::new(place.as_mut_ptr());
|
||||||
|
place.assume_init()
|
||||||
|
};
|
||||||
|
let inner = Arc::new(parker);
|
||||||
(Notifier(inner.clone()), Waiter(inner))
|
(Notifier(inner.clone()), Waiter(inner))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,107 +0,0 @@
|
||||||
//! Thread parking based on SGX events.
|
|
||||||
|
|
||||||
use super::abi::{thread, usercalls};
|
|
||||||
use crate::io::ErrorKind;
|
|
||||||
use crate::pin::Pin;
|
|
||||||
use crate::ptr::{self, NonNull};
|
|
||||||
use crate::sync::atomic::AtomicPtr;
|
|
||||||
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
|
|
||||||
use crate::time::Duration;
|
|
||||||
use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE};
|
|
||||||
|
|
||||||
// The TCS structure must be page-aligned (this is checked by EENTER), so these cannot
|
|
||||||
// be valid pointers
|
|
||||||
const EMPTY: *mut u8 = ptr::invalid_mut(1);
|
|
||||||
const NOTIFIED: *mut u8 = ptr::invalid_mut(2);
|
|
||||||
|
|
||||||
pub struct Parker {
|
|
||||||
/// The park state. One of EMPTY, NOTIFIED or a TCS address.
|
|
||||||
/// A state change to NOTIFIED must be done with release ordering
|
|
||||||
/// and be observed with acquire ordering so that operations after
|
|
||||||
/// `thread::park` returns will not occur before the unpark message
|
|
||||||
/// was sent.
|
|
||||||
state: AtomicPtr<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Parker {
|
|
||||||
/// Construct the thread parker. The UNIX parker implementation
|
|
||||||
/// requires this to happen in-place.
|
|
||||||
pub unsafe fn new(parker: *mut Parker) {
|
|
||||||
unsafe { parker.write(Parker::new_internal()) }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn new_internal() -> Parker {
|
|
||||||
Parker { state: AtomicPtr::new(EMPTY) }
|
|
||||||
}
|
|
||||||
|
|
||||||
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
|
|
||||||
pub unsafe fn park(self: Pin<&Self>) {
|
|
||||||
if self.state.load(Acquire) != NOTIFIED {
|
|
||||||
let mut prev = EMPTY;
|
|
||||||
loop {
|
|
||||||
// Guard against changing TCS addresses by always setting the state to
|
|
||||||
// the current value.
|
|
||||||
let tcs = thread::current().as_ptr();
|
|
||||||
if self.state.compare_exchange(prev, tcs, Relaxed, Acquire).is_ok() {
|
|
||||||
let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap();
|
|
||||||
assert!(event & EV_UNPARK == EV_UNPARK);
|
|
||||||
prev = tcs;
|
|
||||||
} else {
|
|
||||||
// The state was definitely changed by another thread at this point.
|
|
||||||
// The only time this occurs is when the state is changed to NOTIFIED.
|
|
||||||
// We observed this change with acquire ordering, so we can simply
|
|
||||||
// change the state to EMPTY with a relaxed store.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point, the token was definately read with acquire ordering,
|
|
||||||
// so this can be a relaxed store.
|
|
||||||
self.state.store(EMPTY, Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
|
|
||||||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
|
||||||
let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64;
|
|
||||||
let tcs = thread::current().as_ptr();
|
|
||||||
|
|
||||||
if self.state.load(Acquire) != NOTIFIED {
|
|
||||||
if self.state.compare_exchange(EMPTY, tcs, Relaxed, Acquire).is_ok() {
|
|
||||||
match usercalls::wait(EV_UNPARK, timeout) {
|
|
||||||
Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK),
|
|
||||||
Err(e) => {
|
|
||||||
assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Swap to provide acquire ordering even if the timeout occurred
|
|
||||||
// before the token was set. This situation can result in spurious
|
|
||||||
// wakeups on the next call to `park_timeout`, but it is better to let
|
|
||||||
// those be handled by the user than do some perhaps unnecessary, but
|
|
||||||
// always expensive guarding.
|
|
||||||
self.state.swap(EMPTY, Acquire);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The token was already read with `acquire` ordering, this can be a store.
|
|
||||||
self.state.store(EMPTY, Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// This implementation doesn't require `Pin`, but other implementations do.
|
|
||||||
pub fn unpark(self: Pin<&Self>) {
|
|
||||||
let state = self.state.swap(NOTIFIED, Release);
|
|
||||||
|
|
||||||
if !matches!(state, EMPTY | NOTIFIED) {
|
|
||||||
// There is a thread waiting, wake it up.
|
|
||||||
let tcs = NonNull::new(state).unwrap();
|
|
||||||
// This will fail if the thread has already terminated or its TCS is destroyed
|
|
||||||
// by the time the signal is sent, but that is fine. If another thread receives
|
|
||||||
// the same TCS, it will receive this notification as a spurious wakeup, but
|
|
||||||
// all users of `wait` should and (internally) do guard against those where
|
|
||||||
// necessary.
|
|
||||||
let _ = usercalls::send(EV_UNPARK, Some(tcs));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
23
library/std/src/sys/sgx/thread_parking.rs
Normal file
23
library/std/src/sys/sgx/thread_parking.rs
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
use super::abi::usercalls;
|
||||||
|
use crate::io::ErrorKind;
|
||||||
|
use crate::time::Duration;
|
||||||
|
use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE};
|
||||||
|
|
||||||
|
pub type ThreadId = fortanix_sgx_abi::Tcs;
|
||||||
|
|
||||||
|
pub use super::abi::thread::current;
|
||||||
|
|
||||||
|
pub fn park() {
|
||||||
|
usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn park_timeout(dur: Duration) {
|
||||||
|
let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64;
|
||||||
|
if let Err(e) = usercalls::wait(EV_UNPARK, timeout) {
|
||||||
|
assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unpark(tid: ThreadId) {
|
||||||
|
let _ = usercalls::send(EV_UNPARK, Some(tid));
|
||||||
|
}
|
|
@ -40,7 +40,7 @@ pub mod stdio;
|
||||||
pub mod thread;
|
pub mod thread;
|
||||||
pub mod thread_local_dtor;
|
pub mod thread_local_dtor;
|
||||||
pub mod thread_local_key;
|
pub mod thread_local_key;
|
||||||
pub mod thread_parker;
|
pub mod thread_parking;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
|
||||||
#[cfg(target_os = "espidf")]
|
#[cfg(target_os = "espidf")]
|
||||||
|
|
|
@ -1,113 +0,0 @@
|
||||||
use crate::ffi::{c_int, c_void};
|
|
||||||
use crate::pin::Pin;
|
|
||||||
use crate::ptr::{null, null_mut};
|
|
||||||
use crate::sync::atomic::{
|
|
||||||
AtomicU64,
|
|
||||||
Ordering::{Acquire, Relaxed, Release},
|
|
||||||
};
|
|
||||||
use crate::time::Duration;
|
|
||||||
use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};
|
|
||||||
|
|
||||||
extern "C" {
|
|
||||||
fn ___lwp_park60(
|
|
||||||
clock_id: clockid_t,
|
|
||||||
flags: c_int,
|
|
||||||
ts: *mut timespec,
|
|
||||||
unpark: lwpid_t,
|
|
||||||
hint: *const c_void,
|
|
||||||
unparkhint: *const c_void,
|
|
||||||
) -> c_int;
|
|
||||||
fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The thread is not parked and the token is not available.
|
|
||||||
///
|
|
||||||
/// Zero cannot be a valid LWP id, since it is used as empty value for the unpark
|
|
||||||
/// argument in _lwp_park.
|
|
||||||
const EMPTY: u64 = 0;
|
|
||||||
/// The token is available. Do not park anymore.
|
|
||||||
const NOTIFIED: u64 = u64::MAX;
|
|
||||||
|
|
||||||
pub struct Parker {
|
|
||||||
/// The parker state. Contains either one of the two state values above or the LWP
|
|
||||||
/// id of the parked thread.
|
|
||||||
state: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Parker {
|
|
||||||
pub unsafe fn new(parker: *mut Parker) {
|
|
||||||
parker.write(Parker { state: AtomicU64::new(EMPTY) })
|
|
||||||
}
|
|
||||||
|
|
||||||
// Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
|
|
||||||
pub unsafe fn park(self: Pin<&Self>) {
|
|
||||||
// If the token has already been made available, we can skip
|
|
||||||
// a bit of work, so check for it here.
|
|
||||||
if self.state.load(Acquire) != NOTIFIED {
|
|
||||||
let parked = _lwp_self() as u64;
|
|
||||||
let hint = self.state.as_mut_ptr().cast();
|
|
||||||
if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
|
|
||||||
// Loop to guard against spurious wakeups.
|
|
||||||
loop {
|
|
||||||
___lwp_park60(0, 0, null_mut(), 0, hint, null());
|
|
||||||
if self.state.load(Acquire) == NOTIFIED {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point, the change to NOTIFIED has always been observed with acquire
|
|
||||||
// ordering, so we can just use a relaxed store here (instead of a swap).
|
|
||||||
self.state.store(EMPTY, Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
|
|
||||||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
|
||||||
if self.state.load(Acquire) != NOTIFIED {
|
|
||||||
let parked = _lwp_self() as u64;
|
|
||||||
let hint = self.state.as_mut_ptr().cast();
|
|
||||||
let mut timeout = timespec {
|
|
||||||
// Saturate so that the operation will definitely time out
|
|
||||||
// (even if it is after the heat death of the universe).
|
|
||||||
tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
|
|
||||||
tv_nsec: dur.subsec_nanos().into(),
|
|
||||||
};
|
|
||||||
|
|
||||||
if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
|
|
||||||
// Timeout needs to be mutable since it is modified on NetBSD 9.0 and
|
|
||||||
// above.
|
|
||||||
___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, hint, null());
|
|
||||||
// Use a swap to get acquire ordering even if the token was set after
|
|
||||||
// the timeout occurred.
|
|
||||||
self.state.swap(EMPTY, Acquire);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.state.store(EMPTY, Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Does not actually need `Pin`, but the pthread implementation does.
|
|
||||||
pub fn unpark(self: Pin<&Self>) {
|
|
||||||
let state = self.state.swap(NOTIFIED, Release);
|
|
||||||
if !matches!(state, EMPTY | NOTIFIED) {
|
|
||||||
let lwp = state as lwpid_t;
|
|
||||||
let hint = self.state.as_mut_ptr().cast();
|
|
||||||
|
|
||||||
// If the parking thread terminated and did not actually park, this will
|
|
||||||
// probably return an error, which is OK. In the worst case, another
|
|
||||||
// thread has received the same LWP id. It will then receive a spurious
|
|
||||||
// wakeup, but those are allowable per the API contract. The same reasoning
|
|
||||||
// applies if a timeout occurred before this call, but the state was not
|
|
||||||
// yet reset.
|
|
||||||
|
|
||||||
// SAFETY:
|
|
||||||
// The syscall has no invariants to hold. Only unsafe because it is an
|
|
||||||
// extern function.
|
|
||||||
unsafe {
|
|
||||||
_lwp_unpark(lwp, hint);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -24,7 +24,7 @@ cfg_if::cfg_if! {
|
||||||
pub use darwin::Parker;
|
pub use darwin::Parker;
|
||||||
} else if #[cfg(target_os = "netbsd")] {
|
} else if #[cfg(target_os = "netbsd")] {
|
||||||
mod netbsd;
|
mod netbsd;
|
||||||
pub use netbsd::Parker;
|
pub use netbsd::{current, park, park_timeout, unpark, ThreadId};
|
||||||
} else {
|
} else {
|
||||||
mod pthread;
|
mod pthread;
|
||||||
pub use pthread::Parker;
|
pub use pthread::Parker;
|
54
library/std/src/sys/unix/thread_parking/netbsd.rs
Normal file
54
library/std/src/sys/unix/thread_parking/netbsd.rs
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
#![cfg(target_os = "netbsd")]
|
||||||
|
|
||||||
|
use crate::ffi::{c_int, c_void};
|
||||||
|
use crate::ptr::{null, null_mut};
|
||||||
|
use crate::time::Duration;
|
||||||
|
use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};
|
||||||
|
|
||||||
|
extern "C" {
|
||||||
|
fn ___lwp_park60(
|
||||||
|
clock_id: clockid_t,
|
||||||
|
flags: c_int,
|
||||||
|
ts: *mut timespec,
|
||||||
|
unpark: lwpid_t,
|
||||||
|
hint: *const c_void,
|
||||||
|
unparkhint: *const c_void,
|
||||||
|
) -> c_int;
|
||||||
|
fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type ThreadId = lwpid_t;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn current() -> ThreadId {
|
||||||
|
unsafe { _lwp_self() }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn park() {
|
||||||
|
unsafe {
|
||||||
|
___lwp_park60(0, 0, null_mut(), 0, null(), null());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn park_timeout(dur: Duration) {
|
||||||
|
let mut timeout = timespec {
|
||||||
|
// Saturate so that the operation will definitely time out
|
||||||
|
// (even if it is after the heat death of the universe).
|
||||||
|
tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
|
||||||
|
tv_nsec: dur.subsec_nanos().into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Timeout needs to be mutable since it is modified on NetBSD 9.0 and
|
||||||
|
// above.
|
||||||
|
unsafe {
|
||||||
|
___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, null(), null());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn unpark(tid: ThreadId) {
|
||||||
|
unsafe {
|
||||||
|
_lwp_unpark(tid, null());
|
||||||
|
}
|
||||||
|
}
|
|
@ -33,7 +33,7 @@ pub mod stdio;
|
||||||
pub mod thread;
|
pub mod thread;
|
||||||
pub mod thread_local_dtor;
|
pub mod thread_local_dtor;
|
||||||
pub mod thread_local_key;
|
pub mod thread_local_key;
|
||||||
pub mod thread_parker;
|
pub mod thread_parking;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
cfg_if::cfg_if! {
|
cfg_if::cfg_if! {
|
||||||
if #[cfg(not(target_vendor = "uwp"))] {
|
if #[cfg(not(target_vendor = "uwp"))] {
|
||||||
|
|
|
@ -30,7 +30,7 @@ pub mod process;
|
||||||
pub mod thread;
|
pub mod thread;
|
||||||
pub mod thread_info;
|
pub mod thread_info;
|
||||||
pub mod thread_local_dtor;
|
pub mod thread_local_dtor;
|
||||||
pub mod thread_parker;
|
pub mod thread_parking;
|
||||||
pub mod wstr;
|
pub mod wstr;
|
||||||
pub mod wtf8;
|
pub mod wtf8;
|
||||||
|
|
||||||
|
|
104
library/std/src/sys_common/thread_parking/id.rs
Normal file
104
library/std/src/sys_common/thread_parking/id.rs
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
//! Thread parking using thread ids.
|
||||||
|
//!
|
||||||
|
//! Some platforms (notably NetBSD) have thread parking primitives whose semantics
|
||||||
|
//! match those offered by `thread::park`, with the difference that the thread to
|
||||||
|
//! be unparked is referenced by a platform-specific thread id. Since the thread
|
||||||
|
//! parker is constructed before that id is known, an atomic state variable is used
|
||||||
|
//! to manage the park state and propagate the thread id. This also avoids platform
|
||||||
|
//! calls in the case where `unpark` is called before `park`.
|
||||||
|
|
||||||
|
use crate::cell::UnsafeCell;
|
||||||
|
use crate::pin::Pin;
|
||||||
|
use crate::sync::atomic::{
|
||||||
|
fence, AtomicI8,
|
||||||
|
Ordering::{Acquire, Relaxed, Release},
|
||||||
|
};
|
||||||
|
use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId};
|
||||||
|
use crate::time::Duration;
|
||||||
|
|
||||||
|
pub struct Parker {
|
||||||
|
state: AtomicI8,
|
||||||
|
tid: UnsafeCell<Option<ThreadId>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
const PARKED: i8 = -1;
|
||||||
|
const EMPTY: i8 = 0;
|
||||||
|
const NOTIFIED: i8 = 1;
|
||||||
|
|
||||||
|
impl Parker {
|
||||||
|
/// Create a new thread parker. UNIX requires this to happen in-place.
|
||||||
|
pub unsafe fn new(parker: *mut Parker) {
|
||||||
|
parker.write(Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # Safety
|
||||||
|
/// * must always be called from the same thread
|
||||||
|
/// * must be called before the state is set to PARKED
|
||||||
|
unsafe fn init_tid(&self) {
|
||||||
|
// The field is only ever written to from this thread, so we don't need
|
||||||
|
// synchronization to read it here.
|
||||||
|
if self.tid.get().read().is_none() {
|
||||||
|
// Because this point is only reached once, before the state is set
|
||||||
|
// to PARKED for the first time, the non-atomic write here can not
|
||||||
|
// conflict with reads by other threads.
|
||||||
|
self.tid.get().write(Some(current()));
|
||||||
|
// Ensure that the write can be observed by all threads reading the
|
||||||
|
// state. Synchronizes with the acquire barrier in `unpark`.
|
||||||
|
fence(Release);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub unsafe fn park(self: Pin<&Self>) {
|
||||||
|
self.init_tid();
|
||||||
|
|
||||||
|
// Changes NOTIFIED to EMPTY and EMPTY to PARKED.
|
||||||
|
let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
|
||||||
|
if state == PARKED {
|
||||||
|
// Loop to guard against spurious wakeups.
|
||||||
|
while state == PARKED {
|
||||||
|
park();
|
||||||
|
state = self.state.load(Acquire);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the state change has already been observed with acquire
|
||||||
|
// ordering, the state can be reset with a relaxed store instead
|
||||||
|
// of a swap.
|
||||||
|
self.state.store(EMPTY, Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
||||||
|
self.init_tid();
|
||||||
|
|
||||||
|
let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
|
||||||
|
if state == PARKED {
|
||||||
|
park_timeout(dur);
|
||||||
|
// Swap to ensure that we observe all state changes with acquire
|
||||||
|
// ordering, even if the state has been changed after the timeout
|
||||||
|
// occured.
|
||||||
|
self.state.swap(EMPTY, Acquire);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unpark(self: Pin<&Self>) {
|
||||||
|
let state = self.state.swap(NOTIFIED, Release);
|
||||||
|
if state == PARKED {
|
||||||
|
// Synchronize with the release fence in `init_tid` to observe the
|
||||||
|
// write to `tid`.
|
||||||
|
fence(Acquire);
|
||||||
|
// # Safety
|
||||||
|
// The thread id is initialized before the state is set to `PARKED`
|
||||||
|
// for the first time and is not written to from that point on
|
||||||
|
// (negating the need for an atomic read).
|
||||||
|
let tid = unsafe { self.tid.get().read().unwrap_unchecked() };
|
||||||
|
// It is possible that the waiting thread woke up because of a timeout
|
||||||
|
// and terminated before this call is made. This call then returns an
|
||||||
|
// error or wakes up an unrelated thread. The platform API and
|
||||||
|
// environment does allow this, however.
|
||||||
|
unpark(tid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for Parker {}
|
||||||
|
unsafe impl Sync for Parker {}
|
|
@ -11,13 +11,17 @@ cfg_if::cfg_if! {
|
||||||
))] {
|
))] {
|
||||||
mod futex;
|
mod futex;
|
||||||
pub use futex::Parker;
|
pub use futex::Parker;
|
||||||
|
} else if #[cfg(any(
|
||||||
|
target_os = "netbsd",
|
||||||
|
all(target_vendor = "fortanix", target_env = "sgx"),
|
||||||
|
))] {
|
||||||
|
mod id;
|
||||||
|
pub use id::Parker;
|
||||||
} else if #[cfg(target_os = "solid_asp3")] {
|
} else if #[cfg(target_os = "solid_asp3")] {
|
||||||
mod wait_flag;
|
mod wait_flag;
|
||||||
pub use wait_flag::Parker;
|
pub use wait_flag::Parker;
|
||||||
} else if #[cfg(any(windows, target_family = "unix"))] {
|
} else if #[cfg(any(windows, target_family = "unix"))] {
|
||||||
pub use crate::sys::thread_parker::Parker;
|
pub use crate::sys::thread_parking::Parker;
|
||||||
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
|
|
||||||
pub use crate::sys::thread_parker::Parker;
|
|
||||||
} else {
|
} else {
|
||||||
mod generic;
|
mod generic;
|
||||||
pub use generic::Parker;
|
pub use generic::Parker;
|
|
@ -173,7 +173,7 @@ use crate::sync::Arc;
|
||||||
use crate::sys::thread as imp;
|
use crate::sys::thread as imp;
|
||||||
use crate::sys_common::thread;
|
use crate::sys_common::thread;
|
||||||
use crate::sys_common::thread_info;
|
use crate::sys_common::thread_info;
|
||||||
use crate::sys_common::thread_parker::Parker;
|
use crate::sys_common::thread_parking::Parker;
|
||||||
use crate::sys_common::{AsInner, IntoInner};
|
use crate::sys_common::{AsInner, IntoInner};
|
||||||
use crate::time::Duration;
|
use crate::time::Duration;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue