Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions timely/src/dataflow/operators/core/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,41 @@ pub trait Map<S: Scope, C: DrainContainer> {
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>)->I + 'static,
;

/// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
///
/// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
/// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
/// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
/// of intermediate stages.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{Capture, ToStream, core::Map};
/// use timely::dataflow::operators::capture::Extract;
///
/// let data = timely::example(|scope| {
/// (0..10i32)
/// .to_stream(scope)
/// .flat_map_builder(|x| x + 1)
/// .map(|x| x + 1)
/// .map(|x| x + 1)
/// .map(|x| x + 1)
/// .map(Some)
/// .into_stream::<_,Vec<i32>>()
/// .capture()
/// });
///
/// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
/// ```
fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I>
where
C: Clone + 'static,
L: for<'a> Fn(C::Item<'a>) -> I,
Self: Sized,
{
FlatMapBuilder::new(self, logic)
}
}

impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
Expand All @@ -68,3 +103,66 @@ impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
})
}
}


/// A stream wrapper that allows the accumulation of flatmap logic.
pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I>
where
for<'a> F: Fn(C::Item<'a>) -> I,
{
stream: &'t T,
logic: F,
marker: std::marker::PhantomData<C>,
}

impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I>
where
for<'a> F: Fn(C::Item<'a>) -> I,
{
/// Create a new wrapper with no action on the stream.
pub fn new(stream: &'t T, logic: F) -> Self {
FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
}

/// Transform a flatmapped stream through addiitonal logic.
pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
let logic = self.logic;
FlatMapBuilder {
stream: self.stream,
logic: move |x| g(logic(x)),
marker: std::marker::PhantomData,
}
}
/// Convert the wrapper into a stream.
pub fn into_stream<S, C2>(self) -> StreamCore<S, C2>
where
I: IntoIterator,
S: Scope,
T: Map<S, C>,
C2: Container + SizableContainer + PushInto<I::Item>,
{
Map::flat_map(self.stream, self.logic)
}
}

#[cfg(test)]
mod tests {
use crate::dataflow::operators::{Capture, ToStream, core::Map};
use crate::dataflow::operators::capture::Extract;

#[test]
fn test_builder() {
let data = crate::example(|scope| {
let stream = (0..10i32).to_stream(scope);
stream.flat_map_builder(|x| x + 1)
.map(|x| x + 1)
.map(|x| x + 1)
.map(|x| x + 1)
.map(Some)
.into_stream::<_,Vec<i32>>()
.capture()
});

assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
}
}
Loading