Skip to content

Commit 59218d1

Browse files
committed
Add scaling test
1 parent aa08b30 commit 59218d1

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed

timely/tests/shape_scaling.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use timely::dataflow::channels::pact::Pipeline;
2+
use timely::dataflow::operators::Input;
3+
use timely::dataflow::InputHandle;
4+
use timely::Config;
5+
6+
#[test] fn operator_scaling_1() { operator_scaling(1); }
7+
#[test] fn operator_scaling_10() { operator_scaling(10); }
8+
#[test] fn operator_scaling_100() { operator_scaling(100); }
9+
#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_1000() { operator_scaling(1000); }
10+
#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_10000() { operator_scaling(10000); }
11+
#[test] #[cfg_attr(miri, ignore)] fn operator_scaling_100000() { operator_scaling(100000); }
12+
13+
fn operator_scaling(scale: u64) {
14+
timely::execute(Config::thread(), move |worker| {
15+
let mut input = InputHandle::new();
16+
worker.dataflow::<u64, _, _>(|scope| {
17+
use timely::dataflow::operators::Partition;
18+
let parts =
19+
scope
20+
.input_from(&mut input)
21+
.partition(scale, |()| (0, ()));
22+
23+
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
24+
let mut builder = OperatorBuilder::new("OpScaling".to_owned(), scope.clone());
25+
let mut handles = Vec::with_capacity(parts.len());
26+
let mut outputs = Vec::with_capacity(parts.len());
27+
for (index, part) in parts.into_iter().enumerate() {
28+
use timely::container::CapacityContainerBuilder;
29+
let (output, stream) = builder.new_output_connection::<CapacityContainerBuilder<Vec<()>>,_>([]);
30+
use timely::progress::Antichain;
31+
let connectivity = [(index, Antichain::from_elem(Default::default()))];
32+
handles.push((builder.new_input_connection(&part, Pipeline, connectivity), output));
33+
outputs.push(stream);
34+
}
35+
36+
builder.build(move |_| {
37+
move |_frontiers| {
38+
for (input, output) in handles.iter_mut() {
39+
let mut output = output.activate();
40+
input.for_each(|time, data| {
41+
let mut output = output.session_with_builder(&time);
42+
for datum in data.drain(..) {
43+
output.give(datum);
44+
}
45+
});
46+
}
47+
}
48+
});
49+
});
50+
})
51+
.unwrap();
52+
}
53+
54+
#[test] fn subgraph_scaling_1() { subgraph_scaling(1); }
55+
#[test] fn subgraph_scaling_10() { subgraph_scaling(10); }
56+
#[test] fn subgraph_scaling_100() { subgraph_scaling(100); }
57+
#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_1000() { subgraph_scaling(1000); }
58+
#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_10000() { subgraph_scaling(10000); }
59+
#[test] #[cfg_attr(miri, ignore)] fn subgraph_scaling_100000() { subgraph_scaling(100000); }
60+
61+
fn subgraph_scaling(scale: u64) {
62+
timely::execute(Config::thread(), move |worker| {
63+
let mut input = InputHandle::new();
64+
worker.dataflow::<u64, _, _>(|scope| {
65+
use timely::dataflow::operators::Partition;
66+
let parts =
67+
scope
68+
.input_from(&mut input)
69+
.partition(scale, |()| (0, ()));
70+
71+
use timely::dataflow::Scope;
72+
let _outputs = scope.region(|inner| {
73+
use timely::dataflow::operators::{Enter, Leave};
74+
parts.into_iter().map(|part| part.enter(inner).leave()).collect::<Vec<_>>()
75+
});
76+
});
77+
})
78+
.unwrap();
79+
}

0 commit comments

Comments
 (0)