diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 140eeba62..221b86201 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -49,6 +49,41 @@ pub trait Map { C2: Container + SizableContainer + PushInto, L: FnMut(C::Item<'_>)->I + 'static, ; + + /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream. + /// + /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the + /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled + /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records + /// of intermediate stages. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::{Capture, ToStream, core::Map}; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let data = timely::example(|scope| { + /// (0..10i32) + /// .to_stream(scope) + /// .flat_map_builder(|x| x + 1) + /// .map(|x| x + 1) + /// .map(|x| x + 1) + /// .map(|x| x + 1) + /// .map(Some) + /// .into_stream::<_,Vec>() + /// .capture() + /// }); + /// + /// assert_eq!((4..14).collect::>(), data.extract()[0].1); + /// ``` + fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I> + where + C: Clone + 'static, + L: for<'a> Fn(C::Item<'a>) -> I, + Self: Sized, + { + FlatMapBuilder::new(self, logic) + } } impl Map for StreamCore { @@ -68,3 +103,66 @@ impl Map for StreamCore { }) } } + + +/// A stream wrapper that allows the accumulation of flatmap logic. +pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I> +where + for<'a> F: Fn(C::Item<'a>) -> I, +{ + stream: &'t T, + logic: F, + marker: std::marker::PhantomData, +} + +impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I> +where + for<'a> F: Fn(C::Item<'a>) -> I, +{ + /// Create a new wrapper with no action on the stream. + pub fn new(stream: &'t T, logic: F) -> Self { + FlatMapBuilder { stream, logic, marker: std::marker::PhantomData } + } + + /// Transform a flatmapped stream through addiitonal logic. + pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> { + let logic = self.logic; + FlatMapBuilder { + stream: self.stream, + logic: move |x| g(logic(x)), + marker: std::marker::PhantomData, + } + } + /// Convert the wrapper into a stream. + pub fn into_stream(self) -> StreamCore + where + I: IntoIterator, + S: Scope, + T: Map, + C2: Container + SizableContainer + PushInto, + { + Map::flat_map(self.stream, self.logic) + } +} + +#[cfg(test)] +mod tests { + use crate::dataflow::operators::{Capture, ToStream, core::Map}; + use crate::dataflow::operators::capture::Extract; + + #[test] + fn test_builder() { + let data = crate::example(|scope| { + let stream = (0..10i32).to_stream(scope); + stream.flat_map_builder(|x| x + 1) + .map(|x| x + 1) + .map(|x| x + 1) + .map(|x| x + 1) + .map(Some) + .into_stream::<_,Vec>() + .capture() + }); + + assert_eq!((4..14).collect::>(), data.extract()[0].1); + } +} \ No newline at end of file