Skip to content

Commit 8ba64ab

Browse files
bors[bot]cuviper
andcommitted
563: [WIP] Add try_fold, try_for_each, try_reduce r=nikomatsakis a=cuviper There are six variations here, matching the existing non-try suite: - `try_fold` and `try_fold_with` - `try_for_each` and `try_for_each_with` - `try_reduce` and `try_reduce_with` All of them operate on `Try::Ok` values similar to the exiting non-try methods, and short-circuit early to return any `Try::Error` value seen. This `Try` is a pub-in-private clone of the unstable `std::ops::Try`, implemented for `Option<T>` and `Result<T, E>`. TODO and open questions: - [ ] Needs documentation, examples, and tests. - [x] Should we wait for `Iterator::try_fold` and `try_for_each` to reach rust stable? They were stabilized in rust-lang/rust#49607, but there's always a chance this could get backed out. - **Resolved**: they're stable in 1.27 - [x] Should we wait for stable `std::ops::Try`? We could just keep ours private for now, and change to publicly use the standard trait later (on sufficiently new rustc). - **Resolved**: keep our pub-in-private `Try` for now. - [x] Should `try_fold` and `try_fold_with` continue to short-circuit globally, or change to only a local check? - When I first implemented `try_reduce_with`, I use a `try_fold` + `try_reduce` combination, like `reduce_with`'s implementation, but I didn't like the idea of having double `full: AtomicBool` flags in use. - If `try_fold` only errors locally, then other threads can continue folding normally, and you can decide what to do with the error when you further reduce/collect/etc. e.g. A following `try_reduce` will still short-circuit globally. - **Resolved**: changed to just a local check. Closes rayon-rs#495. Co-authored-by: Josh Stone <[email protected]>
2 parents da16c2c + 800c736 commit 8ba64ab

File tree

8 files changed

+773
-1
lines changed

8 files changed

+773
-1
lines changed

src/compile_fail/must_use.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ must_use! {
2727
flatten /** v.par_iter().flatten(); */
2828
fold /** v.par_iter().fold(|| 0, |x, _| x); */
2929
fold_with /** v.par_iter().fold_with(0, |x, _| x); */
30+
try_fold /** v.par_iter().try_fold(|| 0, |x, _| Some(x)); */
31+
try_fold_with /** v.par_iter().try_fold_with(0, |x, _| Some(x)); */
3032
inspect /** v.par_iter().inspect(|_| {}); */
3133
interleave /** v.par_iter().interleave(&v); */
3234
interleave_shortest /** v.par_iter().interleave_shortest(&v); */

src/iter/fold.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F>
140140

141141
pub fn fold_with<U, I, F>(base: I, item: U, fold_op: F) -> FoldWith<I, U, F>
142142
where I: ParallelIterator,
143-
F: Fn(U, I::Item) -> U + Sync,
143+
F: Fn(U, I::Item) -> U + Sync + Send,
144144
U: Send + Clone
145145
{
146146
FoldWith {

src/iter/mod.rs

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,19 @@
6666
//! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
6767
//! [split]: fn.split.html
6868
//! [plumbing]: plumbing
69+
//!
70+
//! Note: Several of the `ParallelIterator` methods rely on a `Try` trait which
71+
//! has been deliberately obscured from the public API. This trait is intended
72+
//! to mirror the unstable `std::ops::Try` with implementations for `Option` and
73+
//! `Result`, where `Some`/`Ok` values will let those iterators continue, but
74+
//! `None`/`Err` values will exit early.
6975
7076
pub use either::Either;
7177
use std::cmp::{self, Ordering};
7278
use std::iter::{Sum, Product};
7379
use std::ops::Fn;
7480
use self::plumbing::*;
81+
use self::private::Try;
7582

7683
// There is a method to the madness here:
7784
//
@@ -109,7 +116,11 @@ pub mod plumbing;
109116
mod for_each;
110117
mod fold;
111118
pub use self::fold::{Fold, FoldWith};
119+
mod try_fold;
120+
pub use self::try_fold::{TryFold, TryFoldWith};
112121
mod reduce;
122+
mod try_reduce;
123+
mod try_reduce_with;
113124
mod skip;
114125
pub use self::skip::Skip;
115126
mod splitter;
@@ -364,6 +375,69 @@ pub trait ParallelIterator: Sized + Send {
364375
self.map_with(init, op).for_each(|()| ())
365376
}
366377

378+
/// Executes a fallible `OP` on each item produced by the iterator, in parallel.
379+
///
380+
/// If the `OP` returns `Result::Err` or `Option::None`, we will attempt to
381+
/// stop processing the rest of the items in the iterator as soon as
382+
/// possible, and we will return that terminating value. Otherwise, we will
383+
/// return an empty `Result::Ok(())` or `Option::Some(())`. If there are
384+
/// multiple errors in parallel, it is not specified which will be returned.
385+
///
386+
/// # Examples
387+
///
388+
/// ```
389+
/// use rayon::prelude::*;
390+
/// use std::io::{self, Write};
391+
///
392+
/// // This will stop iteration early if there's any write error, like
393+
/// // having piped output get closed on the other end.
394+
/// (0..100).into_par_iter()
395+
/// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x))
396+
/// .expect("expected no write errors");
397+
/// ```
398+
fn try_for_each<OP, R>(self, op: OP) -> R
399+
where OP: Fn(Self::Item) -> R + Sync + Send,
400+
R: Try<Ok = ()> + Send
401+
{
402+
self.map(op).try_reduce(|| (), |(), ()| R::from_ok(()))
403+
}
404+
405+
/// Executes a fallible `OP` on the given `init` value with each item
406+
/// produced by the iterator, in parallel.
407+
///
408+
/// This combines the `init` semantics of [`for_each_with()`] and the
409+
/// failure semantics of [`try_for_each()`].
410+
///
411+
/// [`for_each_with()`]: #method.for_each_with
412+
/// [`try_for_each()`]: #method.try_for_each
413+
///
414+
/// # Examples
415+
///
416+
/// ```
417+
/// use std::sync::mpsc::channel;
418+
/// use rayon::prelude::*;
419+
///
420+
/// let (sender, receiver) = channel();
421+
///
422+
/// (0..5).into_par_iter()
423+
/// .try_for_each_with(sender, |s, x| s.send(x))
424+
/// .expect("expected no send errors");
425+
///
426+
/// let mut res: Vec<_> = receiver.iter().collect();
427+
///
428+
/// res.sort();
429+
///
430+
/// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
431+
/// ```
432+
fn try_for_each_with<OP, T, R>(self, init: T, op: OP) -> R
433+
where OP: Fn(&mut T, Self::Item) -> R + Sync + Send,
434+
T: Send + Clone,
435+
R: Try<Ok = ()> + Send
436+
{
437+
self.map_with(init, op)
438+
.try_reduce(|| (), |(), ()| R::from_ok(()))
439+
}
440+
367441
/// Counts the number of items in this parallel iterator.
368442
///
369443
/// # Examples
@@ -679,6 +753,87 @@ pub trait ParallelIterator: Sized + Send {
679753
})
680754
}
681755

756+
/// Reduces the items in the iterator into one item using a fallible `op`.
757+
/// The `identity` argument is used the same way as in [`reduce()`].
758+
///
759+
/// [`reduce()`]: #method.reduce
760+
///
761+
/// If a `Result::Err` or `Option::None` item is found, or if `op` reduces
762+
/// to one, we will attempt to stop processing the rest of the items in the
763+
/// iterator as soon as possible, and we will return that terminating value.
764+
/// Otherwise, we will return the final reduced `Result::Ok(T)` or
765+
/// `Option::Some(T)`. If there are multiple errors in parallel, it is not
766+
/// specified which will be returned.
767+
///
768+
/// # Examples
769+
///
770+
/// ```
771+
/// use rayon::prelude::*;
772+
///
773+
/// // Compute the sum of squares, being careful about overflow.
774+
/// fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> {
775+
/// iter.into_par_iter()
776+
/// .map(|i| i.checked_mul(i)) // square each item,
777+
/// .try_reduce(|| 0, i32::checked_add) // and add them up!
778+
/// }
779+
/// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16));
780+
///
781+
/// // The sum might overflow
782+
/// assert_eq!(sum_squares(0..10_000), None);
783+
///
784+
/// // Or the squares might overflow before it even reaches `try_reduce`
785+
/// assert_eq!(sum_squares(1_000_000..1_000_001), None);
786+
/// ```
787+
fn try_reduce<T, OP, ID>(self, identity: ID, op: OP) -> Self::Item
788+
where OP: Fn(T, T) -> Self::Item + Sync + Send,
789+
ID: Fn() -> T + Sync + Send,
790+
Self::Item: Try<Ok = T>
791+
{
792+
try_reduce::try_reduce(self, identity, op)
793+
}
794+
795+
/// Reduces the items in the iterator into one item using a fallible `op`.
796+
///
797+
/// Like [`reduce_with()`], if the iterator is empty, `None` is returned;
798+
/// otherwise, `Some` is returned. Beyond that, it behaves like
799+
/// [`try_reduce()`] for handling `Err`/`None`.
800+
///
801+
/// [`reduce_with()`]: #method.reduce_with
802+
/// [`try_reduce()`]: #method.try_reduce
803+
///
804+
/// For instance, with `Option` items, the return value may be:
805+
/// - `None`, the iterator was empty
806+
/// - `Some(None)`, we stopped after encountering `None`.
807+
/// - `Some(Some(x))`, the entire iterator reduced to `x`.
808+
///
809+
/// With `Result` items, the nesting is more obvious:
810+
/// - `None`, the iterator was empty
811+
/// - `Some(Err(e))`, we stopped after encountering an error `e`.
812+
/// - `Some(Ok(x))`, the entire iterator reduced to `x`.
813+
///
814+
/// # Examples
815+
///
816+
/// ```
817+
/// use rayon::prelude::*;
818+
///
819+
/// let files = ["/dev/null", "/does/not/exist"];
820+
///
821+
/// // Find the biggest file
822+
/// files.into_par_iter()
823+
/// .map(|path| std::fs::metadata(path).map(|m| (path, m.len())))
824+
/// .try_reduce_with(|a, b| {
825+
/// Ok(if a.1 >= b.1 { a } else { b })
826+
/// })
827+
/// .expect("Some value, since the iterator is not empty")
828+
/// .expect_err("not found");
829+
/// ```
830+
fn try_reduce_with<T, OP>(self, op: OP) -> Option<Self::Item>
831+
where OP: Fn(T, T) -> Self::Item + Sync + Send,
832+
Self::Item: Try<Ok = T>,
833+
{
834+
try_reduce_with::try_reduce_with(self, op)
835+
}
836+
682837
/// Parallel fold is similar to sequential fold except that the
683838
/// sequence of items may be subdivided before it is
684839
/// folded. Consider a list of numbers like `22 3 77 89 46`. If
@@ -846,6 +1001,65 @@ pub trait ParallelIterator: Sized + Send {
8461001
fold::fold_with(self, init, fold_op)
8471002
}
8481003

1004+
/// Perform a fallible parallel fold.
1005+
///
1006+
/// This is a variation of [`fold()`] for operations which can fail with
1007+
/// `Option::None` or `Result::Err`. The first such failure stops
1008+
/// processing the local set of items, without affecting other folds in the
1009+
/// iterator's subdivisions.
1010+
///
1011+
/// Often, `try_fold()` will be followed by [`try_reduce()`]
1012+
/// for a final reduction and global short-circuiting effect.
1013+
///
1014+
/// [`fold()`]: #method.fold
1015+
/// [`try_reduce()`]: #method.try_reduce
1016+
///
1017+
/// # Examples
1018+
///
1019+
/// ```
1020+
/// use rayon::prelude::*;
1021+
///
1022+
/// let bytes = 0..22_u8;
1023+
/// let sum = bytes.into_par_iter()
1024+
/// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32))
1025+
/// .try_reduce(|| 0, u32::checked_add);
1026+
///
1027+
/// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
1028+
/// ```
1029+
fn try_fold<T, R, ID, F>(self, identity: ID, fold_op: F) -> TryFold<Self, R, ID, F>
1030+
where F: Fn(T, Self::Item) -> R + Sync + Send,
1031+
ID: Fn() -> T + Sync + Send,
1032+
R: Try<Ok = T> + Send
1033+
{
1034+
try_fold::try_fold(self, identity, fold_op)
1035+
}
1036+
1037+
/// Perform a fallible parallel fold with a cloneable `init` value.
1038+
///
1039+
/// This combines the `init` semantics of [`fold_with()`] and the failure
1040+
/// semantics of [`try_fold()`].
1041+
///
1042+
/// [`fold_with()`]: #method.fold_with
1043+
/// [`try_fold()`]: #method.try_fold
1044+
///
1045+
/// ```
1046+
/// use rayon::prelude::*;
1047+
///
1048+
/// let bytes = 0..22_u8;
1049+
/// let sum = bytes.into_par_iter()
1050+
/// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32))
1051+
/// .try_reduce(|| 0, u32::checked_add);
1052+
///
1053+
/// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
1054+
/// ```
1055+
fn try_fold_with<F, T, R>(self, init: T, fold_op: F) -> TryFoldWith<Self, R, F>
1056+
where F: Fn(T, Self::Item) -> R + Sync + Send,
1057+
R: Try<Ok = T> + Send,
1058+
T: Clone + Send
1059+
{
1060+
try_fold::try_fold_with(self, init, fold_op)
1061+
}
1062+
8491063
/// Sums up the items in the iterator.
8501064
///
8511065
/// Note that the order in items will be reduced is not specified,
@@ -2114,3 +2328,42 @@ pub trait ParallelExtend<T>
21142328
/// ```
21152329
fn par_extend<I>(&mut self, par_iter: I) where I: IntoParallelIterator<Item = T>;
21162330
}
2331+
2332+
/// We hide the `Try` trait in a private module, as it's only meant to be a
2333+
/// stable clone of the standard library's `Try` trait, as yet unstable.
2334+
mod private {
2335+
/// Clone of `std::ops::Try`.
2336+
///
2337+
/// Implementing this trait is not permitted outside of `rayon`.
2338+
pub trait Try {
2339+
private_decl!{}
2340+
2341+
type Ok;
2342+
type Error;
2343+
fn into_result(self) -> Result<Self::Ok, Self::Error>;
2344+
fn from_ok(v: Self::Ok) -> Self;
2345+
fn from_error(v: Self::Error) -> Self;
2346+
}
2347+
2348+
impl<T> Try for Option<T> {
2349+
private_impl!{}
2350+
2351+
type Ok = T;
2352+
type Error = ();
2353+
2354+
fn into_result(self) -> Result<T, ()> { self.ok_or(()) }
2355+
fn from_ok(v: T) -> Self { Some(v) }
2356+
fn from_error(_: ()) -> Self { None }
2357+
}
2358+
2359+
impl<T, E> Try for Result<T, E> {
2360+
private_impl!{}
2361+
2362+
type Ok = T;
2363+
type Error = E;
2364+
2365+
fn into_result(self) -> Result<T, E> { self }
2366+
fn from_ok(v: T) -> Self { Ok(v) }
2367+
fn from_error(v: E) -> Self { Err(v) }
2368+
}
2369+
}

0 commit comments

Comments
 (0)