66
66
//! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
67
67
//! [split]: fn.split.html
68
68
//! [plumbing]: plumbing
69
+ //!
70
+ //! Note: Several of the `ParallelIterator` methods rely on a `Try` trait which
71
+ //! has been deliberately obscured from the public API. This trait is intended
72
+ //! to mirror the unstable `std::ops::Try` with implementations for `Option` and
73
+ //! `Result`, where `Some`/`Ok` values will let those iterators continue, but
74
+ //! `None`/`Err` values will exit early.
69
75
70
76
pub use either:: Either ;
71
77
use std:: cmp:: { self , Ordering } ;
@@ -366,15 +372,60 @@ pub trait ParallelIterator: Sized + Send {
366
372
self . map_with ( init, op) . for_each ( |( ) | ( ) )
367
373
}
368
374
369
- /// TODO
375
+ /// Executes a fallible `OP` on each item produced by the iterator, in parallel.
376
+ ///
377
+ /// If the `OP` returns `Result::Err` or `Option::None`, we will attempt to
378
+ /// stop processing the rest of the items in the iterator as soon as
379
+ /// possible, and we will return that terminating value. Otherwise, we will
380
+ /// return an empty `Result::Ok(())` or `Option::Some(())`. If there are
381
+ /// multiple errors in parallel, it is not specified which will be returned.
382
+ ///
383
+ /// # Examples
384
+ ///
385
+ /// ```
386
+ /// use rayon::prelude::*;
387
+ /// use std::io::{self, Write};
388
+ ///
389
+ /// // This will stop iteration early if there's any write error, like
390
+ /// // having piped output get closed on the other end.
391
+ /// (0..100).into_par_iter()
392
+ /// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x))
393
+ /// .expect("expected no write errors");
394
+ /// ```
370
395
fn try_for_each < OP , R > ( self , op : OP ) -> R
371
396
where OP : Fn ( Self :: Item ) -> R + Sync + Send ,
372
397
R : Try < Ok = ( ) > + Send
373
398
{
374
399
self . map ( op) . try_reduce ( || ( ) , |( ) , ( ) | R :: from_ok ( ( ) ) )
375
400
}
376
401
377
- /// TODO
402
+ /// Executes a fallible `OP` on the given `init` value with each item
403
+ /// produced by the iterator, in parallel.
404
+ ///
405
+ /// This combines the `init` semantics of [`for_each_with()`] and the
406
+ /// failure semantics of [`try_for_each()`].
407
+ ///
408
+ /// [`for_each_with()`]: #method.for_each_with
409
+ /// [`try_for_each()`]: #method.try_for_each
410
+ ///
411
+ /// # Examples
412
+ ///
413
+ /// ```
414
+ /// use std::sync::mpsc::channel;
415
+ /// use rayon::prelude::*;
416
+ ///
417
+ /// let (sender, receiver) = channel();
418
+ ///
419
+ /// (0..5).into_par_iter()
420
+ /// .try_for_each_with(sender, |s, x| s.send(x))
421
+ /// .expect("expected no send errors");
422
+ ///
423
+ /// let mut res: Vec<_> = receiver.iter().collect();
424
+ ///
425
+ /// res.sort();
426
+ ///
427
+ /// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
428
+ /// ```
378
429
fn try_for_each_with < OP , T , R > ( self , init : T , op : OP ) -> R
379
430
where OP : Fn ( & mut T , Self :: Item ) -> R + Sync + Send ,
380
431
T : Send + Clone ,
@@ -699,7 +750,37 @@ pub trait ParallelIterator: Sized + Send {
699
750
} )
700
751
}
701
752
702
- /// TODO
753
+ /// Reduces the items in the iterator into one item using a fallible `op`.
754
+ /// The `identity` argument is used the same way as in [`reduce()`].
755
+ ///
756
+ /// [`reduce()`]: #method.reduce
757
+ ///
758
+ /// If a `Result::Err` or `Option::None` item is found, or if `op` reduces
759
+ /// to one, we will attempt to stop processing the rest of the items in the
760
+ /// iterator as soon as possible, and we will return that terminating value.
761
+ /// Otherwise, we will return the final reduced `Result::Ok(T)` or
762
+ /// `Option::Some(T)`. If there are multiple errors in parallel, it is not
763
+ /// specified which will be returned.
764
+ ///
765
+ /// # Examples
766
+ ///
767
+ /// ```
768
+ /// use rayon::prelude::*;
769
+ ///
770
+ /// // Compute the sum of squares, being careful about overflow.
771
+ /// fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> {
772
+ /// iter.into_par_iter()
773
+ /// .map(|i| i.checked_mul(i)) // square each item,
774
+ /// .try_reduce(|| 0, i32::checked_add) // and add them up!
775
+ /// }
776
+ /// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16));
777
+ ///
778
+ /// // The sum might overflow
779
+ /// assert_eq!(sum_squares(0..10_000), None);
780
+ ///
781
+ /// // Or the squares might overflow before it even reaches `try_reduce`
782
+ /// assert_eq!(sum_squares(1_000_000..1_000_001), None);
783
+ /// ```
703
784
fn try_reduce < T , OP , ID > ( self , identity : ID , op : OP ) -> Self :: Item
704
785
where OP : Fn ( T , T ) -> Self :: Item + Sync + Send ,
705
786
ID : Fn ( ) -> T + Sync + Send ,
@@ -708,7 +789,41 @@ pub trait ParallelIterator: Sized + Send {
708
789
try_reduce:: try_reduce ( self , identity, op)
709
790
}
710
791
711
- /// TODO
792
+ /// Reduces the items in the iterator into one item using a fallible `op`.
793
+ ///
794
+ /// Like [`reduce_with()`], if the iterator is empty, `None` is returned;
795
+ /// otherwise, `Some` is returned. Beyond that, it behaves like
796
+ /// [`try_reduce()`] for handling `Err`/`None`.
797
+ ///
798
+ /// [`reduce_with()`]: #method.reduce_with
799
+ /// [`try_reduce()`]: #method.try_reduce
800
+ ///
801
+ /// For instance, with `Option` items, the return value may be:
802
+ /// - `None`, the iterator was empty
803
+ /// - `Some(None)`, we stopped after encountering `None`.
804
+ /// - `Some(Some(x))`, the entire iterator reduced to `x`.
805
+ ///
806
+ /// With `Result` items, the nesting is more obvious:
807
+ /// - `None`, the iterator was empty
808
+ /// - `Some(Err(e))`, we stopped after encountering an error `e`.
809
+ /// - `Some(Ok(x))`, the entire iterator reduced to `x`.
810
+ ///
811
+ /// # Examples
812
+ ///
813
+ /// ```
814
+ /// use rayon::prelude::*;
815
+ ///
816
+ /// let files = ["/dev/null", "/does/not/exist"];
817
+ ///
818
+ /// // Find the biggest file
819
+ /// files.into_par_iter()
820
+ /// .map(|path| std::fs::metadata(path).map(|m| (path, m.len())))
821
+ /// .try_reduce_with(|a, b| {
822
+ /// Ok(if a.1 >= b.1 { a } else { b })
823
+ /// })
824
+ /// .expect("Some value, since the iterator is not empty")
825
+ /// .expect_err("not found");
826
+ /// ```
712
827
fn try_reduce_with < T , OP > ( self , op : OP ) -> Option < Self :: Item >
713
828
where OP : Fn ( T , T ) -> Self :: Item + Sync + Send ,
714
829
Self :: Item : Try < Ok = T > ,
@@ -883,7 +998,31 @@ pub trait ParallelIterator: Sized + Send {
883
998
fold:: fold_with ( self , init, fold_op)
884
999
}
885
1000
886
- /// TODO
1001
+ /// Perform a fallible parallel fold.
1002
+ ///
1003
+ /// This is a variation of [`fold()`] for operations which can fail with
1004
+ /// `Option::None` or `Result::Err`. The first such failure stops
1005
+ /// processing the local set of items, without affecting other folds in the
1006
+ /// iterator's subdivisions.
1007
+ ///
1008
+ /// Often, `try_fold()` will be followed by [`try_reduce()`]
1009
+ /// for a final reduction and global short-circuiting effect.
1010
+ ///
1011
+ /// [`fold()`]: #method.fold
1012
+ /// [`try_reduce()`]: #method.try_reduce
1013
+ ///
1014
+ /// # Examples
1015
+ ///
1016
+ /// ```
1017
+ /// use rayon::prelude::*;
1018
+ ///
1019
+ /// let bytes = 0..22_u8;
1020
+ /// let sum = bytes.into_par_iter()
1021
+ /// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32))
1022
+ /// .try_reduce(|| 0, u32::checked_add);
1023
+ ///
1024
+ /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
1025
+ /// ```
887
1026
fn try_fold < T , R , ID , F > ( self , identity : ID , fold_op : F ) -> TryFold < Self , R , ID , F >
888
1027
where F : Fn ( T , Self :: Item ) -> R + Sync + Send ,
889
1028
ID : Fn ( ) -> T + Sync + Send ,
@@ -892,7 +1031,24 @@ pub trait ParallelIterator: Sized + Send {
892
1031
try_fold:: try_fold ( self , identity, fold_op)
893
1032
}
894
1033
895
- /// TODO
1034
+ /// Perform a fallible parallel fold with a cloneable `init` value.
1035
+ ///
1036
+ /// This combines the `init` semantics of [`fold_with()`] and the failure
1037
+ /// semantics of [`try_fold()`].
1038
+ ///
1039
+ /// [`fold_with()`]: #method.fold_with
1040
+ /// [`try_fold()`]: #method.try_fold
1041
+ ///
1042
+ /// ```
1043
+ /// use rayon::prelude::*;
1044
+ ///
1045
+ /// let bytes = 0..22_u8;
1046
+ /// let sum = bytes.into_par_iter()
1047
+ /// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32))
1048
+ /// .try_reduce(|| 0, u32::checked_add);
1049
+ ///
1050
+ /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
1051
+ /// ```
896
1052
fn try_fold_with < F , T , R > ( self , init : T , fold_op : F ) -> TryFoldWith < Self , R , F >
897
1053
where F : Fn ( T , Self :: Item ) -> R + Sync + Send ,
898
1054
R : Try < Ok = T > + Send ,
0 commit comments