Auto merge of #107586 - SparrowLii:parallel-query, r=cjgillot
Introduce `DynSend` and `DynSync` auto trait for parallel compiler part of parallel-rustc #101566 This PR introduces `DynSend / DynSync` trait and `FromDyn / IntoDyn` structure in rustc_data_structure::marker. `FromDyn` can dynamically check data structures for thread safety when switching to parallel environments (such as calling `par_for_each_in`). This happens only when `-Z threads > 1` so it doesn't affect single-threaded mode's compile efficiency. r? `@cjgillot`
This commit is contained in:
commit
dd8ec9c88d
26 changed files with 557 additions and 110 deletions
|
@ -26,6 +26,7 @@
|
|||
#![feature(test)]
|
||||
#![feature(thread_id_value)]
|
||||
#![feature(vec_into_raw_parts)]
|
||||
#![feature(allocator_api)]
|
||||
#![feature(get_mut_unchecked)]
|
||||
#![feature(lint_reasons)]
|
||||
#![feature(unwrap_infallible)]
|
||||
|
@ -77,6 +78,7 @@ pub mod sorted_map;
|
|||
pub mod stable_hasher;
|
||||
mod atomic_ref;
|
||||
pub mod fingerprint;
|
||||
pub mod marker;
|
||||
pub mod profiling;
|
||||
pub mod sharded;
|
||||
pub mod stack;
|
||||
|
|
257
compiler/rustc_data_structures/src/marker.rs
Normal file
257
compiler/rustc_data_structures/src/marker.rs
Normal file
|
@ -0,0 +1,257 @@
|
|||
cfg_if!(
|
||||
if #[cfg(not(parallel_compiler))] {
|
||||
pub auto trait DynSend {}
|
||||
pub auto trait DynSync {}
|
||||
|
||||
impl<T> DynSend for T {}
|
||||
impl<T> DynSync for T {}
|
||||
} else {
|
||||
#[rustc_on_unimplemented(
|
||||
message = "`{Self}` doesn't implement `DynSend`. \
|
||||
Add it to `rustc_data_structures::marker` or use `IntoDynSyncSend` if it's already `Send`"
|
||||
)]
|
||||
// This is an auto trait for types which can be sent across threads if `sync::is_dyn_thread_safe()`
|
||||
// is true. These types can be wrapped in a `FromDyn` to get a `Send` type. Wrapping a
|
||||
// `Send` type in `IntoDynSyncSend` will create a `DynSend` type.
|
||||
pub unsafe auto trait DynSend {}
|
||||
|
||||
#[rustc_on_unimplemented(
|
||||
message = "`{Self}` doesn't implement `DynSync`. \
|
||||
Add it to `rustc_data_structures::marker` or use `IntoDynSyncSend` if it's already `Sync`"
|
||||
)]
|
||||
// This is an auto trait for types which can be shared across threads if `sync::is_dyn_thread_safe()`
|
||||
// is true. These types can be wrapped in a `FromDyn` to get a `Sync` type. Wrapping a
|
||||
// `Sync` type in `IntoDynSyncSend` will create a `DynSync` type.
|
||||
pub unsafe auto trait DynSync {}
|
||||
|
||||
// Same with `Sync` and `Send`.
|
||||
unsafe impl<T: DynSync + ?Sized> DynSend for &T {}
|
||||
|
||||
macro_rules! impls_dyn_send_neg {
|
||||
($([$t1: ty $(where $($generics1: tt)*)?])*) => {
|
||||
$(impl$(<$($generics1)*>)? !DynSend for $t1 {})*
|
||||
};
|
||||
}
|
||||
|
||||
// Consistent with `std`
|
||||
impls_dyn_send_neg!(
|
||||
[std::env::Args]
|
||||
[std::env::ArgsOs]
|
||||
[*const T where T: ?Sized]
|
||||
[*mut T where T: ?Sized]
|
||||
[std::ptr::NonNull<T> where T: ?Sized]
|
||||
[std::rc::Rc<T> where T: ?Sized]
|
||||
[std::rc::Weak<T> where T: ?Sized]
|
||||
[std::sync::MutexGuard<'_, T> where T: ?Sized]
|
||||
[std::sync::RwLockReadGuard<'_, T> where T: ?Sized]
|
||||
[std::sync::RwLockWriteGuard<'_, T> where T: ?Sized]
|
||||
[std::io::StdoutLock<'_>]
|
||||
[std::io::StderrLock<'_>]
|
||||
);
|
||||
cfg_if!(
|
||||
// Consistent with `std`
|
||||
// `os_imp::Env` is `!Send` in these platforms
|
||||
if #[cfg(any(unix, target_os = "hermit", target_os = "wasi", target_os = "solid_asp3"))] {
|
||||
impl !DynSend for std::env::VarsOs {}
|
||||
}
|
||||
);
|
||||
|
||||
macro_rules! already_send {
|
||||
($([$ty: ty])*) => {
|
||||
$(unsafe impl DynSend for $ty where $ty: Send {})*
|
||||
};
|
||||
}
|
||||
|
||||
// These structures are already `Send`.
|
||||
already_send!(
|
||||
[std::backtrace::Backtrace]
|
||||
[std::io::Stdout]
|
||||
[std::io::Stderr]
|
||||
[std::io::Error]
|
||||
[std::fs::File]
|
||||
[rustc_arena::DroplessArena]
|
||||
[crate::memmap::Mmap]
|
||||
[crate::profiling::SelfProfiler]
|
||||
[crate::owned_slice::OwnedSlice]
|
||||
);
|
||||
|
||||
macro_rules! impl_dyn_send {
|
||||
($($($attr: meta)* [$ty: ty where $($generics2: tt)*])*) => {
|
||||
$(unsafe impl<$($generics2)*> DynSend for $ty {})*
|
||||
};
|
||||
}
|
||||
|
||||
impl_dyn_send!(
|
||||
[std::sync::atomic::AtomicPtr<T> where T]
|
||||
[std::sync::Mutex<T> where T: ?Sized+ DynSend]
|
||||
[std::sync::mpsc::Sender<T> where T: DynSend]
|
||||
[std::sync::Arc<T> where T: ?Sized + DynSync + DynSend]
|
||||
[std::sync::LazyLock<T, F> where T: DynSend, F: DynSend]
|
||||
[std::collections::HashSet<K, S> where K: DynSend, S: DynSend]
|
||||
[std::collections::HashMap<K, V, S> where K: DynSend, V: DynSend, S: DynSend]
|
||||
[std::collections::BTreeMap<K, V, A> where K: DynSend, V: DynSend, A: std::alloc::Allocator + Clone + DynSend]
|
||||
[Vec<T, A> where T: DynSend, A: std::alloc::Allocator + DynSend]
|
||||
[Box<T, A> where T: ?Sized + DynSend, A: std::alloc::Allocator + DynSend]
|
||||
[crate::sync::Lock<T> where T: DynSend]
|
||||
[crate::sync::RwLock<T> where T: DynSend]
|
||||
[crate::tagged_ptr::CopyTaggedPtr<P, T, CP> where P: Send + crate::tagged_ptr::Pointer, T: Send + crate::tagged_ptr::Tag, const CP: bool]
|
||||
[rustc_arena::TypedArena<T> where T: DynSend]
|
||||
[indexmap::IndexSet<V, S> where V: DynSend, S: DynSend]
|
||||
[indexmap::IndexMap<K, V, S> where K: DynSend, V: DynSend, S: DynSend]
|
||||
[thin_vec::ThinVec<T> where T: DynSend]
|
||||
[smallvec::SmallVec<A> where A: smallvec::Array + DynSend]
|
||||
);
|
||||
|
||||
macro_rules! impls_dyn_sync_neg {
|
||||
($([$t1: ty $(where $($generics1: tt)*)?])*) => {
|
||||
$(impl$(<$($generics1)*>)? !DynSync for $t1 {})*
|
||||
};
|
||||
}
|
||||
|
||||
// Consistent with `std`
|
||||
impls_dyn_sync_neg!(
|
||||
[std::env::Args]
|
||||
[std::env::ArgsOs]
|
||||
[*const T where T: ?Sized]
|
||||
[*mut T where T: ?Sized]
|
||||
[std::cell::Cell<T> where T: ?Sized]
|
||||
[std::cell::RefCell<T> where T: ?Sized]
|
||||
[std::cell::UnsafeCell<T> where T: ?Sized]
|
||||
[std::ptr::NonNull<T> where T: ?Sized]
|
||||
[std::rc::Rc<T> where T: ?Sized]
|
||||
[std::rc::Weak<T> where T: ?Sized]
|
||||
[std::cell::OnceCell<T> where T]
|
||||
[std::sync::mpsc::Receiver<T> where T]
|
||||
[std::sync::mpsc::Sender<T> where T]
|
||||
);
|
||||
cfg_if!(
|
||||
// Consistent with `std`
|
||||
// `os_imp::Env` is `!Sync` in these platforms
|
||||
if #[cfg(any(unix, target_os = "hermit", target_os = "wasi", target_os = "solid_asp3"))] {
|
||||
impl !DynSync for std::env::VarsOs {}
|
||||
}
|
||||
);
|
||||
|
||||
macro_rules! already_sync {
|
||||
($([$ty: ty])*) => {
|
||||
$(unsafe impl DynSync for $ty where $ty: Sync {})*
|
||||
};
|
||||
}
|
||||
|
||||
// These structures are already `Sync`.
|
||||
already_sync!(
|
||||
[std::sync::atomic::AtomicBool]
|
||||
[std::sync::atomic::AtomicUsize]
|
||||
[std::sync::atomic::AtomicU8]
|
||||
[std::sync::atomic::AtomicU32]
|
||||
[std::sync::atomic::AtomicU64]
|
||||
[std::backtrace::Backtrace]
|
||||
[std::io::Error]
|
||||
[std::fs::File]
|
||||
[jobserver_crate::Client]
|
||||
[crate::memmap::Mmap]
|
||||
[crate::profiling::SelfProfiler]
|
||||
[crate::owned_slice::OwnedSlice]
|
||||
);
|
||||
|
||||
macro_rules! impl_dyn_sync {
|
||||
($($($attr: meta)* [$ty: ty where $($generics2: tt)*])*) => {
|
||||
$(unsafe impl<$($generics2)*> DynSync for $ty {})*
|
||||
};
|
||||
}
|
||||
|
||||
impl_dyn_sync!(
|
||||
[std::sync::atomic::AtomicPtr<T> where T]
|
||||
[std::sync::OnceLock<T> where T: DynSend + DynSync]
|
||||
[std::sync::Mutex<T> where T: ?Sized + DynSend]
|
||||
[std::sync::Arc<T> where T: ?Sized + DynSync + DynSend]
|
||||
[std::sync::LazyLock<T, F> where T: DynSend + DynSync, F: DynSend]
|
||||
[std::collections::HashSet<K, S> where K: DynSync, S: DynSync]
|
||||
[std::collections::HashMap<K, V, S> where K: DynSync, V: DynSync, S: DynSync]
|
||||
[std::collections::BTreeMap<K, V, A> where K: DynSync, V: DynSync, A: std::alloc::Allocator + Clone + DynSync]
|
||||
[Vec<T, A> where T: DynSync, A: std::alloc::Allocator + DynSync]
|
||||
[Box<T, A> where T: ?Sized + DynSync, A: std::alloc::Allocator + DynSync]
|
||||
[crate::sync::Lock<T> where T: DynSend]
|
||||
[crate::sync::RwLock<T> where T: DynSend + DynSync]
|
||||
[crate::sync::OneThread<T> where T]
|
||||
[crate::sync::WorkerLocal<T> where T: DynSend]
|
||||
[crate::intern::Interned<'a, T> where 'a, T: DynSync]
|
||||
[crate::tagged_ptr::CopyTaggedPtr<P, T, CP> where P: Sync + crate::tagged_ptr::Pointer, T: Sync + crate::tagged_ptr::Tag, const CP: bool]
|
||||
[parking_lot::lock_api::Mutex<R, T> where R: DynSync, T: ?Sized + DynSend]
|
||||
[parking_lot::lock_api::RwLock<R, T> where R: DynSync, T: ?Sized + DynSend + DynSync]
|
||||
[indexmap::IndexSet<V, S> where V: DynSync, S: DynSync]
|
||||
[indexmap::IndexMap<K, V, S> where K: DynSync, V: DynSync, S: DynSync]
|
||||
[smallvec::SmallVec<A> where A: smallvec::Array + DynSync]
|
||||
[thin_vec::ThinVec<T> where T: DynSync]
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
pub fn assert_dyn_sync<T: ?Sized + DynSync>() {}
|
||||
pub fn assert_dyn_send<T: ?Sized + DynSend>() {}
|
||||
pub fn assert_dyn_send_val<T: ?Sized + DynSend>(_t: &T) {}
|
||||
pub fn assert_dyn_send_sync_val<T: ?Sized + DynSync + DynSend>(_t: &T) {}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct FromDyn<T>(T);
|
||||
|
||||
impl<T> FromDyn<T> {
|
||||
#[inline(always)]
|
||||
pub fn from(val: T) -> Self {
|
||||
// Check that `sync::is_dyn_thread_safe()` is true on creation so we can
|
||||
// implement `Send` and `Sync` for this structure when `T`
|
||||
// implements `DynSend` and `DynSync` respectively.
|
||||
#[cfg(parallel_compiler)]
|
||||
assert!(crate::sync::is_dyn_thread_safe());
|
||||
FromDyn(val)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
// `FromDyn` is `Send` if `T` is `DynSend`, since it ensures that sync::is_dyn_thread_safe() is true.
|
||||
#[cfg(parallel_compiler)]
|
||||
unsafe impl<T: DynSend> Send for FromDyn<T> {}
|
||||
|
||||
// `FromDyn` is `Sync` if `T` is `DynSync`, since it ensures that sync::is_dyn_thread_safe() is true.
|
||||
#[cfg(parallel_compiler)]
|
||||
unsafe impl<T: DynSync> Sync for FromDyn<T> {}
|
||||
|
||||
impl<T> std::ops::Deref for FromDyn<T> {
|
||||
type Target = T;
|
||||
|
||||
#[inline(always)]
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
// A wrapper to convert a struct that is already a `Send` or `Sync` into
|
||||
// an instance of `DynSend` and `DynSync`, since the compiler cannot infer
|
||||
// it automatically in some cases. (e.g. Box<dyn Send / Sync>)
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct IntoDynSyncSend<T: ?Sized>(pub T);
|
||||
|
||||
#[cfg(parallel_compiler)]
|
||||
unsafe impl<T: ?Sized + Send> DynSend for IntoDynSyncSend<T> {}
|
||||
#[cfg(parallel_compiler)]
|
||||
unsafe impl<T: ?Sized + Sync> DynSync for IntoDynSyncSend<T> {}
|
||||
|
||||
impl<T> std::ops::Deref for IntoDynSyncSend<T> {
|
||||
type Target = T;
|
||||
|
||||
#[inline(always)]
|
||||
fn deref(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::ops::DerefMut for IntoDynSyncSend<T> {
|
||||
#[inline(always)]
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
|
@ -69,6 +69,6 @@ fn drop_drops() {
|
|||
|
||||
#[test]
|
||||
fn send_sync() {
|
||||
crate::sync::assert_send::<OwnedSlice>();
|
||||
crate::sync::assert_sync::<OwnedSlice>();
|
||||
crate::sync::assert_dyn_send::<OwnedSlice>();
|
||||
crate::sync::assert_dyn_sync::<OwnedSlice>();
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
//!
|
||||
//! [^2] `MTLockRef` is a typedef.
|
||||
|
||||
pub use crate::marker::*;
|
||||
use crate::owned_slice::OwnedSlice;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::{BuildHasher, Hash};
|
||||
|
@ -55,6 +56,42 @@ pub use vec::{AppendOnlyIndexVec, AppendOnlyVec};
|
|||
|
||||
mod vec;
|
||||
|
||||
mod mode {
|
||||
use super::Ordering;
|
||||
use std::sync::atomic::AtomicU8;
|
||||
|
||||
const UNINITIALIZED: u8 = 0;
|
||||
const DYN_NOT_THREAD_SAFE: u8 = 1;
|
||||
const DYN_THREAD_SAFE: u8 = 2;
|
||||
|
||||
static DYN_THREAD_SAFE_MODE: AtomicU8 = AtomicU8::new(UNINITIALIZED);
|
||||
|
||||
// Whether thread safety is enabled (due to running under multiple threads).
|
||||
#[inline]
|
||||
pub fn is_dyn_thread_safe() -> bool {
|
||||
match DYN_THREAD_SAFE_MODE.load(Ordering::Relaxed) {
|
||||
DYN_NOT_THREAD_SAFE => false,
|
||||
DYN_THREAD_SAFE => true,
|
||||
_ => panic!("uninitialized dyn_thread_safe mode!"),
|
||||
}
|
||||
}
|
||||
|
||||
// Only set by the `-Z threads` compile option
|
||||
pub fn set_dyn_thread_safe_mode(mode: bool) {
|
||||
let set: u8 = if mode { DYN_THREAD_SAFE } else { DYN_NOT_THREAD_SAFE };
|
||||
let previous = DYN_THREAD_SAFE_MODE.compare_exchange(
|
||||
UNINITIALIZED,
|
||||
set,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
|
||||
// Check that the mode was either uninitialized or was already set to the requested mode.
|
||||
assert!(previous.is_ok() || previous == Err(set));
|
||||
}
|
||||
}
|
||||
|
||||
pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
|
||||
cfg_if! {
|
||||
if #[cfg(not(parallel_compiler))] {
|
||||
pub unsafe auto trait Send {}
|
||||
|
@ -149,7 +186,7 @@ cfg_if! {
|
|||
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
($($blocks:tt),*) => {
|
||||
($($blocks:block),*) => {
|
||||
// We catch panics here ensuring that all the blocks execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
|
@ -168,12 +205,6 @@ cfg_if! {
|
|||
}
|
||||
}
|
||||
|
||||
pub use Iterator as ParallelIterator;
|
||||
|
||||
pub fn par_iter<T: IntoIterator>(t: T) -> T::IntoIter {
|
||||
t.into_iter()
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
|
@ -190,6 +221,29 @@ cfg_if! {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
|
||||
t: T,
|
||||
mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
|
||||
) -> C {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let mut panic = None;
|
||||
let r = t.into_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
pub type MetadataRef = OwnedSlice;
|
||||
|
||||
pub use std::rc::Rc as Lrc;
|
||||
|
@ -302,46 +356,165 @@ cfg_if! {
|
|||
use parking_lot::RwLock as InnerRwLock;
|
||||
|
||||
use std::thread;
|
||||
pub use rayon::{join, scope};
|
||||
|
||||
#[inline]
|
||||
pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
|
||||
where
|
||||
A: FnOnce() -> RA + DynSend,
|
||||
B: FnOnce() -> RB + DynSend,
|
||||
{
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let oper_a = FromDyn::from(oper_a);
|
||||
let oper_b = FromDyn::from(oper_b);
|
||||
let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()));
|
||||
(a.into_inner(), b.into_inner())
|
||||
} else {
|
||||
(oper_a(), oper_b())
|
||||
}
|
||||
}
|
||||
|
||||
// This function only works when `mode::is_dyn_thread_safe()`.
|
||||
pub fn scope<'scope, OP, R>(op: OP) -> R
|
||||
where
|
||||
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
|
||||
R: DynSend,
|
||||
{
|
||||
let op = FromDyn::from(op);
|
||||
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
|
||||
}
|
||||
|
||||
/// Runs a list of blocks in parallel. The first block is executed immediately on
|
||||
/// the current thread. Use that for the longest running block.
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
(impl $fblock:tt [$($c:tt,)*] [$block:tt $(, $rest:tt)*]) => {
|
||||
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
|
||||
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
|
||||
};
|
||||
(impl $fblock:tt [$($blocks:tt,)*] []) => {
|
||||
(impl $fblock:block [$($blocks:expr,)*] []) => {
|
||||
::rustc_data_structures::sync::scope(|s| {
|
||||
$(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
|
||||
s.spawn(move |_| block.into_inner()());)*
|
||||
(|| $fblock)();
|
||||
});
|
||||
};
|
||||
($fblock:block, $($blocks:block),*) => {
|
||||
if rustc_data_structures::sync::is_dyn_thread_safe() {
|
||||
// Reverse the order of the later blocks since Rayon executes them in reverse order
|
||||
// when using a single thread. This ensures the execution order matches that
|
||||
// of a single threaded rustc.
|
||||
parallel!(impl $fblock [] [$($blocks),*]);
|
||||
} else {
|
||||
// We catch panics here ensuring that all the blocks execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
if let Err(p) = ::std::panic::catch_unwind(
|
||||
::std::panic::AssertUnwindSafe(|| $fblock)
|
||||
) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
$(
|
||||
s.spawn(|_| $blocks);
|
||||
if let Err(p) = ::std::panic::catch_unwind(
|
||||
::std::panic::AssertUnwindSafe(|| $blocks)
|
||||
) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
)*
|
||||
$fblock;
|
||||
})
|
||||
};
|
||||
($fblock:tt, $($blocks:tt),*) => {
|
||||
// Reverse the order of the later blocks since Rayon executes them in reverse order
|
||||
// when using a single thread. This ensures the execution order matches that
|
||||
// of a single threaded rustc
|
||||
parallel!(impl $fblock [] [$($blocks),*]);
|
||||
if let Some(panic) = panic {
|
||||
::std::panic::resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub use rayon::iter::ParallelIterator;
|
||||
use rayon::iter::IntoParallelIterator;
|
||||
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
|
||||
pub fn par_iter<T: IntoParallelIterator>(t: T) -> T::Iter {
|
||||
t.into_par_iter()
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoParallelIterator>(
|
||||
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
|
||||
t: T,
|
||||
for_each: impl Fn(T::Item) + Sync + Send,
|
||||
for_each: impl Fn(I) + DynSync + DynSend
|
||||
) {
|
||||
let ps: Vec<_> = t.into_par_iter().map(|i| catch_unwind(AssertUnwindSafe(|| for_each(i)))).collect();
|
||||
ps.into_iter().for_each(|p| if let Err(panic) = p {
|
||||
resume_unwind(panic)
|
||||
});
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
let panic: Lock<Option<_>> = Lock::new(None);
|
||||
t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
let mut l = panic.lock();
|
||||
if l.is_none() {
|
||||
*l = Some(p)
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(panic) = panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
} else {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
t.into_iter().for_each(|i| {
|
||||
if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
});
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn par_map<
|
||||
I,
|
||||
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
|
||||
R: std::marker::Send,
|
||||
C: FromIterator<R> + FromParallelIterator<R>
|
||||
>(
|
||||
t: T,
|
||||
map: impl Fn(I) -> R + DynSync + DynSend
|
||||
) -> C {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let panic: Lock<Option<_>> = Lock::new(None);
|
||||
let map = FromDyn::from(map);
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let r = t.into_par_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
let mut l = panic.lock();
|
||||
if l.is_none() {
|
||||
*l = Some(p);
|
||||
}
|
||||
None
|
||||
},
|
||||
}
|
||||
}).collect();
|
||||
|
||||
if let Some(panic) = panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
r
|
||||
} else {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let mut panic = None;
|
||||
let r = t.into_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
pub type MetadataRef = OwnedSlice;
|
||||
|
@ -352,11 +525,6 @@ cfg_if! {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn assert_sync<T: ?Sized + Sync>() {}
|
||||
pub fn assert_send<T: ?Sized + Send>() {}
|
||||
pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {}
|
||||
pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {}
|
||||
|
||||
#[derive(Default)]
|
||||
#[cfg_attr(parallel_compiler, repr(align(64)))]
|
||||
pub struct CacheAligned<T>(pub T);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue