introduce DynSend
and DynSync
auto trait
This commit is contained in:
parent
963e5c0eff
commit
b9746ce039
26 changed files with 558 additions and 115 deletions
|
@ -39,6 +39,7 @@
|
|||
//!
|
||||
//! [^2] `MTLockRef` is a typedef.
|
||||
|
||||
pub use crate::marker::*;
|
||||
use crate::owned_slice::OwnedSlice;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::{BuildHasher, Hash};
|
||||
|
@ -55,6 +56,37 @@ pub use vec::{AppendOnlyIndexVec, AppendOnlyVec};
|
|||
|
||||
mod vec;
|
||||
|
||||
mod mode {
|
||||
use super::Ordering;
|
||||
use std::sync::atomic::AtomicU8;
|
||||
|
||||
const UNINITIALIZED: u8 = 0;
|
||||
const INACTIVE: u8 = 1;
|
||||
const ACTIVE: u8 = 2;
|
||||
|
||||
static MODE: AtomicU8 = AtomicU8::new(UNINITIALIZED);
|
||||
|
||||
#[inline]
|
||||
pub fn active() -> bool {
|
||||
match MODE.load(Ordering::Relaxed) {
|
||||
INACTIVE => false,
|
||||
ACTIVE => true,
|
||||
_ => panic!("uninitialized parallel mode!"),
|
||||
}
|
||||
}
|
||||
|
||||
// Only set by the `-Z threads` compile option
|
||||
pub fn set(parallel: bool) {
|
||||
let set: u8 = if parallel { ACTIVE } else { INACTIVE };
|
||||
let previous =
|
||||
MODE.compare_exchange(UNINITIALIZED, set, Ordering::Relaxed, Ordering::Relaxed);
|
||||
|
||||
// Check that the mode was either uninitialized or was already set to the requested mode.
|
||||
assert!(previous.is_ok() || previous == Err(set));
|
||||
}
|
||||
}
|
||||
|
||||
pub use mode::{active, set};
|
||||
cfg_if! {
|
||||
if #[cfg(not(parallel_compiler))] {
|
||||
pub unsafe auto trait Send {}
|
||||
|
@ -149,7 +181,7 @@ cfg_if! {
|
|||
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
($($blocks:tt),*) => {
|
||||
($($blocks:block),*) => {{
|
||||
// We catch panics here ensuring that all the blocks execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
|
@ -165,13 +197,7 @@ cfg_if! {
|
|||
if let Some(panic) = panic {
|
||||
::std::panic::resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub use Iterator as ParallelIterator;
|
||||
|
||||
pub fn par_iter<T: IntoIterator>(t: T) -> T::IntoIter {
|
||||
t.into_iter()
|
||||
}}
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) {
|
||||
|
@ -190,6 +216,29 @@ cfg_if! {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
|
||||
t: T,
|
||||
mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
|
||||
) -> C {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let mut panic = None;
|
||||
let r = t.into_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
pub type MetadataRef = OwnedSlice;
|
||||
|
||||
pub use std::rc::Rc as Lrc;
|
||||
|
@ -302,46 +351,159 @@ cfg_if! {
|
|||
use parking_lot::RwLock as InnerRwLock;
|
||||
|
||||
use std::thread;
|
||||
pub use rayon::{join, scope};
|
||||
|
||||
#[inline]
|
||||
pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
|
||||
where
|
||||
A: FnOnce() -> RA + DynSend,
|
||||
B: FnOnce() -> RB + DynSend,
|
||||
{
|
||||
if mode::active() {
|
||||
let oper_a = FromDyn::from(oper_a);
|
||||
let oper_b = FromDyn::from(oper_b);
|
||||
let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()));
|
||||
(a.into_inner(), b.into_inner())
|
||||
} else {
|
||||
(oper_a(), oper_b())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn scope<'scope, OP, R>(op: OP) -> R
|
||||
where
|
||||
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
|
||||
R: DynSend,
|
||||
{
|
||||
let op = FromDyn::from(op);
|
||||
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
|
||||
}
|
||||
|
||||
/// Runs a list of blocks in parallel. The first block is executed immediately on
|
||||
/// the current thread. Use that for the longest running block.
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
(impl $fblock:tt [$($c:tt,)*] [$block:tt $(, $rest:tt)*]) => {
|
||||
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
|
||||
($fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
|
||||
parallel!($fblock [$block, $($c,)*] [$($rest),*])
|
||||
};
|
||||
(impl $fblock:tt [$($blocks:tt,)*] []) => {
|
||||
::rustc_data_structures::sync::scope(|s| {
|
||||
($fblock:block [$($blocks:expr,)*] []) => {
|
||||
{
|
||||
::rustc_data_structures::sync::scope(|s| {
|
||||
$(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
|
||||
s.spawn(move |_| block.into_inner()());)*
|
||||
(|| $fblock)();
|
||||
});
|
||||
}
|
||||
};
|
||||
($fblock:block, $($blocks:block),*) => {
|
||||
if rustc_data_structures::sync::active() {
|
||||
// Reverse the order of the later blocks since Rayon executes them in reverse order
|
||||
// when using a single thread. This ensures the execution order matches that
|
||||
// of a single threaded rustc
|
||||
parallel!($fblock [] [$($blocks),*]);
|
||||
} else {
|
||||
// We catch panics here ensuring that all the blocks execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
$(
|
||||
s.spawn(|_| $blocks);
|
||||
if let Err(p) = ::std::panic::catch_unwind(
|
||||
::std::panic::AssertUnwindSafe(|| $blocks)
|
||||
) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
)*
|
||||
$fblock;
|
||||
})
|
||||
};
|
||||
($fblock:tt, $($blocks:tt),*) => {
|
||||
// Reverse the order of the later blocks since Rayon executes them in reverse order
|
||||
// when using a single thread. This ensures the execution order matches that
|
||||
// of a single threaded rustc
|
||||
parallel!(impl $fblock [] [$($blocks),*]);
|
||||
if let Some(panic) = panic {
|
||||
::std::panic::resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub use rayon::iter::ParallelIterator;
|
||||
use rayon::iter::IntoParallelIterator;
|
||||
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
|
||||
pub fn par_iter<T: IntoParallelIterator>(t: T) -> T::Iter {
|
||||
t.into_par_iter()
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoParallelIterator>(
|
||||
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
|
||||
t: T,
|
||||
for_each: impl Fn(T::Item) + Sync + Send,
|
||||
for_each: impl Fn(I) + DynSync + DynSend
|
||||
) {
|
||||
let ps: Vec<_> = t.into_par_iter().map(|i| catch_unwind(AssertUnwindSafe(|| for_each(i)))).collect();
|
||||
ps.into_iter().for_each(|p| if let Err(panic) = p {
|
||||
resume_unwind(panic)
|
||||
});
|
||||
if mode::active() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
let panic: Lock<Option<_>> = Lock::new(None);
|
||||
t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
let mut l = panic.lock();
|
||||
if l.is_none() {
|
||||
*l = Some(p)
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(panic) = panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
} else {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
t.into_iter().for_each(|i| {
|
||||
if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
});
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn par_map<
|
||||
I,
|
||||
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
|
||||
R: std::marker::Send,
|
||||
C: FromIterator<R> + FromParallelIterator<R>
|
||||
>(
|
||||
t: T,
|
||||
map: impl Fn(I) -> R + DynSync + DynSend
|
||||
) -> C {
|
||||
if mode::active() {
|
||||
let panic: Lock<Option<_>> = Lock::new(None);
|
||||
let map = FromDyn::from(map);
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let r = t.into_par_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
let mut l = panic.lock();
|
||||
if l.is_none() {
|
||||
*l = Some(p);
|
||||
}
|
||||
None
|
||||
},
|
||||
}
|
||||
}).collect();
|
||||
|
||||
if let Some(panic) = panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
r
|
||||
} else {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let mut panic = None;
|
||||
let r = t.into_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
pub type MetadataRef = OwnedSlice;
|
||||
|
@ -352,11 +514,6 @@ cfg_if! {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn assert_sync<T: ?Sized + Sync>() {}
|
||||
pub fn assert_send<T: ?Sized + Send>() {}
|
||||
pub fn assert_send_val<T: ?Sized + Send>(_t: &T) {}
|
||||
pub fn assert_send_sync_val<T: ?Sized + Sync + Send>(_t: &T) {}
|
||||
|
||||
#[derive(Default)]
|
||||
#[cfg_attr(parallel_compiler, repr(align(64)))]
|
||||
pub struct CacheAligned<T>(pub T);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue