Commit 6bf627b

dataflow/operators/core: add .partition() for StreamCore
Signed-off-by: Petros Angelatos <[email protected]>
1 parent 2398b79 commit 6bf627b

File tree (3 files changed: +83, -33 lines)

  timely/src/dataflow/operators/core/mod.rs
  timely/src/dataflow/operators/core/partition.rs
  timely/src/dataflow/operators/partition.rs

timely/src/dataflow/operators/core/mod.rs

5 additions, 3 deletions

```diff
@@ -11,6 +11,7 @@ pub mod input;
 pub mod inspect;
 pub mod map;
 pub mod ok_err;
+pub mod partition;
 pub mod probe;
 pub mod rc;
 pub mod reclock;
@@ -21,13 +22,14 @@ pub use capture::Capture;
 pub use concat::{Concat, Concatenate};
 pub use enterleave::{Enter, Leave};
 pub use exchange::Exchange;
-pub use feedback::{Feedback, LoopVariable, ConnectLoop};
+pub use feedback::{ConnectLoop, Feedback, LoopVariable};
 pub use filter::Filter;
 pub use input::Input;
 pub use inspect::{Inspect, InspectCore};
 pub use map::Map;
 pub use ok_err::OkErr;
+pub use partition::Partition;
 pub use probe::Probe;
-pub use to_stream::{ToStream, ToStreamBuilder};
 pub use reclock::Reclock;
-pub use unordered_input::{UnorderedInput, UnorderedHandle};
+pub use to_stream::{ToStream, ToStreamBuilder};
+pub use unordered_input::{UnorderedHandle, UnorderedInput};
```
timely/src/dataflow/operators/core/partition.rs (new file)

73 additions, 0 deletions

```rust
//! Partition a stream of records into multiple streams.

use timely_container::{Container, PushInto, SizableContainer};

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, StreamCore};
use crate::Data;

/// Partition a stream of records into multiple streams.
pub trait Partition<G: Scope, C: Container> {
    /// Produces `parts` output streams, containing records produced and assigned by `route`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::ToStream;
    /// use timely::dataflow::operators::core::{Partition, Inspect};
    ///
    /// timely::example(|scope| {
    ///     let streams = (0..10).to_stream(scope)
    ///                          .partition(3, |x| (x % 3, x));
    ///
    ///     for (idx, stream) in streams.into_iter().enumerate() {
    ///         stream
    ///             .container::<Vec<_>>()
    ///             .inspect(move |x| println!("seen {idx}: {x:?}"));
    ///     }
    /// });
    /// ```
    fn partition<C2, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, C2>>
    where
        C2: SizableContainer + PushInto<D2> + Data,
        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
}

impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
    fn partition<C2, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, C2>>
    where
        C2: SizableContainer + PushInto<D2> + Data,
        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
    {
        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());

        let mut input = builder.new_input(self, Pipeline);
        let mut outputs = Vec::with_capacity(parts as usize);
        let mut streams = Vec::with_capacity(parts as usize);

        for _ in 0..parts {
            let (output, stream) = builder.new_output();
            outputs.push(output);
            streams.push(stream);
        }

        builder.build(move |_| {
            move |_frontiers| {
                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
                input.for_each(|time, data| {
                    let mut sessions = handles
                        .iter_mut()
                        .map(|h| h.session(&time))
                        .collect::<Vec<_>>();

                    for datum in data.drain() {
                        let (part, datum2) = route(datum);
                        sessions[part as usize].give(datum2);
                    }
                });
            }
        });

        streams
    }
}
```
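
For orientation, here is a minimal standalone sketch of using the new core operator, closely following the doc example above; the even/odd split, the `u64` element type, and the printed labels are illustrative choices, not part of the commit.

```rust
// Minimal usage sketch of the new core Partition trait (mirrors the doc example).
use timely::dataflow::operators::core::{Inspect, Partition};
use timely::dataflow::operators::ToStream;

fn main() {
    timely::example(|scope| {
        // Route each record to output `x % 2`: evens to stream 0, odds to stream 1.
        let streams = (0..10u64).to_stream(scope).partition(2, |x| (x % 2, x));

        for (idx, stream) in streams.into_iter().enumerate() {
            stream
                // Pin the output container type, since `partition` is generic over it.
                .container::<Vec<_>>()
                .inspect(move |x| println!("partition {idx}: {x:?}"));
        }
    });
}
```

Internally the operator builds one output per requested part, activates every output handle each time it is scheduled, opens one session per output for each input batch, and pushes every routed record into the session at the index returned by `route`; a part value at or above `parts` would therefore panic on the `sessions[part as usize]` lookup.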
timely/src/dataflow/operators/partition.rs

5 additions, 30 deletions

```diff
@@ -1,7 +1,6 @@
 //! Partition a stream of records into multiple streams.
 
-use crate::dataflow::channels::pact::Pipeline;
-use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
+use crate::dataflow::operators::core::Partition as PartitionCore;
 use crate::dataflow::{Scope, Stream};
 use crate::Data;
 
@@ -25,34 +24,10 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
     fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>;
 }
 
-impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
+impl<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2) + 'static> Partition<G, D, D2, F>
+    for Stream<G, D>
+{
     fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
-        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
-
-        let mut input = builder.new_input(self, Pipeline);
-        let mut outputs = Vec::with_capacity(parts as usize);
-        let mut streams = Vec::with_capacity(parts as usize);
-
-        for _ in 0 .. parts {
-            let (output, stream) = builder.new_output();
-            outputs.push(output);
-            streams.push(stream);
-        }
-
-        builder.build(move |_| {
-            move |_frontiers| {
-                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
-                input.for_each(|time, data| {
-                    let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();
-
-                    for datum in data.drain(..) {
-                        let (part, datum2) = route(datum);
-                        sessions[part as usize].give(datum2);
-                    }
-                });
-            }
-        });
-
-        streams
+        PartitionCore::partition(self, parts, route)
     }
 }
```
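
The `Vec`-backed `Stream` API keeps its existing signature; a call like the following sketch (assuming the usual re-exports from `timely::dataflow::operators`) should behave exactly as before, now forwarded to the core implementation.

```rust
// Sketch: the pre-existing Stream-based partition call pattern, unchanged by this commit.
use timely::dataflow::operators::{Inspect, Partition, ToStream};

fn main() {
    timely::example(|scope| {
        // Same routing closure shape as before: returns (output index, record).
        let streams = (0..10).to_stream(scope).partition(3, |x| (x % 3, x));

        for (idx, stream) in streams.into_iter().enumerate() {
            stream.inspect(move |x| println!("seen {idx}: {x:?}"));
        }
    });
}
```

Delegating to `PartitionCore::partition` removes the duplicated operator body from this file, leaving a single implementation in the core module.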
