forked from TimelyDataflow/timely-dataflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbranch.rs
More file actions
124 lines (110 loc) · 4.49 KB
/
branch.rs
File metadata and controls
124 lines (110 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//! Operators that separate one stream into two streams based on some condition
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, Stream, StreamCore};
use crate::{Container, Data};
/// Extension trait for `Stream` that splits a stream in two using a per-record predicate.
pub trait Branch<S: Scope, D: Data> {
/// Takes one input stream and splits it into two output streams.
///
/// For each record, the supplied closure is called with a reference to
/// the record's time and a reference to the record itself, in that order.
/// If it returns `true`, the record will be sent to the second returned
/// stream, otherwise it will be sent to the first.
///
/// If the result of the closure only depends on the time, not the data,
/// `branch_when` should be used instead.
///
/// The returned pair is `(records where the closure was false, records where it was true)`.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Branch, Inspect};
///
/// timely::example(|scope| {
///     let (odd, even) = (0..10)
///         .to_stream(scope)
///         .branch(|_time, x| *x % 2 == 0);
///
///     even.inspect(|x| println!("even numbers: {:?}", x));
///     odd.inspect(|x| println!("odd numbers: {:?}", x));
/// });
/// ```
fn branch(
&self,
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>);
}
impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
    /// Builds a "Branch" operator with one pipelined input and two outputs.
    ///
    /// Every incoming record is tested with `condition`: records for which it
    /// returns `false` are given to the first returned stream, records for
    /// which it returns `true` are given to the second.
    fn branch(
        &self,
        condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
    ) -> (Stream<S, D>, Stream<S, D>) {
        let mut operator = OperatorBuilder::new(String::from("Branch"), self.scope());
        operator.set_notify(false);

        let mut records_in = operator.new_input(self, Pipeline);
        let (mut rejected_out, rejected) = operator.new_output();
        let (mut accepted_out, accepted) = operator.new_output();

        operator.build(move |_| {
            move |_frontiers| {
                let mut rejected_handle = rejected_out.activate();
                let mut accepted_handle = accepted_out.activate();

                records_in.for_each(|cap, batch| {
                    // Open a session on both outputs at the batch's time so
                    // that each record can be routed individually.
                    let mut rejected_session = rejected_handle.session(&cap);
                    let mut accepted_session = accepted_handle.session(&cap);

                    batch.drain(..).for_each(|record| {
                        let target = if condition(cap.time(), &record) {
                            &mut accepted_session
                        } else {
                            &mut rejected_session
                        };
                        target.give(record);
                    });
                });
            }
        });

        (rejected, accepted)
    }
}
/// Extension trait for `StreamCore` that splits a stream in two using a predicate on time only.
pub trait BranchWhen<T>: Sized {
/// Takes one input stream and splits it into two output streams.
///
/// For each time, the supplied closure is called. If it returns `true`,
/// the records for that time will be sent to the second returned stream,
/// otherwise they will be sent to the first.
///
/// The returned pair is `(records at times where the closure was false,
/// records at times where it was true)`.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, BranchWhen, Inspect, Delay};
///
/// timely::example(|scope| {
///     let (before_five, after_five) = (0..10)
///         .to_stream(scope)
///         .delay(|x,t| *x) // data 0..10 at time 0..10
///         .branch_when(|time| time >= &5);
///
///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
/// });
/// ```
fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}
impl<S: Scope, C: Container + Data> BranchWhen<S::Timestamp> for StreamCore<S, C> {
    /// Builds a "Branch" operator with one pipelined input and two outputs.
    ///
    /// `condition` is evaluated once for each container handed to the
    /// operator, using that container's time. The whole container is then
    /// forwarded to the second returned stream when the result is `true`,
    /// and to the first returned stream otherwise.
    fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
        let mut operator = OperatorBuilder::new(String::from("Branch"), self.scope());
        operator.set_notify(false);

        let mut containers_in = operator.new_input(self, Pipeline);
        let (mut first_out, first) = operator.new_output();
        let (mut second_out, second) = operator.new_output();

        operator.build(move |_| {
            move |_frontiers| {
                let mut first_handle = first_out.activate();
                let mut second_handle = second_out.activate();

                containers_in.for_each(|cap, container| {
                    // Only the time is inspected, so the container is passed
                    // along as a unit rather than record by record.
                    let chosen = if condition(cap.time()) {
                        &mut second_handle
                    } else {
                        &mut first_handle
                    };
                    chosen.session(&cap).give_container(container);
                });
            }
        });

        (first, second)
    }
}