1
Fork 0

shim Apple's futex primitives

This is necessary to unblock rust-lang/rust#122408. The documentation for these is available [here](https://developer.apple.com/documentation/os/os_sync_wait_on_address?language=objc).

Because the futex wait operations (`os_sync_wait_on_address` et al.) return the number of remaining waiters after returning, this required some changes to the common futex infrastructure, which I've changed to take a callback instead of precalculating the return values.
This commit is contained in:
joboet 2025-01-19 20:41:35 +01:00
parent b82463fcbf
commit 2d6c9b4c79
No known key found for this signature in database
GPG key ID: 704E0149B0194B3C
7 changed files with 626 additions and 53 deletions

View file

@ -128,7 +128,7 @@ struct Condvar {
/// The futex state.
#[derive(Default, Debug)]
struct Futex {
waiters: VecDeque<FutexWaiter>,
waiters: Vec<FutexWaiter>,
/// Tracks the happens-before relationship
/// between a futex-wake and a futex-wait
/// during a non-spurious wake event.
@ -140,6 +140,12 @@ struct Futex {
#[derive(Default, Clone)]
pub struct FutexRef(Rc<RefCell<Futex>>);
impl FutexRef {
pub fn waiters(&self) -> usize {
self.0.borrow().waiters.len()
}
}
impl VisitProvenance for FutexRef {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// No provenance in `Futex`.
@ -728,25 +734,21 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(true)
}
/// Wait for the futex to be signaled, or a timeout.
/// On a signal, `retval_succ` is written to `dest`.
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
/// Wait for the futex to be signaled, or a timeout. Once the thread is
/// unblocked, `callback` is called with the unblock reason.
fn futex_wait(
&mut self,
futex_ref: FutexRef,
bitset: u32,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
callback: DynUnblockCallback<'tcx>,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread, bitset });
waiters.push(FutexWaiter { thread, bitset });
drop(futex);
this.block_thread(
@ -755,10 +757,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
callback!(
@capture<'tcx> {
futex_ref: FutexRef,
retval_succ: Scalar,
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
callback: DynUnblockCallback<'tcx>,
}
|this, unblock: UnblockKind| {
match unblock {
@ -768,29 +767,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
},
UnblockKind::TimedOut => {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
},
}
callback.call(this, unblock)
}
),
);
}
/// Wake up the first thread in the queue that matches any of the bits in the bitset.
/// Returns whether anything was woken.
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
/// Wake up `count` of the threads in the queue that match any of the bits
/// in the bitset. Returns how many threads were woken.
fn futex_wake(
&mut self,
futex_ref: &FutexRef,
bitset: u32,
count: usize,
) -> InterpResult<'tcx, usize> {
let this = self.eval_context_mut();
let mut futex = futex_ref.0.borrow_mut();
let data_race = &this.machine.data_race;
@ -800,13 +799,18 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock));
}
// Wake up the first thread in the queue that matches any of the bits in the bitset.
let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else {
return interp_ok(false);
};
let waiter = futex.waiters.remove(i).unwrap();
// Remove `count` of the threads in the queue that match any of the bits in the bitset.
// We collect all of them before unblocking because the unblock callback may access the
// futex state to retrieve the remaining number of waiters on macOS.
let waiters: Vec<_> =
futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect();
drop(futex);
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
interp_ok(true)
let woken = waiters.len();
for waiter in waiters {
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
}
interp_ok(woken)
}
}

View file

@ -15,6 +15,8 @@
#![feature(unqualified_local_imports)]
#![feature(derive_coerce_pointee)]
#![feature(arbitrary_self_types)]
#![feature(unsigned_is_multiple_of)]
#![feature(extract_if)]
// Configure clippy and other lints
#![allow(
clippy::collapsible_else_if,

View file

@ -158,14 +158,24 @@ pub fn futex<'tcx>(
.futex
.clone();
let dest = dest.clone();
ecx.futex_wait(
futex_ref,
bitset,
timeout,
Scalar::from_target_isize(0, ecx), // retval_succ
Scalar::from_target_isize(-1, ecx), // retval_timeout
dest.clone(),
LibcError("ETIMEDOUT"), // errno_timeout
callback!(
@capture<'tcx> {
dest: MPlaceTy<'tcx>,
}
|ecx, unblock: UnblockKind| match unblock {
UnblockKind::Ready => {
ecx.write_int(0, &dest)
}
UnblockKind::TimedOut => {
ecx.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest)
}
}
),
);
} else {
// The futex value doesn't match the expected value, so we return failure
@ -209,16 +219,8 @@ pub fn futex<'tcx>(
// will see the latest value on addr which could be changed by our caller
// before doing the syscall.
ecx.atomic_fence(AtomicFenceOrd::SeqCst)?;
let mut n = 0;
#[expect(clippy::arithmetic_side_effects)]
for _ in 0..val {
if ecx.futex_wake(&futex_ref, bitset)? {
n += 1;
} else {
break;
}
}
ecx.write_scalar(Scalar::from_target_isize(n, ecx), dest)?;
let woken = ecx.futex_wake(&futex_ref, bitset, val.try_into().unwrap())?;
ecx.write_scalar(Scalar::from_target_isize(woken.try_into().unwrap(), ecx), dest)?;
}
op => throw_unsup_format!("Miri does not support `futex` syscall with op={}", op),
}

View file

@ -2,12 +2,20 @@ use rustc_middle::ty::Ty;
use rustc_span::Symbol;
use rustc_target::callconv::{Conv, FnAbi};
use super::sync::EvalContextExt as _;
use super::sync::{EvalContextExt as _, MacOsFutexTimeout};
use crate::shims::unix::*;
use crate::*;
pub fn is_dyn_sym(_name: &str) -> bool {
false
pub fn is_dyn_sym(name: &str) -> bool {
match name {
// These only became available with macOS 11.0, so std looks them up dynamically.
"os_sync_wait_on_address"
| "os_sync_wait_on_address_with_deadline"
| "os_sync_wait_on_address_with_timeout"
| "os_sync_wake_by_address_any"
| "os_sync_wake_by_address_all" => true,
_ => false,
}
}
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
@ -214,6 +222,58 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
this.write_scalar(res, dest)?;
}
// Futex primitives
"os_sync_wait_on_address" => {
let [addr_op, value_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::None,
dest,
)?;
}
"os_sync_wait_on_address_with_deadline" => {
let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::Absolute { clock_op, timeout_op },
dest,
)?;
}
"os_sync_wait_on_address_with_timeout" => {
let [addr_op, value_op, size_op, flags_op, clock_op, timeout_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wait_on_address(
addr_op,
value_op,
size_op,
flags_op,
MacOsFutexTimeout::Relative { clock_op, timeout_op },
dest,
)?;
}
"os_sync_wake_by_address_any" => {
let [addr_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wake_by_address(
addr_op, size_op, flags_op, /* all */ false, dest,
)?;
}
"os_sync_wake_by_address_all" => {
let [addr_op, size_op, flags_op] =
this.check_shim(abi, Conv::C, link_name, args)?;
this.os_sync_wake_by_address(
addr_op, size_op, flags_op, /* all */ true, dest,
)?;
}
"os_unfair_lock_lock" => {
let [lock_op] = this.check_shim(abi, Conv::C, link_name, args)?;
this.os_unfair_lock_lock(lock_op)?;

View file

@ -10,8 +10,12 @@
//! and we do not detect copying of the lock, but macOS doesn't guarantee anything
//! in that case either.
use std::cell::Cell;
use std::time::Duration;
use rustc_abi::Size;
use crate::concurrency::sync::FutexRef;
use crate::*;
#[derive(Clone)]
@ -20,6 +24,26 @@ enum MacOsUnfairLock {
Active { mutex_ref: MutexRef },
}
pub enum MacOsFutexTimeout<'a, 'tcx> {
None,
Relative { clock_op: &'a OpTy<'tcx>, timeout_op: &'a OpTy<'tcx> },
Absolute { clock_op: &'a OpTy<'tcx>, timeout_op: &'a OpTy<'tcx> },
}
/// Metadata for a macOS futex.
///
/// Since macOS 11.0, Apple has exposed the previously private futex API consisting
/// of `os_sync_wait_on_address` (and friends) and `os_sync_wake_by_address_{any, all}`.
/// These work with different value sizes and flags, which are validated to be consistent.
/// This structure keeps track of both the futex queue and these values.
struct MacOsFutex {
futex: FutexRef,
/// The size in bytes of the atomic primitive underlying this futex.
size: Cell<u64>,
/// Whether the futex is shared across process boundaries.
shared: Cell<bool>,
}
impl<'tcx> EvalContextExtPriv<'tcx> for crate::MiriInterpCx<'tcx> {}
trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
fn os_unfair_lock_get_data<'a>(
@ -54,6 +78,198 @@ trait EvalContextExtPriv<'tcx>: crate::MiriInterpCxExt<'tcx> {
impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// Implements [`os_sync_wait_on_address`], [`os_sync_wait_on_address_with_deadline`]
/// and [`os_sync_wait_on_address_with_timeout`].
///
/// [`os_sync_wait_on_address`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address?language=objc
/// [`os_sync_wait_on_address_with_deadline`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_deadline?language=objc
/// [`os_sync_wait_on_address_with_timeout`]: https://developer.apple.com/documentation/os/os_sync_wait_on_address_with_timeout?language=objc
fn os_sync_wait_on_address(
&mut self,
addr_op: &OpTy<'tcx>,
value_op: &OpTy<'tcx>,
size_op: &OpTy<'tcx>,
flags_op: &OpTy<'tcx>,
timeout: MacOsFutexTimeout<'_, 'tcx>,
dest: &MPlaceTy<'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let none = this.eval_libc_u32("OS_SYNC_WAIT_ON_ADDRESS_NONE");
let shared = this.eval_libc_u32("OS_SYNC_WAIT_ON_ADDRESS_SHARED");
let absolute_clock = this.eval_libc_u32("OS_CLOCK_MACH_ABSOLUTE_TIME");
let ptr = this.read_pointer(addr_op)?;
let value = this.read_scalar(value_op)?.to_u64()?;
let size = this.read_target_usize(size_op)?;
let flags = this.read_scalar(flags_op)?.to_u32()?;
let clock_timeout = match timeout {
MacOsFutexTimeout::None => None,
MacOsFutexTimeout::Relative { clock_op, timeout_op } => {
let clock = this.read_scalar(clock_op)?.to_u32()?;
let timeout = this.read_scalar(timeout_op)?.to_u64()?;
Some((clock, TimeoutAnchor::Relative, timeout))
}
MacOsFutexTimeout::Absolute { clock_op, timeout_op } => {
let clock = this.read_scalar(clock_op)?.to_u32()?;
let timeout = this.read_scalar(timeout_op)?.to_u64()?;
Some((clock, TimeoutAnchor::Absolute, timeout))
}
};
// Perform validation of the arguments.
let addr = ptr.addr().bytes();
if addr == 0
|| !matches!(size, 4 | 8)
|| !addr.is_multiple_of(size)
|| (flags != none && flags != shared)
|| clock_timeout
.is_some_and(|(clock, _, timeout)| clock != absolute_clock || timeout == 0)
{
this.set_last_error_and_return(LibcError("EINVAL"), dest)?;
return interp_ok(());
}
let is_shared = flags == shared;
let timeout = clock_timeout.map(|(_, anchor, timeout)| {
// The only clock that is currenlty supported is the monotonic clock.
// While the deadline argument of `os_sync_wait_on_address_with_deadline`
// is actually not in nanoseconds but in the units of `mach_current_time`,
// the two are equivalent in miri.
(TimeoutClock::Monotonic, anchor, Duration::from_nanos(timeout))
});
// See the Linux futex implementation for why this fence exists.
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
let layout = this.machine.layouts.uint(Size::from_bytes(size)).unwrap();
let futex_val = this
.read_scalar_atomic(&this.ptr_to_mplace(ptr, layout), AtomicReadOrd::Acquire)?
.to_bits(Size::from_bytes(size))?;
let futex = this
.get_sync_or_init(ptr, |_| {
MacOsFutex {
futex: Default::default(),
size: Cell::new(size),
shared: Cell::new(is_shared),
}
})
.unwrap();
// Detect mismatches between the flags and sizes used on this address
// by comparing it with the parameters used by the other waiters in
// the current list. If the list is currently empty, update those
// parameters.
if futex.futex.waiters() == 0 {
futex.size.set(size);
futex.shared.set(is_shared);
} else if futex.size.get() != size || futex.shared.get() != is_shared {
this.set_last_error_and_return(LibcError("EINVAL"), dest)?;
return interp_ok(());
}
if futex_val == value.into() {
// If the values are the same, we have to block.
let futex_ref = futex.futex.clone();
let dest = dest.clone();
this.futex_wait(
futex_ref.clone(),
u32::MAX, // bitset
timeout,
callback!(
@capture<'tcx> {
dest: MPlaceTy<'tcx>,
futex_ref: FutexRef,
}
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
let remaining = futex_ref.waiters().try_into().unwrap();
this.write_scalar(Scalar::from_i32(remaining), &dest)
}
UnblockKind::TimedOut => {
this.set_last_error_and_return(LibcError("ETIMEDOUT"), &dest)
}
}
}
),
);
} else {
// else retrieve the current number of waiters.
let waiters = futex.futex.waiters().try_into().unwrap();
this.write_scalar(Scalar::from_i32(waiters), dest)?;
}
interp_ok(())
}
/// Implements [`os_sync_wake_by_address_all`] and [`os_sync_wake_by_address_any`].
///
/// [`os_sync_wake_by_address_all`]: https://developer.apple.com/documentation/os/os_sync_wake_by_address_all?language=objc
/// [`os_sync_wake_by_address_any`]: https://developer.apple.com/documentation/os/os_sync_wake_by_address_any?language=objc
fn os_sync_wake_by_address(
&mut self,
addr_op: &OpTy<'tcx>,
size_op: &OpTy<'tcx>,
flags_op: &OpTy<'tcx>,
all: bool,
dest: &MPlaceTy<'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let none = this.eval_libc_u32("OS_SYNC_WAKE_BY_ADDRESS_NONE");
let shared = this.eval_libc_u32("OS_SYNC_WAKE_BY_ADDRESS_SHARED");
let ptr = this.read_pointer(addr_op)?;
let size = this.read_target_usize(size_op)?;
let flags = this.read_scalar(flags_op)?.to_u32()?;
// Perform validation of the arguments.
let addr = ptr.addr().bytes();
if addr == 0 || !matches!(size, 4 | 8) || (flags != none && flags != shared) {
this.set_last_error_and_return(LibcError("EINVAL"), dest)?;
return interp_ok(());
}
let is_shared = flags == shared;
let Some(futex) = this.get_sync_or_init(ptr, |_| {
MacOsFutex {
futex: Default::default(),
size: Cell::new(size),
shared: Cell::new(is_shared),
}
}) else {
// No AllocId, or no live allocation at that AllocId. Return an
// error code. (That seems nicer than silently doing something
// non-intuitive.) This means that if an address gets reused by a
// new allocation, we'll use an independent futex queue for this...
// that seems acceptable.
this.set_last_error_and_return(LibcError("ENOENT"), dest)?;
return interp_ok(());
};
if futex.futex.waiters() == 0 {
this.set_last_error_and_return(LibcError("ENOENT"), dest)?;
return interp_ok(());
// If there are waiters in the queue, they have all used the parameters
// stored in `futex` (we check this in `os_sync_wait_on_address` above).
// Detect mismatches between "our" parameters and the parameters used by
// the waiters and return an error in that case.
} else if futex.size.get() != size || futex.shared.get() != is_shared {
this.set_last_error_and_return(LibcError("EINVAL"), dest)?;
return interp_ok(());
}
let futex_ref = futex.futex.clone();
// See the Linux futex implementation for why this fence exists.
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
this.futex_wake(&futex_ref, u32::MAX, if all { usize::MAX } else { 1 })?;
this.write_scalar(Scalar::from_i32(0), dest)?;
interp_ok(())
}
fn os_unfair_lock_lock(&mut self, lock_op: &OpTy<'tcx>) -> InterpResult<'tcx> {
let this = self.eval_context_mut();

View file

@ -212,14 +212,27 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
.futex
.clone();
let dest = dest.clone();
this.futex_wait(
futex_ref,
u32::MAX, // bitset
timeout,
Scalar::from_i32(1), // retval_succ
Scalar::from_i32(0), // retval_timeout
dest.clone(),
IoError::WindowsError("ERROR_TIMEOUT"), // errno_timeout
callback!(
@capture<'tcx> {
dest: MPlaceTy<'tcx>
}
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
this.write_int(1, &dest)
}
UnblockKind::TimedOut => {
this.set_last_error(IoError::WindowsError("ERROR_TIMEOUT"))?;
this.write_int(0, &dest)
}
}
}
),
);
}
@ -244,7 +257,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
};
let futex_ref = futex_ref.futex.clone();
this.futex_wake(&futex_ref, u32::MAX)?;
this.futex_wake(&futex_ref, u32::MAX, 1)?;
interp_ok(())
}
@ -264,7 +277,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
};
let futex_ref = futex_ref.futex.clone();
while this.futex_wake(&futex_ref, u32::MAX)? {}
this.futex_wake(&futex_ref, u32::MAX, usize::MAX)?;
interp_ok(())
}

View file

@ -0,0 +1,276 @@
//@only-target: darwin
//@compile-flags: -Zmiri-preemption-rate=0
use std::time::{Duration, Instant};
use std::{io, ptr, thread};
fn wake_nobody() {
let futex = 0;
// Wake 1 waiter. Expect ENOENT as nobody is waiting.
unsafe {
assert_eq!(
libc::os_sync_wake_by_address_any(
ptr::from_ref(&futex).cast_mut().cast(),
size_of::<i32>(),
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE
),
-1
);
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ENOENT);
}
}
fn wake_dangling() {
let futex = Box::new(0);
let ptr = ptr::from_ref(&futex).cast_mut().cast();
drop(futex);
// Expect error since this is now "unmapped" memory.
unsafe {
assert_eq!(
libc::os_sync_wake_by_address_any(
ptr,
size_of::<i32>(),
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE
),
-1
);
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ENOENT);
}
}
fn wait_wrong_val() {
let futex: i32 = 123;
// Only wait if the futex value is 456.
unsafe {
assert_eq!(
libc::os_sync_wait_on_address(
ptr::from_ref(&futex).cast_mut().cast(),
456,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE
),
0,
);
}
}
fn wait_timeout() {
let start = Instant::now();
let futex: i32 = 123;
// Wait for 200ms, with nobody waking us up early.
unsafe {
assert_eq!(
libc::os_sync_wait_on_address_with_timeout(
ptr::from_ref(&futex).cast_mut().cast(),
123,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE,
libc::OS_CLOCK_MACH_ABSOLUTE_TIME,
200_000_000,
),
-1,
);
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT);
}
assert!((200..1000).contains(&start.elapsed().as_millis()));
}
fn wait_absolute_timeout() {
let start = Instant::now();
// Get the current monotonic timestamp.
#[allow(deprecated)]
let mut deadline = unsafe { libc::mach_absolute_time() };
// Add 200ms.
// What we should be doing here is call `mach_timebase_info` to determine the
// unit used for `deadline`, but we know what Miri returns for that function:
// the unit is nanoseconds.
deadline += 200_000_000;
let futex: i32 = 123;
// Wait for 200ms from now, with nobody waking us up early.
unsafe {
assert_eq!(
libc::os_sync_wait_on_address_with_deadline(
ptr::from_ref(&futex).cast_mut().cast(),
123,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE,
libc::OS_CLOCK_MACH_ABSOLUTE_TIME,
deadline,
),
-1,
);
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT);
}
assert!((200..1000).contains(&start.elapsed().as_millis()));
}
fn wait_wake() {
let start = Instant::now();
static mut FUTEX: i32 = 0;
let t = thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
unsafe {
assert_eq!(
libc::os_sync_wake_by_address_any(
(&raw const FUTEX).cast_mut().cast(),
size_of::<i32>(),
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
),
0,
);
}
});
unsafe {
assert_eq!(
libc::os_sync_wait_on_address(
(&raw const FUTEX).cast_mut().cast(),
0,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE,
),
0,
);
}
// When running this in stress-gc mode, things can take quite long.
// So the timeout is 3000 ms.
assert!((200..3000).contains(&start.elapsed().as_millis()));
t.join().unwrap();
}
fn wait_wake_multiple() {
let val = 0i32;
let futex = &val;
thread::scope(|s| {
// Spawn some threads and make them wait on the futex.
for i in 0..4 {
s.spawn(move || unsafe {
assert_eq!(
libc::os_sync_wait_on_address(
ptr::from_ref(futex).cast_mut().cast(),
0,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE,
),
// The last two threads will be woken at the same time,
// but for the first two threads the remaining number
// of waiters should be strictly decreasing.
if i < 2 { 3 - i } else { 0 },
);
});
thread::yield_now();
}
// Wake the threads up again.
unsafe {
assert_eq!(
libc::os_sync_wake_by_address_any(
ptr::from_ref(futex).cast_mut().cast(),
size_of::<i32>(),
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
),
0
);
assert_eq!(
libc::os_sync_wake_by_address_any(
ptr::from_ref(futex).cast_mut().cast(),
size_of::<i32>(),
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
),
0
);
// Wake both remaining threads at the same time.
assert_eq!(
libc::os_sync_wake_by_address_all(
ptr::from_ref(futex).cast_mut().cast(),
size_of::<i32>(),
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
),
0
);
}
})
}
fn param_mismatch() {
let futex = 0;
thread::scope(|s| {
s.spawn(|| {
unsafe {
assert_eq!(
libc::os_sync_wait_on_address_with_timeout(
ptr::from_ref(&futex).cast_mut().cast(),
0,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE,
libc::OS_CLOCK_MACH_ABSOLUTE_TIME,
400_000_000,
),
-1,
);
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::ETIMEDOUT);
}
});
s.spawn(|| {
thread::yield_now();
unsafe {
assert_eq!(
libc::os_sync_wait_on_address(
ptr::from_ref(&futex).cast_mut().cast(),
0,
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_SHARED,
),
-1,
);
// This call fails because it uses the shared flag whereas the first waiter didn't.
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL);
}
});
thread::yield_now();
unsafe {
assert_eq!(
libc::os_sync_wake_by_address_any(
ptr::from_ref(&futex).cast_mut().cast(),
size_of::<i32>(),
libc::OS_SYNC_WAIT_ON_ADDRESS_SHARED,
),
-1,
);
// This call fails because it uses the shared flag whereas the waiter didn't.
assert_eq!(io::Error::last_os_error().raw_os_error().unwrap(), libc::EINVAL);
}
});
}
fn main() {
wake_nobody();
wake_dangling();
wait_wrong_val();
wait_timeout();
wait_absolute_timeout();
wait_wake();
wait_wake_multiple();
param_mismatch();
}