Skip to content

Commit 2e12b82

Browse files
committed
Introduce builder for flatmap operators
1 parent af70245 commit 2e12b82

File tree

1 file changed

+96
-0
lines changed
  • timely/src/dataflow/operators/core

1 file changed

+96
-0
lines changed

timely/src/dataflow/operators/core/map.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,41 @@ pub trait Map<S: Scope, C: DrainContainer> {
4949
C2: Container + SizableContainer + PushInto<I::Item>,
5050
L: FnMut(C::Item<'_>)->I + 'static,
5151
;
52+
53+
/// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
54+
///
55+
/// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
56+
/// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
57+
/// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
58+
/// of intermediate stages.
59+
///
60+
/// # Examples
61+
/// ```
62+
/// use timely::dataflow::operators::{Capture, ToStream, core::Map};
63+
/// use timely::dataflow::operators::capture::Extract;
64+
///
65+
/// let data = timely::example(|scope| {
66+
/// (0..10i32)
67+
/// .to_stream(scope)
68+
/// .flat_map_builder(|x| x + 1)
69+
/// .map(|x| x + 1)
70+
/// .map(|x| x + 1)
71+
/// .map(|x| x + 1)
72+
/// .map(Some)
73+
/// .into_stream::<Vec<i32>>()
74+
/// .capture()
75+
/// });
76+
///
77+
/// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
78+
/// ```
79+
fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, S, C, L, I>
80+
where
81+
C: Clone + 'static,
82+
L: for<'a> Fn(C::Item<'a>) -> I,
83+
Self: Sized,
84+
{
85+
FlatMapBuilder::new(self, logic)
86+
}
5287
}
5388

5489
impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
@@ -68,3 +103,64 @@ impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
68103
})
69104
}
70105
}
106+
107+
108+
/// A stream wrapper that allows the accumulation of flatmap logic.
109+
pub struct FlatMapBuilder<'t, T: Map<S, C>, S: Scope, C: Container, F: 'static, I>
110+
where
111+
for<'a> F: Fn(C::Item<'a>) -> I,
112+
{
113+
stream: &'t T,
114+
logic: F,
115+
marker: std::marker::PhantomData<(S, C)>,
116+
}
117+
118+
impl<'t, T: Map<S, C>, S: Scope, C: Container + Clone + 'static, F, I> FlatMapBuilder<'t, T, S, C, F, I>
119+
where
120+
for<'a> F: Fn(C::Item<'a>) -> I,
121+
{
122+
/// Create a new wrapper with no action on the stream.
123+
pub fn new(stream: &'t T, logic: F) -> Self {
124+
FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
125+
}
126+
127+
/// Transform a flatmapped stream through addiitonal logic.
128+
pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, S, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
129+
let logic = self.logic;
130+
FlatMapBuilder {
131+
stream: self.stream,
132+
logic: move |x| g(logic(x)),
133+
marker: std::marker::PhantomData,
134+
}
135+
}
136+
/// Convert the wrapper into a stream.
137+
pub fn into_stream<C2>(self) -> StreamCore<S, C2>
138+
where
139+
I: IntoIterator,
140+
C2: SizableContainer + PushInto<I::Item> + Data,
141+
{
142+
Map::flat_map(self.stream, self.logic)
143+
}
144+
}
145+
146+
#[cfg(test)]
147+
mod tests {
148+
use crate::dataflow::operators::{Capture, ToStream, core::Map};
149+
use crate::dataflow::operators::capture::Extract;
150+
151+
#[test]
152+
fn test_builder() {
153+
let data = crate::example(|scope| {
154+
let stream = (0..10i32).to_stream(scope);
155+
stream.flat_map_builder(|x| x + 1)
156+
.map(|x| x + 1)
157+
.map(|x| x + 1)
158+
.map(|x| x + 1)
159+
.map(Some)
160+
.into_stream::<Vec<i32>>()
161+
.capture()
162+
});
163+
164+
assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
165+
}
166+
}

0 commit comments

Comments
 (0)