Skip to content

Commit 4337b10

Browse files
authored
dataflow/operators/core: add .partition() for StreamCore (#610)
Signed-off-by: Petros Angelatos <[email protected]>
1 parent 47e0722 commit 4337b10

File tree

3 files changed

+80
-29
lines changed

3 files changed

+80
-29
lines changed

timely/src/dataflow/operators/core/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod input;
1111
pub mod inspect;
1212
pub mod map;
1313
pub mod ok_err;
14+
pub mod partition;
1415
pub mod probe;
1516
pub mod rc;
1617
pub mod reclock;
@@ -27,6 +28,7 @@ pub use input::Input;
2728
pub use inspect::{Inspect, InspectCore};
2829
pub use map::Map;
2930
pub use ok_err::OkErr;
31+
pub use partition::Partition;
3032
pub use probe::Probe;
3133
pub use to_stream::{ToStream, ToStreamBuilder};
3234
pub use reclock::Reclock;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! Partition a stream of records into multiple streams.
2+
3+
use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer};
4+
5+
use crate::dataflow::channels::pact::Pipeline;
6+
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7+
use crate::dataflow::{Scope, StreamCore};
8+
use crate::Data;
9+
10+
/// Partition a stream of records into multiple streams.
11+
pub trait Partition<G: Scope, C: Container> {
12+
/// Produces `parts` output streams, containing records produced and assigned by `route`.
13+
///
14+
/// # Examples
15+
/// ```
16+
/// use timely::dataflow::operators::ToStream;
17+
/// use timely::dataflow::operators::core::{Partition, Inspect};
18+
///
19+
/// timely::example(|scope| {
20+
/// let streams = (0..10).to_stream(scope)
21+
/// .partition(3, |x| (x % 3, x));
22+
///
23+
/// for (idx, stream) in streams.into_iter().enumerate() {
24+
/// stream
25+
/// .container::<Vec<_>>()
26+
/// .inspect(move |x| println!("seen {idx}: {x:?}"));
27+
/// }
28+
/// });
29+
/// ```
30+
fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
31+
where
32+
CB: ContainerBuilder,
33+
CB::Container: SizableContainer + PushInto<D2> + Data,
34+
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
35+
}
36+
37+
impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
38+
fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
39+
where
40+
CB: ContainerBuilder,
41+
CB::Container: SizableContainer + PushInto<D2> + Data,
42+
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
43+
{
44+
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
45+
46+
let mut input = builder.new_input(self, Pipeline);
47+
let mut outputs = Vec::with_capacity(parts as usize);
48+
let mut streams = Vec::with_capacity(parts as usize);
49+
50+
for _ in 0..parts {
51+
let (output, stream) = builder.new_output();
52+
outputs.push(output);
53+
streams.push(stream);
54+
}
55+
56+
builder.build(move |_| {
57+
move |_frontiers| {
58+
let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
59+
input.for_each(|time, data| {
60+
let mut sessions = handles
61+
.iter_mut()
62+
.map(|h| h.session(&time))
63+
.collect::<Vec<_>>();
64+
65+
for datum in data.drain() {
66+
let (part, datum2) = route(datum);
67+
sessions[part as usize].give(datum2);
68+
}
69+
});
70+
}
71+
});
72+
73+
streams
74+
}
75+
}
Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Partition a stream of records into multiple streams.
22
3-
use crate::dataflow::channels::pact::Pipeline;
4-
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
3+
use crate::container::CapacityContainerBuilder;
4+
use crate::dataflow::operators::core::Partition as PartitionCore;
55
use crate::dataflow::{Scope, Stream};
66
use crate::Data;
77

@@ -27,32 +27,6 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
2727

2828
impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
2929
fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
30-
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
31-
32-
let mut input = builder.new_input(self, Pipeline);
33-
let mut outputs = Vec::with_capacity(parts as usize);
34-
let mut streams = Vec::with_capacity(parts as usize);
35-
36-
for _ in 0 .. parts {
37-
let (output, stream) = builder.new_output();
38-
outputs.push(output);
39-
streams.push(stream);
40-
}
41-
42-
builder.build(move |_| {
43-
move |_frontiers| {
44-
let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
45-
input.for_each(|time, data| {
46-
let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();
47-
48-
for datum in data.drain(..) {
49-
let (part, datum2) = route(datum);
50-
sessions[part as usize].give(datum2);
51-
}
52-
});
53-
}
54-
});
55-
56-
streams
30+
PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route)
5731
}
5832
}

0 commit comments

Comments
 (0)