Skip to content

Commit aa08b30

Browse files
committed
Switch builder API to port-identified iterators
1 parent f8a234a commit aa08b30

File tree

5 files changed

+49
-43
lines changed

5 files changed

+49
-43
lines changed

timely/src/dataflow/operators/core/feedback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
113113
let summary = handle.summary;
114114
let mut output = handle.output;
115115

116-
let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);
116+
let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
117117

118118
builder.build(move |_capability| move |_frontier| {
119119
let mut output = output.activate();

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};
1212

1313
use crate::progress::{Source, Target};
1414
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15-
use crate::progress::operate::Connectivity;
15+
use crate::progress::operate::{Connectivity, PortConnectivity};
1616
use crate::Container;
1717
use crate::dataflow::{StreamCore, Scope};
1818
use crate::dataflow::channels::pushers::Tee;
@@ -105,48 +105,53 @@ impl<G: Scope> OperatorBuilder<G> {
105105

106106
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
107107
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> P::Puller
108-
where
109-
P: ParallelizationContract<G::Timestamp, C> {
110-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
108+
where
109+
P: ParallelizationContract<G::Timestamp, C>
110+
{
111+
let connection = (0 .. self.shape.outputs).map(|o| (o, Antichain::from_elem(Default::default())));
111112
self.new_input_connection(stream, pact, connection)
112113
}
113114

114115
/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
115-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
116+
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> P::Puller
116117
where
117-
P: ParallelizationContract<G::Timestamp, C> {
118-
118+
P: ParallelizationContract<G::Timestamp, C>,
119+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
120+
{
119121
let channel_id = self.scope.new_identifier();
120122
let logging = self.scope.logging();
121123
let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging);
122124
let target = Target::new(self.index, self.shape.inputs);
123125
stream.connect_to(target, sender, channel_id);
124126

125127
self.shape.inputs += 1;
126-
assert_eq!(self.shape.outputs, connection.len());
127-
self.summary.push(connection.into());
128+
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
129+
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
130+
self.summary.push(connectivity);
128131

129132
receiver
130133
}
131134

132135
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
133136
pub fn new_output<C: Container>(&mut self) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
134137

135-
let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
138+
let connection = (0 .. self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default())));
136139
self.new_output_connection(connection)
137140
}
138141

139142
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
140-
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
143+
pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (Tee<G::Timestamp, C>, StreamCore<G, C>)
144+
where
145+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)>,
146+
{
141147
let new_output = self.shape.outputs;
142148
self.shape.outputs += 1;
143149
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
144150
let source = Source::new(self.index, new_output);
145151
let stream = StreamCore::new(source, registrar, self.scope.clone());
146152

147-
assert_eq!(self.shape.inputs, connection.len());
148-
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
149-
summary.add_port(new_output, entry);
153+
for (input, entry) in connection {
154+
self.summary[input].add_port(new_output, entry);
150155
}
151156

152157
(targets, stream)

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl<G: Scope> OperatorBuilder<G> {
6464
where
6565
P: ParallelizationContract<G::Timestamp, C> {
6666

67-
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
67+
let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
6868
self.new_input_connection(stream, pact, connection)
6969
}
7070

@@ -76,25 +76,26 @@ impl<G: Scope> OperatorBuilder<G> {
7676
///
7777
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
7878
/// antichain indicating that there is no connection from the input to the output.
79-
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
80-
where
81-
P: ParallelizationContract<G::Timestamp, C> {
82-
79+
pub fn new_input_connection<C: Container, P, I>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: I) -> InputHandleCore<G::Timestamp, C, P::Puller>
80+
where
81+
P: ParallelizationContract<G::Timestamp, C>,
82+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
83+
{
8384
let puller = self.builder.new_input_connection(stream, pact, connection.clone());
8485

8586
let input = PullCounter::new(puller);
8687
self.frontier.push(MutableAntichain::new());
8788
self.consumed.push(Rc::clone(input.consumed()));
8889

89-
let shared_summary = Rc::new(RefCell::new(connection.into()));
90+
let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
9091
self.summaries.push(Rc::clone(&shared_summary));
9192

9293
new_input_handle(input, Rc::clone(&self.internal), shared_summary, self.logging.clone())
9394
}
9495

9596
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
9697
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
97-
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
98+
let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
9899
self.new_output_connection(connection)
99100
}
100101

@@ -106,13 +107,13 @@ impl<G: Scope> OperatorBuilder<G> {
106107
///
107108
/// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
108109
/// antichain indicating that there is no connection from the input to the output.
109-
pub fn new_output_connection<CB: ContainerBuilder>(
110-
&mut self,
111-
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
112-
) -> (
110+
pub fn new_output_connection<CB: ContainerBuilder, I>(&mut self, connection: I) -> (
113111
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
114112
StreamCore<G, CB::Container>
115-
) {
113+
)
114+
where
115+
I: IntoIterator<Item = (usize, Antichain<<G::Timestamp as Timestamp>::Summary>)> + Clone,
116+
{
116117
let new_output = self.shape().outputs();
117118
let (tee, stream) = self.builder.new_output_connection(connection.clone());
118119

@@ -122,8 +123,8 @@ impl<G: Scope> OperatorBuilder<G> {
122123
let mut buffer = PushBuffer::new(PushCounter::new(tee));
123124
self.produced.push(Rc::clone(buffer.inner().produced()));
124125

125-
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
126-
summary.borrow_mut().add_port(new_output, connection.clone());
126+
for (input, entry) in connection {
127+
self.summaries[input].borrow_mut().add_port(new_output, entry);
127128
}
128129

129130
(OutputWrapper::new(buffer, internal), stream)

timely/src/progress/operate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ impl<TS> PortConnectivity<TS> {
101101
}
102102
}
103103

104-
impl<TS> From<Vec<Antichain<TS>>> for PortConnectivity<TS> {
105-
fn from(list: Vec<Antichain<TS>>) -> Self {
106-
Self { tree: list.into_iter().enumerate().filter(|(_,p)| !p.is_empty()).collect() }
104+
impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
105+
fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
106+
Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
107107
}
108108
}
109109

timely/src/progress/reachability.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
//! let mut builder = Builder::<usize>::new();
1818
//!
1919
//! // Each node with one input connected to one output.
20-
//! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
21-
//! builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
22-
//! builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]);
20+
//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
21+
//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
22+
//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
2323
//!
2424
//! // Connect nodes in sequence, looping around to the first from the last.
2525
//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
@@ -113,9 +113,9 @@ use crate::progress::timestamp::PathSummary;
113113
/// let mut builder = Builder::<usize>::new();
114114
///
115115
/// // Each node with one input connected to one output.
116-
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
117-
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
118-
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)].into()]);
116+
/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
117+
/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
118+
/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
119119
///
120120
/// // Connect nodes in sequence, looping around to the first from the last.
121121
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
@@ -224,9 +224,9 @@ impl<T: Timestamp> Builder<T> {
224224
/// let mut builder = Builder::<usize>::new();
225225
///
226226
/// // Each node with one input connected to one output.
227-
/// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
228-
/// builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
229-
/// builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)].into()]);
227+
/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
228+
/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
229+
/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
230230
///
231231
/// // Connect nodes in sequence, looping around to the first from the last.
232232
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
@@ -253,8 +253,8 @@ impl<T: Timestamp> Builder<T> {
253253
///
254254
/// // Two inputs and outputs, only one of which advances.
255255
/// builder.add_node(0, 2, 2, vec![
256-
/// vec![Antichain::from_elem(0),Antichain::new(),].into(),
257-
/// vec![Antichain::new(),Antichain::from_elem(1),].into(),
256+
/// [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(),
257+
/// [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(),
258258
/// ]);
259259
///
260260
/// // Connect each output to the opposite input.

0 commit comments

Comments
 (0)