Auto merge of #139011 - Zoxc:no-rayon-iters, r=oli-obk
Remove the use of Rayon iterators. This removes the use of Rayon iterators and of the `rustc-rayon` crate; `rustc-rayon-core` is still used, however. In parallel loops, instead of a Rayon iterator, a serial iterator is used to collect the items into a `Vec`, and we then run a parallel loop over its elements using the new `par_slice` function, which is built on `rustc-rayon-core`'s `join`. This change makes it easier to bring `rustc-rayon-core` in-tree. Tests using 7 threads: <table><tr><td rowspan="2">Benchmark</td><td colspan="1"><b>Before</b></td><td colspan="2"><b>After</b></td><td colspan="1"><b>Before</b></td><td colspan="2"><b>After</b></td><td colspan="1"><b>Before</b></td><td colspan="2"><b>After</b></td></tr><tr><td align="right">Time</td><td align="right">Time</td><td align="right">%</td><td align="right">Physical Memory</td><td align="right">Physical Memory</td><td align="right">%</td><td align="right">Committed Memory</td><td align="right">Committed Memory</td><td align="right">%</td></tr><tr><td>🟣 <b>clap</b>:check</td><td align="right">0.4827s</td><td align="right">0.4828s</td><td align="right"> 0.02%</td><td align="right">201.23 MiB</td><td align="right">201.31 MiB</td><td align="right"> 0.04%</td><td align="right">279.03 MiB</td><td align="right">279.46 MiB</td><td align="right"> 0.15%</td></tr><tr><td>🟣 <b>hyper</b>:check</td><td align="right">0.1443s</td><td align="right">0.1401s</td><td align="right">💚 -2.91%</td><td align="right">126.42 MiB</td><td align="right">126.70 MiB</td><td align="right"> 0.22%</td><td align="right">199.79 MiB</td><td align="right">199.99 MiB</td><td align="right"> 0.10%</td></tr><tr><td>🟣 <b>regex</b>:check</td><td align="right">0.3252s</td><td align="right">0.3065s</td><td align="right">💚 -5.78%</td><td align="right">161.87 MiB</td><td align="right">161.78 MiB</td><td align="right"> -0.05%</td><td align="right">229.59 MiB</td><td align="right">230.23 MiB</td><td align="right"> 0.28%</td></tr><tr><td>🟣 
<b>syn</b>:check</td><td align="right">0.5845s</td><td align="right">0.5876s</td><td align="right"> 0.53%</td><td align="right">197.01 MiB</td><td align="right">196.89 MiB</td><td align="right"> -0.06%</td><td align="right">267.62 MiB</td><td align="right">267.47 MiB</td><td align="right"> -0.06%</td></tr><tr><td>Total</td><td align="right">1.5367s</td><td align="right">1.5169s</td><td align="right">💚 -1.29%</td><td align="right">686.53 MiB</td><td align="right">686.68 MiB</td><td align="right"> 0.02%</td><td align="right">976.04 MiB</td><td align="right">977.14 MiB</td><td align="right"> 0.11%</td></tr><tr><td>Summary</td><td align="right">1.0000s</td><td align="right">0.9796s</td><td align="right">💚 -2.04%</td><td align="right">1 byte</td><td align="right">1.00 bytes</td><td align="right"> 0.04%</td><td align="right">1 byte</td><td align="right">1.00 bytes</td><td align="right"> 0.12%</td></tr></table> <table><tr><td rowspan="2">Benchmark</td><td colspan="1"><b>Before</b></td><td colspan="2"><b>After</b></td><td colspan="1"><b>Before</b></td><td colspan="2"><b>After</b></td><td colspan="1"><b>Before</b></td><td colspan="2"><b>After</b></td></tr><tr><td align="right">Time</td><td align="right">Time</td><td align="right">%</td><td align="right">Physical Memory</td><td align="right">Physical Memory</td><td align="right">%</td><td align="right">Committed Memory</td><td align="right">Committed Memory</td><td align="right">%</td></tr><tr><td>🟠 <b>clap</b>:debug</td><td align="right">1.6371s</td><td align="right">1.6529s</td><td align="right"> 0.96%</td><td align="right">395.58 MiB</td><td align="right">396.21 MiB</td><td align="right"> 0.16%</td><td align="right">460.98 MiB</td><td align="right">461.52 MiB</td><td align="right"> 0.12%</td></tr><tr><td>🟠 <b>hyper</b>:debug</td><td align="right">0.3248s</td><td align="right">0.3210s</td><td align="right">💚 -1.16%</td><td align="right">155.16 MiB</td><td align="right">155.19 MiB</td><td align="right"> 0.02%</td><td 
align="right">219.21 MiB</td><td align="right">219.30 MiB</td><td align="right"> 0.04%</td></tr><tr><td>🟠 <b>regex</b>:debug</td><td align="right">1.0148s</td><td align="right">0.9929s</td><td align="right">💚 -2.16%</td><td align="right">297.96 MiB</td><td align="right">295.07 MiB</td><td align="right"> -0.97%</td><td align="right">354.53 MiB</td><td align="right">351.58 MiB</td><td align="right"> -0.83%</td></tr><tr><td>🟠 <b>syn</b>:debug</td><td align="right">1.3614s</td><td align="right">1.3717s</td><td align="right"> 0.76%</td><td align="right">319.10 MiB</td><td align="right">321.19 MiB</td><td align="right"> 0.65%</td><td align="right">378.90 MiB</td><td align="right">381.27 MiB</td><td align="right"> 0.62%</td></tr><tr><td>Total</td><td align="right">4.3381s</td><td align="right">4.3386s</td><td align="right"> 0.01%</td><td align="right">1.14 GiB</td><td align="right">1.14 GiB</td><td align="right"> -0.01%</td><td align="right">1.38 GiB</td><td align="right">1.38 GiB</td><td align="right"> 0.00%</td></tr><tr><td>Summary</td><td align="right">1.0000s</td><td align="right">0.9960s</td><td align="right"> -0.40%</td><td align="right">1 byte</td><td align="right">1.00 bytes</td><td align="right"> -0.03%</td><td align="right">1 byte</td><td align="right">1.00 bytes</td><td align="right"> -0.01%</td></tr></table>
This commit is contained in:
commit
81d8c747fb
14 changed files with 122 additions and 74 deletions
|
@ -179,6 +179,12 @@ impl<T> FromDyn<T> {
|
|||
FromDyn(val)
|
||||
}
|
||||
|
||||
/// Wraps `val` in a new `FromDyn` without running any check of its own.
/// `self` is only borrowed, never read: it serves as evidence that the
/// check mentioned in the body comment was already performed.
#[inline(always)]
|
||||
pub fn derive<O>(&self, val: O) -> FromDyn<O> {
|
||||
// We already did the check for `sync::is_dyn_thread_safe()` when creating `Self`
|
||||
FromDyn(val)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
|
@ -200,6 +206,13 @@ impl<T> std::ops::Deref for FromDyn<T> {
|
|||
}
|
||||
}
|
||||
|
||||
// Mutable counterpart of the `Deref` impl for `FromDyn<T>`: hands out the
// wrapped value (`self.0`) by mutable reference.
impl<T> std::ops::DerefMut for FromDyn<T> {
|
||||
#[inline(always)]
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
// A wrapper to convert a struct that is already a `Send` or `Sync` into
|
||||
// an instance of `DynSend` and `DynSync`, since the compiler cannot infer
|
||||
// it automatically in some cases. (e.g. Box<dyn Send / Sync>)
|
||||
|
|
|
@ -7,7 +7,6 @@ use std::any::Any;
|
|||
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
|
||||
use crate::FatalErrorMarker;
|
||||
use crate::sync::{DynSend, DynSync, FromDyn, IntoDynSyncSend, mode};
|
||||
|
@ -97,11 +96,11 @@ macro_rules! parallel {
|
|||
// Runs `op` inside a scope and returns its result. `op` is wrapped in `FromDyn`
// before being moved into the scope closure, and its result is wrapped in
// `FromDyn` again on the way out, then unwrapped with `into_inner()`.
// (Diff view: each `rayon::` line below is immediately followed by the
// `rayon_core::` line that replaces it.)
// This function only works when `mode::is_dyn_thread_safe()`.
|
||||
pub fn scope<'scope, OP, R>(op: OP) -> R
|
||||
where
|
||||
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
|
||||
OP: FnOnce(&rayon_core::Scope<'scope>) -> R + DynSend,
|
||||
R: DynSend,
|
||||
{
|
||||
let op = FromDyn::from(op);
|
||||
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
|
||||
rayon_core::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -114,7 +113,7 @@ where
|
|||
let oper_a = FromDyn::from(oper_a);
|
||||
let oper_b = FromDyn::from(oper_b);
|
||||
let (a, b) = parallel_guard(|guard| {
|
||||
rayon::join(
|
||||
rayon_core::join(
|
||||
move || guard.run(move || FromDyn::from(oper_a.into_inner()())),
|
||||
move || guard.run(move || FromDyn::from(oper_b.into_inner()())),
|
||||
)
|
||||
|
@ -125,56 +124,103 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
|
||||
// Calls `for_each` on every element of `items`. The slice is halved recursively
// and the two halves are handed to `rayon_core::join`; once a piece is no longer
// than `group` elements it is processed with a plain serial loop. Every call to
// `for_each` goes through `guard.run`.
fn par_slice<I: DynSend>(
|
||||
items: &mut [I],
|
||||
guard: &ParallelGuard,
|
||||
for_each: impl Fn(&mut I) + DynSync + DynSend,
|
||||
) {
|
||||
// Data shared by reference between all recursive `par_rec` calls.
struct State<'a, F> {
|
||||
for_each: FromDyn<F>,
|
||||
guard: &'a ParallelGuard,
|
||||
// Slices with at most this many elements are handled serially.
group: usize,
|
||||
}
|
||||
|
||||
// Recursive worker: serial loop for small slices, otherwise split in two and `join`.
fn par_rec<I: DynSend, F: Fn(&mut I) + DynSync + DynSend>(
|
||||
items: &mut [I],
|
||||
state: &State<'_, F>,
|
||||
) {
|
||||
if items.len() <= state.group {
|
||||
for item in items {
|
||||
state.guard.run(|| (state.for_each)(item));
|
||||
}
|
||||
} else {
|
||||
let (left, right) = items.split_at_mut(items.len() / 2);
|
||||
// Wrap both halves in `FromDyn` (via `derive`) before moving them into the
// `join` closures; they are unwrapped again with `*left` / `*right`.
let mut left = state.for_each.derive(left);
|
||||
let mut right = state.for_each.derive(right);
|
||||
rayon_core::join(move || par_rec(*left, state), move || par_rec(*right, state));
|
||||
}
|
||||
}
|
||||
|
||||
let state = State {
|
||||
for_each: FromDyn::from(for_each),
|
||||
guard,
|
||||
// 1/128th of the input length (integer division), but never less than 1.
group: std::cmp::max(items.len() / 128, 1),
|
||||
};
|
||||
par_rec(items, &state)
|
||||
}
|
||||
|
||||
/// Calls `for_each` on each item produced by `t`, inside `parallel_guard`.
///
/// When `mode::is_dyn_thread_safe()` returns true, the items are first collected
/// into a `Vec` and then processed through `par_slice`; otherwise they are visited
/// one by one with a serial iterator, each call going through `guard.run`.
///
/// (Diff view: the `Fn(I)` / `into_par_iter()` / `for_each(i)` lines are the
/// pre-change versions; the `Fn(&I)` / `par_slice` / `for_each(&i)` lines that
/// follow them are their replacements.)
pub fn par_for_each_in<I: DynSend, T: IntoIterator<Item = I>>(
|
||||
t: T,
|
||||
for_each: impl Fn(I) + DynSync + DynSend,
|
||||
for_each: impl Fn(&I) + DynSync + DynSend,
|
||||
) {
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
t.into_par_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
let mut items: Vec<_> = t.into_iter().collect();
|
||||
// `par_slice` hands out `&mut I`; reborrow as `&I` for the caller's closure.
par_slice(&mut items, guard, |i| for_each(&*i))
|
||||
} else {
|
||||
t.into_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
guard.run(|| for_each(&i));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// (Diff view: the multi-line generic signature immediately below is the pre-change
// version; the documented single-line signature after it is its replacement.)
pub fn try_par_for_each_in<
|
||||
T: IntoIterator + IntoParallelIterator<Item = <T as IntoIterator>::Item>,
|
||||
E: Send,
|
||||
>(
|
||||
/// This runs `for_each` in parallel for each iterator item. If one or more of the
|
||||
/// `for_each` calls returns `Err`, the function will also return `Err`. The error returned
|
||||
/// will be non-deterministic, but this is expected to be used with `ErrorGuaranteed` which
|
||||
/// are all equivalent.
|
||||
pub fn try_par_for_each_in<T: IntoIterator, E: DynSend>(
|
||||
t: T,
|
||||
for_each: impl Fn(<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
|
||||
) -> Result<(), E> {
|
||||
for_each: impl Fn(&<T as IntoIterator>::Item) -> Result<(), E> + DynSync + DynSend,
|
||||
) -> Result<(), E>
|
||||
where
|
||||
<T as IntoIterator>::Item: DynSend,
|
||||
{
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
t.into_par_iter()
|
||||
.filter_map(|i| guard.run(|| for_each(i)))
|
||||
.reduce(|| Ok(()), Result::and)
|
||||
let mut items: Vec<_> = t.into_iter().collect();
|
||||
|
||||
// Shared slot for an error from any failing `for_each` call. Each failure
// overwrites the slot, so only one error survives.
let error = Mutex::new(None);
|
||||
|
||||
par_slice(&mut items, guard, |i| {
|
||||
if let Err(err) = for_each(&*i) {
|
||||
*error.lock() = Some(err);
|
||||
}
|
||||
});
|
||||
|
||||
// All work is done; turn the recorded error (if any) into the return value.
if let Some(err) = error.into_inner() { Err(err) } else { Ok(()) }
|
||||
} else {
|
||||
t.into_iter().filter_map(|i| guard.run(|| for_each(i))).fold(Ok(()), Result::and)
|
||||
t.into_iter().filter_map(|i| guard.run(|| for_each(&i))).fold(Ok(()), Result::and)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn par_map<
|
||||
I,
|
||||
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
|
||||
R: std::marker::Send,
|
||||
C: FromIterator<R> + FromParallelIterator<R>,
|
||||
>(
|
||||
pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
|
||||
t: T,
|
||||
map: impl Fn(I) -> R + DynSync + DynSend,
|
||||
) -> C {
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let map = FromDyn::from(map);
|
||||
t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
|
||||
let mut items: Vec<(Option<I>, Option<R>)> =
|
||||
t.into_iter().map(|i| (Some(i), None)).collect();
|
||||
|
||||
par_slice(&mut items, guard, |i| {
|
||||
i.1 = Some(map(i.0.take().unwrap()));
|
||||
});
|
||||
|
||||
items.into_iter().filter_map(|i| i.1).collect()
|
||||
} else {
|
||||
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue