Skip to content

Commit f8a234a

Browse files
committed
Adjust reachability logic to be more linear
1 parent a3c83f8 commit f8a234a

File tree

1 file changed

+24
-25
lines changed

1 file changed

+24
-25
lines changed

timely/src/progress/reachability.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,19 @@ fn summarize_outputs<T: Timestamp>(
749749
}
750750
}
751751

752+
// A reverse map from operator outputs to inputs, along their internal summaries.
753+
let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
754+
for (node, connectivity) in nodes.iter().enumerate() {
755+
for (input, outputs) in connectivity.iter().enumerate() {
756+
for (output, summary) in outputs.iter_ports() {
757+
reverse_internal
758+
.entry(Location::new_source(node, output))
759+
.or_default()
760+
.push((input, summary));
761+
}
762+
}
763+
}
764+
752765
let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
753766
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
754767

@@ -766,50 +779,36 @@ fn summarize_outputs<T: Timestamp>(
766779

767780
// Loop until we stop discovering novel reachability paths.
768781
while let Some((location, output, summary)) = worklist.pop_front() {
769-
770782
match location.port {
771783

772784
// This is an output port of an operator, or a scope input.
773785
// We want to crawl up the operator, to its inputs.
774-
Port::Source(output_port) => {
775-
776-
// Consider each input port of the associated operator.
777-
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
778-
779-
// Determine the current path summaries from the input port.
780-
let location = Location { node: location.node, port: Port::Target(input_port) };
781-
let antichains = results.entry(location).or_default();
782-
783-
// Combine each operator-internal summary to the output with `summary`.
784-
if let Some(connection) = summaries.get(output_port) {
785-
for operator_summary in connection.elements().iter() {
786-
if let Some(combined) = operator_summary.followed_by(&summary) {
787-
if antichains.insert_ref(output, &combined) {
788-
worklist.push_back((location, output, combined));
786+
Port::Source(_output_port) => {
787+
if let Some(inputs) = reverse_internal.get(&location) {
788+
for (input_port, operator_summary) in inputs.iter() {
789+
let new_location = Location::new_target(location.node, *input_port);
790+
for op_summary in operator_summary.elements().iter() {
791+
if let Some(combined) = op_summary.followed_by(&summary) {
792+
if results.entry(new_location).or_default().insert_ref(output, &combined) {
793+
worklist.push_back((new_location, output, combined));
789794
}
790795
}
791796
}
792797
}
793798
}
794-
795-
},
799+
}
796800

797801
// This is an input port of an operator, or a scope output.
798802
// We want to walk back the edges leading to it.
799803
Port::Target(_port) => {
800-
801804
// Each target should have (at most) one source.
802805
if let Some(&source) = reverse.get(&location) {
803-
let antichains = results.entry(source).or_default();
804-
805-
if antichains.insert_ref(output, &summary) {
806-
worklist.push_back((source, output, summary.clone()));
806+
if results.entry(source).or_default().insert_ref(output, &summary) {
807+
worklist.push_back((source, output, summary));
807808
}
808809
}
809-
810810
},
811811
}
812-
813812
}
814813

815814
results

0 commit comments

Comments
 (0)