Skip to content

Commit c6d7e73

Browse files
committed
Support optional path summaries for disconnected ports
1 parent b719156 commit c6d7e73

File tree

4 files changed

+36
-31
lines changed

4 files changed

+36
-31
lines changed

timely/src/dataflow/operators/capability.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,13 @@ pub struct InputCapability<T: Timestamp> {
246246
impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247247
fn time(&self) -> &T { self.time() }
248248
fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249-
let borrow = self.summaries.borrow();
250-
self.internal.borrow().iter().enumerate().any(|(index, rc)| {
251-
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252-
Rc::ptr_eq(rc, query_buffer) && borrow[index].len() == 1 && borrow[index][0] == Default::default()
253-
})
249+
let summaries_borrow = self.summaries.borrow();
250+
let internal_borrow = self.internal.borrow();
251+
// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252+
let result = summaries_borrow.iter_ports().any(|(port, path)| {
253+
Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
254+
});
255+
result
254256
}
255257
}
256258

@@ -281,10 +283,15 @@ impl<T: Timestamp> InputCapability<T> {
281283
/// Delays capability for a specific output port.
282284
pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
283285
use crate::progress::timestamp::PathSummary;
284-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
285-
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
286-
} else {
287-
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, self.summaries.borrow()[output_port], self.time());
286+
if let Some(path) = self.summaries.borrow().get(output_port) {
287+
if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
288+
Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
289+
} else {
290+
panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
291+
}
292+
}
293+
else {
294+
panic!("Attempted to delay a capability for a disconnected output");
288295
}
289296
}
290297

@@ -305,11 +312,16 @@ impl<T: Timestamp> InputCapability<T> {
305312
pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
306313
use crate::progress::timestamp::PathSummary;
307314
let self_time = self.time().clone();
308-
if self.summaries.borrow()[output_port].iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
309-
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
315+
if let Some(path) = self.summaries.borrow().get(output_port) {
316+
if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
317+
Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
318+
}
319+
else {
320+
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
321+
}
310322
}
311323
else {
312-
panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, self.summaries.borrow()[output_port], self_time);
324+
panic!("Attempted to retain a capability for a disconnected output");
313325
}
314326
}
315327
}

timely/src/progress/operate.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,6 @@ impl<TS> Default for PortConnectivity<TS> {
7373
}
7474

7575
impl<TS> PortConnectivity<TS> {
76-
/// Introduces default summaries for `0 .. count` ports.
77-
pub fn default_for(count: usize) -> Self {
78-
let mut list = Vec::with_capacity(count);
79-
for _ in 0 .. count { list.push(Default::default()) }
80-
Self { list }
81-
}
8276
/// Ensures an entry exists at `index` and returns a mutable reference to it.
8377
fn ensure(&mut self, index: usize) -> &mut Antichain<TS> {
8478
while self.list.len() <= index { self.add_port(self.list.len(), Antichain::new()); }
@@ -105,6 +99,10 @@ impl<TS> PortConnectivity<TS> {
10599
pub fn max_port(&self) -> usize {
106100
self.list.len() - 1
107101
}
102+
/// Returns the associated path summary, if it exists.
103+
pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
104+
self.list.get(index)
105+
}
108106
}
109107

110108
impl<TS> From<Vec<Antichain<TS>>> for PortConnectivity<TS> {
@@ -113,13 +111,6 @@ impl<TS> From<Vec<Antichain<TS>>> for PortConnectivity<TS> {
113111
}
114112
}
115113

116-
impl<TS> std::ops::Index<usize> for PortConnectivity<TS> {
117-
type Output = Antichain<TS>;
118-
fn index(&self, index: usize) -> &Self::Output {
119-
&self.list[index]
120-
}
121-
}
122-
123114
/// Progress information shared between parent and child.
124115
#[derive(Debug)]
125116
pub struct SharedProgress<T: Timestamp> {

timely/src/progress/reachability.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -781,10 +781,12 @@ fn summarize_outputs<T: Timestamp>(
781781
let antichains = results.entry(location).or_default();
782782

783783
// Combine each operator-internal summary to the output with `summary`.
784-
for operator_summary in summaries[output_port].elements().iter() {
785-
if let Some(combined) = operator_summary.followed_by(&summary) {
786-
if antichains.insert_ref(output, &combined) {
787-
worklist.push_back((location, output, combined));
784+
if let Some(connection) = summaries.get(output_port) {
785+
for operator_summary in connection.elements().iter() {
786+
if let Some(combined) = operator_summary.followed_by(&summary) {
787+
if antichains.insert_ref(output, &combined) {
788+
worklist.push_back((location, output, combined));
789+
}
788790
}
789791
}
790792
}

timely/src/progress/subgraph.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ where
168168
let mut builder = reachability::Builder::new();
169169

170170
// Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
171-
let summary = (0..outputs).map(|_| PortConnectivity::default_for(inputs)).collect();
171+
let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
172172
builder.add_node(0, outputs, inputs, summary);
173173
for (index, child) in self.children.iter().enumerate().skip(1) {
174174
builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
@@ -555,7 +555,7 @@ where
555555
// Note that we need to have `self.inputs()` elements in the summary
556556
// with each element containing `self.outputs()` antichains regardless
557557
// of how long `self.scope_summary` is
558-
let mut internal_summary = vec![PortConnectivity::default_for(self.outputs); self.inputs()];
558+
let mut internal_summary = vec![PortConnectivity::default(); self.inputs()];
559559
for (input_idx, input) in self.scope_summary.iter().enumerate() {
560560
for (output_idx, output) in input.iter_ports() {
561561
for outer in output.elements().iter().cloned().map(TInner::summarize) {

0 commit comments

Comments
 (0)