Skip to content

Commit 25b7eff

Browse files
committed
Add Parallel::with_min_len to limit splitting
Splitting of parallel iterators working `ArrayView` and `Zip` is currently not limited and continues until only one element is left which can lead to excessive overhead for algorithms formulated in terms of these iterators. Since these iterators are also not indexed, Rayon's generic `IndexedParallelIterator::with_min_len` does not apply. However, since the number of elements is known and currently checked against to one to determine if another split is possible, it appears straight-forward to replace this constant by a parameter and make it available to the user via a `Parallel::with_min_len` inherent method.
1 parent ee5880b commit 25b7eff

File tree

1 file changed

+25
-9
lines changed

1 file changed

+25
-9
lines changed

src/parallel/par.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,24 @@ use crate::split_at::SplitPreference;
2121
#[derive(Copy, Clone, Debug)]
2222
pub struct Parallel<I> {
2323
iter: I,
24+
min_len: usize,
2425
}
2526

27+
impl<I> Parallel<I> {
28+
/// Sets the minimum number of elements desired to process in each job. This will not be split any smaller than this length, but of course a producer could already be smaller to begin with.
29+
pub fn with_min_len(self, min_len: usize) -> Self {
30+
Self {
31+
min_len,
32+
..self
33+
}
34+
}
35+
}
36+
37+
const DEFAULT_MIN_LEN: usize = 1;
38+
2639
/// Parallel producer wrapper.
2740
#[derive(Copy, Clone, Debug)]
28-
struct ParallelProducer<I>(I);
41+
struct ParallelProducer<I>(I, usize);
2942

3043
macro_rules! par_iter_wrapper {
3144
// thread_bounds are either Sync or Send + Sync
@@ -40,6 +53,7 @@ macro_rules! par_iter_wrapper {
4053
fn into_par_iter(self) -> Self::Iter {
4154
Parallel {
4255
iter: self,
56+
min_len: DEFAULT_MIN_LEN,
4357
}
4458
}
4559
}
@@ -67,7 +81,7 @@ macro_rules! par_iter_wrapper {
6781
fn with_producer<Cb>(self, callback: Cb) -> Cb::Output
6882
where Cb: ProducerCallback<Self::Item>
6983
{
70-
callback.callback(ParallelProducer(self.iter))
84+
callback.callback(ParallelProducer(self.iter, self.min_len))
7185
}
7286

7387
fn len(&self) -> usize {
@@ -106,7 +120,7 @@ macro_rules! par_iter_wrapper {
106120

107121
fn split_at(self, i: usize) -> (Self, Self) {
108122
let (a, b) = self.0.split_at(i);
109-
(ParallelProducer(a), ParallelProducer(b))
123+
(ParallelProducer(a, self.1), ParallelProducer(b, self.1))
110124
}
111125
}
112126

@@ -131,6 +145,7 @@ macro_rules! par_iter_view_wrapper {
131145
fn into_par_iter(self) -> Self::Iter {
132146
Parallel {
133147
iter: self,
148+
min_len: DEFAULT_MIN_LEN,
134149
}
135150
}
136151
}
@@ -144,7 +159,7 @@ macro_rules! par_iter_view_wrapper {
144159
fn drive_unindexed<C>(self, consumer: C) -> C::Result
145160
where C: UnindexedConsumer<Self::Item>
146161
{
147-
bridge_unindexed(ParallelProducer(self.iter), consumer)
162+
bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer)
148163
}
149164

150165
fn opt_len(&self) -> Option<usize> {
@@ -158,14 +173,14 @@ macro_rules! par_iter_view_wrapper {
158173
{
159174
type Item = <$view_name<'a, A, D> as IntoIterator>::Item;
160175
fn split(self) -> (Self, Option<Self>) {
161-
if self.0.len() <= 1 {
176+
if self.0.len() <= self.1 {
162177
return (self, None)
163178
}
164179
let array = self.0;
165180
let max_axis = array.max_stride_axis();
166181
let mid = array.len_of(max_axis) / 2;
167182
let (a, b) = array.split_at(max_axis, mid);
168-
(ParallelProducer(a), Some(ParallelProducer(b)))
183+
(ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1)))
169184
}
170185

171186
fn fold_with<F>(self, folder: F) -> F
@@ -217,6 +232,7 @@ macro_rules! zip_impl {
217232
fn into_par_iter(self) -> Self::Iter {
218233
Parallel {
219234
iter: self,
235+
min_len: DEFAULT_MIN_LEN,
220236
}
221237
}
222238
}
@@ -233,7 +249,7 @@ macro_rules! zip_impl {
233249
fn drive_unindexed<Cons>(self, consumer: Cons) -> Cons::Result
234250
where Cons: UnindexedConsumer<Self::Item>
235251
{
236-
bridge_unindexed(ParallelProducer(self.iter), consumer)
252+
bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer)
237253
}
238254

239255
fn opt_len(&self) -> Option<usize> {
@@ -251,11 +267,11 @@ macro_rules! zip_impl {
251267
type Item = ($($p::Item ,)*);
252268

253269
fn split(self) -> (Self, Option<Self>) {
254-
if !self.0.can_split() {
270+
if self.0.size() <= self.1 {
255271
return (self, None)
256272
}
257273
let (a, b) = self.0.split();
258-
(ParallelProducer(a), Some(ParallelProducer(b)))
274+
(ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1)))
259275
}
260276

261277
fn fold_with<Fold>(self, folder: Fold) -> Fold

0 commit comments

Comments
 (0)