Rollup merge of #102773 - joboet:apple_parker, r=thomcc
Use semaphores for thread parking on Apple platforms Currently we use a mutex-condvar pair for thread parking on Apple systems. Unfortunately, `pthread_cond_timedwait` uses the real-time clock for measuring time, which causes problems when the system time changes. The parking implementation in this PR uses a semaphore instead, which measures monotonic time by default, avoiding these issues. As a further benefit, this has the potential to improve performance a bit, since `unpark` does not need to wait for a lock to be released. Since the Mach semaphores are poorly documented (I could not find availability or stability guarantees for instance), this uses a [dispatch semaphore](https://developer.apple.com/documentation/dispatch/dispatch_semaphore?language=objc) instead. While it adds a layer of indirection (it uses Mach semaphores internally), the overhead is probably negligible. Tested on macOS 12.5. r? ``````@thomcc``````
This commit is contained in:
commit
cbe5e7bc62
3 changed files with 165 additions and 1 deletions
131
library/std/src/sys/unix/thread_parker/darwin.rs
Normal file
131
library/std/src/sys/unix/thread_parker/darwin.rs
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
//! Thread parking for Darwin-based systems.
|
||||||
|
//!
|
||||||
|
//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they
|
||||||
|
//! cannot be used in `std` because they are non-public (their use will lead to
|
||||||
|
//! rejection from the App Store) and because they are only available starting
|
||||||
|
//! with macOS version 10.12, even though the minimum target version is 10.7.
|
||||||
|
//!
|
||||||
|
//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin
|
||||||
|
//! supports semaphores, which allow us to implement the behaviour we need with
|
||||||
|
//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore
|
||||||
|
//! provided by libdispatch, as the underlying Mach semaphore is only dubiously
|
||||||
|
//! public.
|
||||||
|
|
||||||
|
use crate::pin::Pin;
|
||||||
|
use crate::sync::atomic::{
|
||||||
|
AtomicI8,
|
||||||
|
Ordering::{Acquire, Release},
|
||||||
|
};
|
||||||
|
use crate::time::Duration;
|
||||||
|
|
||||||
|
type dispatch_semaphore_t = *mut crate::ffi::c_void;
|
||||||
|
type dispatch_time_t = u64;
|
||||||
|
|
||||||
|
const DISPATCH_TIME_NOW: dispatch_time_t = 0;
|
||||||
|
const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;
|
||||||
|
|
||||||
|
// Contained in libSystem.dylib, which is linked by default.
|
||||||
|
extern "C" {
|
||||||
|
fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t;
|
||||||
|
fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t;
|
||||||
|
fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize;
|
||||||
|
fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize;
|
||||||
|
fn dispatch_release(object: *mut crate::ffi::c_void);
|
||||||
|
}
|
||||||
|
|
||||||
|
const EMPTY: i8 = 0;
|
||||||
|
const NOTIFIED: i8 = 1;
|
||||||
|
const PARKED: i8 = -1;
|
||||||
|
|
||||||
|
pub struct Parker {
|
||||||
|
semaphore: dispatch_semaphore_t,
|
||||||
|
state: AtomicI8,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Sync for Parker {}
|
||||||
|
unsafe impl Send for Parker {}
|
||||||
|
|
||||||
|
impl Parker {
|
||||||
|
pub unsafe fn new(parker: *mut Parker) {
|
||||||
|
let semaphore = dispatch_semaphore_create(0);
|
||||||
|
assert!(
|
||||||
|
!semaphore.is_null(),
|
||||||
|
"failed to create dispatch semaphore for thread synchronization"
|
||||||
|
);
|
||||||
|
parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) })
|
||||||
|
}
|
||||||
|
|
||||||
|
// Does not need `Pin`, but other implementation do.
|
||||||
|
pub unsafe fn park(self: Pin<&Self>) {
|
||||||
|
// The semaphore counter must be zero at this point, because unparking
|
||||||
|
// threads will not actually increase it until we signalled that we
|
||||||
|
// are waiting.
|
||||||
|
|
||||||
|
// Change NOTIFIED to EMPTY and EMPTY to PARKED.
|
||||||
|
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Another thread may increase the semaphore counter from this point on.
|
||||||
|
// If it is faster than us, we will decrement it again immediately below.
|
||||||
|
// If we are faster, we wait.
|
||||||
|
|
||||||
|
// Ensure that the semaphore counter has actually been decremented, even
|
||||||
|
// if the call timed out for some reason.
|
||||||
|
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
|
||||||
|
|
||||||
|
// At this point, the semaphore counter is zero again.
|
||||||
|
|
||||||
|
// We were definitely woken up, so we don't need to check the state.
|
||||||
|
// Still, we need to reset the state using a swap to observe the state
|
||||||
|
// change with acquire ordering.
|
||||||
|
self.state.swap(EMPTY, Acquire);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Does not need `Pin`, but other implementation do.
|
||||||
|
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
||||||
|
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX);
|
||||||
|
let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos);
|
||||||
|
|
||||||
|
let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0;
|
||||||
|
|
||||||
|
let state = self.state.swap(EMPTY, Acquire);
|
||||||
|
if state == NOTIFIED && timeout {
|
||||||
|
// If the state was NOTIFIED but semaphore_wait returned without
|
||||||
|
// decrementing the count because of a timeout, it means another
|
||||||
|
// thread is about to call semaphore_signal. We must wait for that
|
||||||
|
// to happen to ensure the semaphore count is reset.
|
||||||
|
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
|
||||||
|
} else {
|
||||||
|
// Either a timeout occurred and we reset the state before any thread
|
||||||
|
// tried to wake us up, or we were woken up and reset the state,
|
||||||
|
// making sure to observe the state change with acquire ordering.
|
||||||
|
// Either way, the semaphore counter is now zero again.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Does not need `Pin`, but other implementation do.
|
||||||
|
pub fn unpark(self: Pin<&Self>) {
|
||||||
|
let state = self.state.swap(NOTIFIED, Release);
|
||||||
|
if state == PARKED {
|
||||||
|
unsafe {
|
||||||
|
dispatch_semaphore_signal(self.semaphore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Parker {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// SAFETY:
|
||||||
|
// We always ensure that the semaphore count is reset, so this will
|
||||||
|
// never cause an exception.
|
||||||
|
unsafe {
|
||||||
|
dispatch_release(self.semaphore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,7 +11,18 @@
|
||||||
)))]
|
)))]
|
||||||
|
|
||||||
cfg_if::cfg_if! {
|
cfg_if::cfg_if! {
|
||||||
if #[cfg(target_os = "netbsd")] {
|
if #[cfg(all(
|
||||||
|
any(
|
||||||
|
target_os = "macos",
|
||||||
|
target_os = "ios",
|
||||||
|
target_os = "watchos",
|
||||||
|
target_os = "tvos",
|
||||||
|
),
|
||||||
|
not(miri),
|
||||||
|
))] {
|
||||||
|
mod darwin;
|
||||||
|
pub use darwin::Parker;
|
||||||
|
} else if #[cfg(target_os = "netbsd")] {
|
||||||
mod netbsd;
|
mod netbsd;
|
||||||
pub use netbsd::Parker;
|
pub use netbsd::Parker;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -244,6 +244,28 @@ fn test_try_panic_any_message_unit_struct() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_park_unpark_before() {
|
||||||
|
for _ in 0..10 {
|
||||||
|
thread::current().unpark();
|
||||||
|
thread::park();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_park_unpark_called_other_thread() {
|
||||||
|
for _ in 0..10 {
|
||||||
|
let th = thread::current();
|
||||||
|
|
||||||
|
let _guard = thread::spawn(move || {
|
||||||
|
super::sleep(Duration::from_millis(50));
|
||||||
|
th.unpark();
|
||||||
|
});
|
||||||
|
|
||||||
|
thread::park();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_park_timeout_unpark_before() {
|
fn test_park_timeout_unpark_before() {
|
||||||
for _ in 0..10 {
|
for _ in 0..10 {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue