Skip to content

Commit 93d96bd

Browse files
committed
Break out try_reduce_with to avoid unwanted constraints
Even though we already know `Self::Item: Send`, this doesn't apply constraints to its `Try::Ok` and `Error`. Therefore, if we map to `Result<Option<Ok>, Error>` at a high level, we have to require extra `Send` bounds to make sure that map is `Send`, like: <Self::Item as Try>::Ok: Send, <Self::Item as Try>::Error: Send We can avoid this by implementing a custom `Consumer`, where we only hold it as a `Result` in the low level `Folder` regardless of `Send`.
1 parent a4aef05 commit 93d96bd

File tree

2 files changed

+132
-18
lines changed

2 files changed

+132
-18
lines changed

src/iter/mod.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ mod try_fold;
111111
pub use self::try_fold::{TryFold, TryFoldWith};
112112
mod reduce;
113113
mod try_reduce;
114+
mod try_reduce_with;
114115
mod skip;
115116
pub use self::skip::Skip;
116117
mod splitter;
@@ -711,25 +712,8 @@ pub trait ParallelIterator: Sized + Send {
711712
fn try_reduce_with<T, OP>(self, op: OP) -> Option<Self::Item>
712713
where OP: Fn(T, T) -> Self::Item + Sync + Send,
713714
Self::Item: Try<Ok = T>,
714-
<Self::Item as Try>::Ok: Send,
715-
<Self::Item as Try>::Error: Send
716715
{
717-
// Map into `Result<Option<Ok>, Error>`, then reduce it.
718-
let result = self.map(|try_b| try_b.into_result().map(Some)).try_reduce(
719-
|| None,
720-
|opt_a, opt_b| match (opt_a, opt_b) {
721-
(Some(a), Some(b)) => op(a, b).into_result().map(Some),
722-
(Some(v), None) | (None, Some(v)) => Ok(Some(v)),
723-
(None, None) => Ok(None),
724-
},
725-
);
726-
727-
// Map `Result<Option<Ok>, Error>` back to `Option<Self::Item>`.
728-
match result {
729-
Ok(None) => None,
730-
Ok(Some(v)) => Some(Self::Item::from_ok(v)),
731-
Err(e) => Some(Self::Item::from_error(e)),
732-
}
716+
try_reduce_with::try_reduce_with(self, op)
733717
}
734718

735719
/// Parallel fold is similar to sequential fold except that the

src/iter/try_reduce_with.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use super::ParallelIterator;
2+
use super::plumbing::*;
3+
4+
use super::private::Try;
5+
use std::sync::atomic::{AtomicBool, Ordering};
6+
7+
pub fn try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T>
8+
where PI: ParallelIterator<Item = T>,
9+
R: Fn(T::Ok, T::Ok) -> T + Sync,
10+
T: Try + Send
11+
{
12+
let full = AtomicBool::new(false);
13+
let consumer = TryReduceWithConsumer {
14+
reduce_op: &reduce_op,
15+
full: &full,
16+
};
17+
pi.drive_unindexed(consumer)
18+
}
19+
20+
struct TryReduceWithConsumer<'r, R: 'r> {
21+
reduce_op: &'r R,
22+
full: &'r AtomicBool,
23+
}
24+
25+
impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {}
26+
27+
impl<'r, R> Clone for TryReduceWithConsumer<'r, R> {
28+
fn clone(&self) -> Self {
29+
*self
30+
}
31+
}
32+
33+
impl<'r, R, T> Consumer<T> for TryReduceWithConsumer<'r, R>
34+
where R: Fn(T::Ok, T::Ok) -> T + Sync,
35+
T: Try + Send
36+
{
37+
type Folder = TryReduceWithFolder<'r, R, T>;
38+
type Reducer = Self;
39+
type Result = Option<T>;
40+
41+
fn split_at(self, _index: usize) -> (Self, Self, Self) {
42+
(self, self, self)
43+
}
44+
45+
fn into_folder(self) -> Self::Folder {
46+
TryReduceWithFolder {
47+
reduce_op: self.reduce_op,
48+
opt_result: None,
49+
full: self.full,
50+
}
51+
}
52+
53+
fn full(&self) -> bool {
54+
self.full.load(Ordering::Relaxed)
55+
}
56+
}
57+
58+
impl<'r, R, T> UnindexedConsumer<T> for TryReduceWithConsumer<'r, R>
59+
where R: Fn(T::Ok, T::Ok) -> T + Sync,
60+
T: Try + Send
61+
{
62+
fn split_off_left(&self) -> Self {
63+
*self
64+
}
65+
66+
fn to_reducer(&self) -> Self::Reducer {
67+
*self
68+
}
69+
}
70+
71+
impl<'r, R, T> Reducer<Option<T>> for TryReduceWithConsumer<'r, R>
72+
where R: Fn(T::Ok, T::Ok) -> T + Sync,
73+
T: Try
74+
{
75+
fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
76+
let reduce_op = self.reduce_op;
77+
match (left, right) {
78+
(None, x) | (x, None) => x,
79+
(Some(a), Some(b)) => {
80+
match (a.into_result(), b.into_result()) {
81+
(Ok(a), Ok(b)) => Some(reduce_op(a, b)),
82+
(Err(e), _) | (_, Err(e)) => Some(T::from_error(e)),
83+
}
84+
}
85+
}
86+
}
87+
}
88+
89+
struct TryReduceWithFolder<'r, R: 'r, T: Try> {
90+
reduce_op: &'r R,
91+
opt_result: Option<Result<T::Ok, T::Error>>,
92+
full: &'r AtomicBool,
93+
}
94+
95+
impl<'r, R, T> Folder<T> for TryReduceWithFolder<'r, R, T>
96+
where R: Fn(T::Ok, T::Ok) -> T,
97+
T: Try
98+
{
99+
type Result = Option<T>;
100+
101+
fn consume(self, item: T) -> Self {
102+
let reduce_op = self.reduce_op;
103+
let result = match self.opt_result {
104+
None => item.into_result(),
105+
Some(Ok(a)) => match item.into_result() {
106+
Ok(b) => reduce_op(a, b).into_result(),
107+
Err(e) => Err(e),
108+
},
109+
Some(Err(e)) => Err(e),
110+
};
111+
if result.is_err() {
112+
self.full.store(true, Ordering::Relaxed)
113+
}
114+
TryReduceWithFolder {
115+
opt_result: Some(result),
116+
..self
117+
}
118+
}
119+
120+
fn complete(self) -> Option<T> {
121+
self.opt_result.map(|result| match result {
122+
Ok(ok) => T::from_ok(ok),
123+
Err(error) => T::from_error(error),
124+
})
125+
}
126+
127+
fn full(&self) -> bool {
128+
self.full.load(Ordering::Relaxed)
129+
}
130+
}

0 commit comments

Comments
 (0)