Skip to content

Commit fcbac30

Browse files
committed
Adjust reachability logic to be more linear
1 parent a3c83f8 commit fcbac30

File tree

1 file changed

+48
-16
lines changed

1 file changed

+48
-16
lines changed

timely/src/progress/reachability.rs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,19 @@ fn summarize_outputs<T: Timestamp>(
749749
}
750750
}
751751

752+
// A reverse map from operator outputs to inputs, along their internal summaries.
753+
let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
754+
for (node, connectivity) in nodes.iter().enumerate() {
755+
for (input, outputs) in connectivity.iter().enumerate() {
756+
for (output, summary) in outputs.iter_ports() {
757+
reverse_internal
758+
.entry(Location::new_source(node, output))
759+
.or_default()
760+
.push((input, summary));
761+
}
762+
}
763+
}
764+
752765
let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
753766
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
754767

@@ -771,28 +784,47 @@ fn summarize_outputs<T: Timestamp>(
771784

772785
// This is an output port of an operator, or a scope input.
773786
// We want to crawl up the operator, to its inputs.
774-
Port::Source(output_port) => {
775-
776-
// Consider each input port of the associated operator.
777-
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
778-
779-
// Determine the current path summaries from the input port.
780-
let location = Location { node: location.node, port: Port::Target(input_port) };
781-
let antichains = results.entry(location).or_default();
782-
783-
// Combine each operator-internal summary to the output with `summary`.
784-
if let Some(connection) = summaries.get(output_port) {
785-
for operator_summary in connection.elements().iter() {
786-
if let Some(combined) = operator_summary.followed_by(&summary) {
787-
if antichains.insert_ref(output, &combined) {
787+
// Port::Source(output_port) => {
788+
789+
// // Consider each input port of the associated operator.
790+
// for (input_port, summaries) in nodes[location.node].iter().enumerate() {
791+
792+
// // Determine the current path summaries from the input port.
793+
// let location = Location { node: location.node, port: Port::Target(input_port) };
794+
// let antichains = results.entry(location).or_default();
795+
796+
// // Combine each operator-internal summary to the output with `summary`.
797+
// if let Some(connection) = summaries.get(output_port) {
798+
// for operator_summary in connection.elements().iter() {
799+
// if let Some(combined) = operator_summary.followed_by(&summary) {
800+
// if antichains.insert_ref(output, &combined) {
801+
// worklist.push_back((location, output, combined));
802+
// }
803+
// }
804+
// }
805+
// }
806+
// }
807+
808+
// },
809+
810+
Port::Source(_output_port) => {
811+
if let Some(inputs) = reverse_internal.get(&location) {
812+
for (input_port, operator_summary) in inputs.iter() {
813+
let location = Location::new_target(location.node, *input_port);
814+
for summary in operator_summary.elements().iter() {
815+
if let Some(combined) = summary.followed_by(&summary) {
816+
let update =
817+
results.entry(location)
818+
.or_default()
819+
.insert_ref(output, &combined);
820+
if update {
788821
worklist.push_back((location, output, combined));
789822
}
790823
}
791824
}
792825
}
793826
}
794-
795-
},
827+
}
796828

797829
// This is an input port of an operator, or a scope output.
798830
// We want to walk back the edges leading to it.

0 commit comments

Comments
 (0)