Skip to content

Commit b719156

Browse files
committed
Make PortConnectivity API more explicit
1 parent b42bbc2 commit b719156

File tree

5 files changed

+26
-29
lines changed

5 files changed

+26
-29
lines changed

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,15 @@ impl<G: Scope> OperatorBuilder<G> {
138138

139139
/// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
140140
pub fn new_output_connection<C: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (Tee<G::Timestamp, C>, StreamCore<G, C>) {
141-
141+
let new_output = self.shape.outputs;
142+
self.shape.outputs += 1;
142143
let (targets, registrar) = Tee::<G::Timestamp,C>::new();
143-
let source = Source::new(self.index, self.shape.outputs);
144+
let source = Source::new(self.index, new_output);
144145
let stream = StreamCore::new(source, registrar, self.scope.clone());
145146

146-
self.shape.outputs += 1;
147147
assert_eq!(self.shape.inputs, connection.len());
148148
for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
149-
summary.add_port(entry);
149+
summary.add_port(new_output, entry);
150150
}
151151

152152
(targets, stream)

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl<G: Scope> OperatorBuilder<G> {
113113
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
114114
StreamCore<G, CB::Container>
115115
) {
116-
116+
let new_output = self.shape().outputs();
117117
let (tee, stream) = self.builder.new_output_connection(connection.clone());
118118

119119
let internal = Rc::new(RefCell::new(ChangeBatch::new()));
@@ -123,7 +123,7 @@ impl<G: Scope> OperatorBuilder<G> {
123123
self.produced.push(Rc::clone(buffer.inner().produced()));
124124

125125
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
126-
summary.borrow_mut().add_port(connection.clone());
126+
summary.borrow_mut().add_port(new_output, connection.clone());
127127
}
128128

129129
(OutputWrapper::new(buffer, internal), stream)

timely/src/progress/operate.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,29 +80,30 @@ impl<TS> PortConnectivity<TS> {
8080
Self { list }
8181
}
8282
/// Ensures an entry exists at `index` and returns a mutable reference to it.
83-
pub fn ensure(&mut self, index: usize) -> &mut Antichain<TS> {
84-
while self.next_port() <= index { self.add_port(Antichain::new()); }
83+
fn ensure(&mut self, index: usize) -> &mut Antichain<TS> {
84+
while self.list.len() <= index { self.add_port(self.list.len(), Antichain::new()); }
8585
&mut self.list[index]
8686
}
8787
/// Inserts an element by reference, ensuring that the index exists.
88+
pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder + Clone {
89+
self.ensure(index).insert(element)
90+
}
91+
/// Inserts an element by reference, ensuring that the index exists.
8892
pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
8993
self.ensure(index).insert_ref(element)
9094
}
91-
/// Introduces a summary for the port `self.next_port()`.
92-
pub fn add_port(&mut self, summary: Antichain<TS>) {
95+
/// Introduces a summary for `port`. Panics if a summary already exists.
96+
pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
97+
assert_eq!(self.list.len(), port);
9398
self.list.push(summary);
9499
}
95100
/// Borrowing iterator of port identifiers and antichains.
96101
pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
97102
self.list.iter().enumerate()
98103
}
99-
/// Owning iterator of port identifiers and antichains.
100-
pub fn into_iter_ports(self) -> impl Iterator<Item = (usize, Antichain<TS>)> {
101-
self.list.into_iter().enumerate()
102-
}
103-
/// Announces the next output port identifier.
104-
pub fn next_port(&self) -> usize {
105-
self.list.len()
104+
/// Announces the largest port identifier, largely for debug asserts.
105+
pub fn max_port(&self) -> usize {
106+
self.list.len() - 1
106107
}
107108
}
108109

timely/src/progress/reachability.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl<T: Timestamp> Builder<T> {
160160

161161
// Assert that all summaries exist.
162162
debug_assert_eq!(inputs, summary.len());
163-
for x in summary.iter() { debug_assert_eq!(outputs, x.next_port()); }
163+
debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)));
164164

165165
while self.nodes.len() <= index {
166166
self.nodes.push(Vec::new());
@@ -779,8 +779,6 @@ fn summarize_outputs<T: Timestamp>(
779779
// Determine the current path summaries from the input port.
780780
let location = Location { node: location.node, port: Port::Target(input_port) };
781781
let antichains = results.entry(location).or_default();
782-
// TODO: This is redundant with `insert_ref` below.
783-
antichains.ensure(output);
784782

785783
// Combine each operator-internal summary to the output with `summary`.
786784
for operator_summary in summaries[output_port].elements().iter() {
@@ -801,8 +799,6 @@ fn summarize_outputs<T: Timestamp>(
801799
// Each target should have (at most) one source.
802800
if let Some(&source) = reverse.get(&location) {
803801
let antichains = results.entry(source).or_default();
804-
// TODO: This is redundant with `insert_ref` below.
805-
antichains.ensure(output);
806802

807803
if antichains.insert_ref(output, &summary) {
808804
worklist.push_back((source, output, summary.clone()));

timely/src/progress/subgraph.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -558,9 +558,9 @@ where
558558
let mut internal_summary = vec![PortConnectivity::default_for(self.outputs); self.inputs()];
559559
for (input_idx, input) in self.scope_summary.iter().enumerate() {
560560
for (output_idx, output) in input.iter_ports() {
561-
let antichain = internal_summary[input_idx].ensure(output_idx);
562-
antichain.reserve(output.elements().len());
563-
antichain.extend(output.elements().iter().cloned().map(TInner::summarize));
561+
for outer in output.elements().iter().cloned().map(TInner::summarize) {
562+
internal_summary[input_idx].insert(output_idx, outer);
563+
}
564564
}
565565
}
566566

@@ -570,8 +570,8 @@ where
570570
"the internal summary should have as many elements as there are inputs",
571571
);
572572
debug_assert!(
573-
internal_summary.iter().all(|summary| summary.next_port() == self.outputs()),
574-
"each element of the internal summary should have as many elements as there are outputs",
573+
internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())),
574+
"each element of the internal summary should only reference valid outputs",
575575
);
576576

577577
// Each child has expressed initial capabilities (their `shared_progress.internals`).
@@ -671,8 +671,8 @@ impl<T: Timestamp> PerOperatorState<T> {
671671
inputs,
672672
);
673673
assert!(
674-
!internal_summary.iter().any(|x| x.next_port() != outputs),
675-
"operator summary had too few outputs",
674+
internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)),
675+
"operator summary references invalid output port",
676676
);
677677

678678
PerOperatorState {

0 commit comments

Comments
 (0)