Skip to content

Commit 3d5dcf0

Browse files
committed
Introduce tests; catch further issues
1 parent 89a7b50 commit 3d5dcf0

File tree

4 files changed

+107
-18
lines changed

4 files changed

+107
-18
lines changed

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ impl<G: Scope> OperatorBuilder<G> {
124124
stream.connect_to(target, sender, channel_id);
125125

126126
self.shape.inputs += 1;
127-
assert_eq!(self.shape.outputs, connection.len());
128127
self.summary.push(connection);
129128

130129
receiver

timely/src/progress/reachability.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,19 @@ fn summarize_outputs<T: Timestamp>(
754754
}
755755
}
756756

757+
// A reverse map from operator outputs to inputs, along their internal summaries.
758+
let mut reverse_internal = HashMap::new();
759+
for (node, connectivity) in nodes.iter().enumerate() {
760+
for (input, outputs) in connectivity.iter().enumerate() {
761+
for (output, summary) in outputs.iter() {
762+
reverse_internal
763+
.entry(Location::new_source(node, *output))
764+
.or_insert_with(|| Vec::default())
765+
.push((input, summary));
766+
}
767+
}
768+
}
769+
757770
let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
758771
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
759772

@@ -776,20 +789,18 @@ fn summarize_outputs<T: Timestamp>(
776789

777790
// This is an output port of an operator, or a scope input.
778791
// We want to crawl up the operator, to its inputs.
779-
Port::Source(output_port) => {
780-
781-
// Consider each input port of the associated operator.
782-
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
783-
784-
// Determine the current path summaries from the input port.
785-
let location = Location { node: location.node, port: Port::Target(input_port) };
786-
let antichains = results.entry(location).or_insert_with(|| PortConnectivity::default());
787-
788-
// Combine each operator-internal summary to the output with `summary`.
789-
if let Some(port_connectivity) = summaries.get(&output_port) {
790-
for operator_summary in port_connectivity.elements().iter() {
791-
if let Some(combined) = operator_summary.followed_by(&summary) {
792-
let update = antichains.entry(output).or_insert_with(|| Antichain::default()).insert_ref(&combined);
792+
Port::Source(_output_port) => {
793+
if let Some(inputs) = reverse_internal.get(&location) {
794+
for (input_port, operator_summary) in inputs.iter() {
795+
let location = Location::new_target(location.node, *input_port);
796+
for summary in operator_summary.elements().iter() {
797+
if let Some(combined) = summary.followed_by(&summary) {
798+
let update =
799+
results.entry(location)
800+
.or_insert_with(|| PortConnectivity::default())
801+
.entry(output)
802+
.or_insert_with(|| Antichain::default())
803+
.insert_ref(&combined);
793804
if update {
794805
worklist.push_back((location, output, combined));
795806
}

timely/src/progress/subgraph.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -572,8 +572,8 @@ where
572572
"the internal summary should have as many elements as there are inputs",
573573
);
574574
debug_assert!(
575-
internal_summary.iter().all(|summary| summary.len() == self.outputs()),
576-
"each element of the internal summary should have as many elements as there are outputs",
575+
internal_summary.iter().all(|summary| summary.keys().all(|output| output < &self.outputs())),
576+
"each element of the internal summary should only reference recognized outputs",
577577
);
578578

579579
// Each child has expressed initial capabilities (their `shared_progress.internals`).
@@ -673,7 +673,7 @@ impl<T: Timestamp> PerOperatorState<T> {
673673
inputs,
674674
);
675675
assert!(
676-
!internal_summary.iter().any(|x| x.len() != outputs),
676+
!internal_summary.iter().any(|x| x.keys().any(|k| k >= &outputs)),
677677
"operator summary had too few outputs",
678678
);
679679

timely/tests/shape_scaling.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use timely::dataflow::channels::pact::Pipeline;
2+
use timely::dataflow::operators::Input;
3+
use timely::dataflow::InputHandle;
4+
use timely::Config;
5+
6+
#[test] fn operator_scaling_1() { operator_scaling(1); }
7+
#[test] fn operator_scaling_10() { operator_scaling(10); }
8+
#[test] fn operator_scaling_100() { operator_scaling(100); }
9+
#[test] fn operator_scaling_1000() { operator_scaling(1000); }
10+
#[test] fn operator_scaling_10000() { operator_scaling(10000); }
11+
#[test] fn operator_scaling_100000() { operator_scaling(100000); }
12+
13+
fn operator_scaling(scale: u64) {
14+
timely::execute(Config::thread(), move |worker| {
15+
let mut input = InputHandle::new();
16+
worker.dataflow::<u64, _, _>(|scope| {
17+
use timely::dataflow::operators::Partition;
18+
let parts =
19+
scope
20+
.input_from(&mut input)
21+
.partition(scale, |()| (0, ()));
22+
23+
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
24+
let mut builder = OperatorBuilder::new("OpScaling".to_owned(), scope.clone());
25+
let mut handles = Vec::with_capacity(parts.len());
26+
let mut outputs = Vec::with_capacity(parts.len());
27+
for (index, part) in parts.into_iter().enumerate() {
28+
use timely::container::CapacityContainerBuilder;
29+
let (output, stream) = builder.new_output_connection::<CapacityContainerBuilder<Vec<()>>>(Default::default());
30+
use timely::progress::Antichain;
31+
let connectivity = [(index, Antichain::from_elem(Default::default()))].into();
32+
handles.push((builder.new_input_connection(&part, Pipeline, connectivity), output));
33+
outputs.push(stream);
34+
}
35+
36+
builder.build(move |_| {
37+
move |_frontiers| {
38+
for (input, output) in handles.iter_mut() {
39+
let mut output = output.activate();
40+
input.for_each(|time, data| {
41+
let mut output = output.session_with_builder(&time);
42+
for datum in data.drain(..) {
43+
output.give(datum);
44+
}
45+
});
46+
}
47+
}
48+
});
49+
});
50+
})
51+
.unwrap();
52+
}
53+
54+
#[test] fn subgraph_scaling_1() { subgraph_scaling(1); }
55+
#[test] fn subgraph_scaling_10() { subgraph_scaling(10); }
56+
#[test] fn subgraph_scaling_100() { subgraph_scaling(100); }
57+
#[test] fn subgraph_scaling_1000() { subgraph_scaling(1000); }
58+
#[test] fn subgraph_scaling_10000() { subgraph_scaling(10000); }
59+
#[test] fn subgraph_scaling_100000() { subgraph_scaling(100000); }
60+
61+
fn subgraph_scaling(scale: u64) {
62+
timely::execute(Config::thread(), move |worker| {
63+
let mut input = InputHandle::new();
64+
worker.dataflow::<u64, _, _>(|scope| {
65+
use timely::dataflow::operators::Partition;
66+
let parts =
67+
scope
68+
.input_from(&mut input)
69+
.partition(scale, |()| (0, ()));
70+
71+
use timely::dataflow::Scope;
72+
let _outputs = scope.region(|inner| {
73+
use timely::dataflow::operators::{Enter, Leave};
74+
parts.into_iter().map(|part| part.enter(inner).leave()).collect::<Vec<_>>()
75+
});
76+
});
77+
})
78+
.unwrap();
79+
}

0 commit comments

Comments
 (0)