Skip to content

Commit 617ac52

Browse files
authored
Remove batcher from Trace (#544)
There is no inherent reason a trace must know about a batcher, other than convenience. This change factors the batcher out of the trace and moves it to an explicit type parameter where necessary. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 4dfb13a commit 617ac52

File tree

14 files changed

+127
-113
lines changed

14 files changed

+127
-113
lines changed

examples/spines.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,46 +28,46 @@ fn main() {
2828

2929
match mode.as_str() {
3030
"new" => {
31-
use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
32-
let data = data.arrange::<ColKeySpine<_,_,_>>();
33-
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
31+
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeySpine};
32+
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
33+
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
3434
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
3535
.probe_with(&mut probe);
3636
},
3737
"old" => {
38-
use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
39-
let data = data.arrange::<OrdKeySpine<_,_,_>>();
40-
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
38+
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, OrdKeySpine};
39+
let data = data.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
40+
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
4141
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
4242
.probe_with(&mut probe);
4343
},
4444
"rhh" => {
45-
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
46-
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
47-
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
45+
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecSpine};
46+
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
47+
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
4848
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
4949
.probe_with(&mut probe);
5050
},
5151
"slc" => {
5252

53-
use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
53+
use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredSpine};
5454

5555
let data =
5656
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
57-
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
57+
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
5858
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
5959
let keys =
6060
keys.map(|x| (x.clone().into_bytes(), 7))
61-
.arrange::<PreferredSpine<[u8],u8,_,_>>()
61+
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
6262
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
6363

6464
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
6565
.probe_with(&mut probe);
6666
},
6767
"flat" => {
68-
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpineDefault;
69-
let data = data.arrange::<FlatKeySpineDefault<String,usize,isize, _>>();
70-
let keys = keys.arrange::<FlatKeySpineDefault<String,usize,isize,_>>();
68+
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault};
69+
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
70+
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
7171
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
7272
.probe_with(&mut probe);
7373
}

experiments/src/bin/deals.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use differential_dataflow::input::Input;
66
use differential_dataflow::Collection;
77
use differential_dataflow::operators::*;
88

9-
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
9+
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, ValBatcher};
1010
use differential_dataflow::operators::arrange::TraceAgent;
1111
use differential_dataflow::operators::arrange::Arranged;
1212
use differential_dataflow::operators::arrange::Arrange;
@@ -41,7 +41,7 @@ fn main() {
4141
let (input, graph) = scope.new_collection();
4242

4343
// each edge should exist in both directions.
44-
let graph = graph.arrange::<ValSpine<_,_,_,_>>();
44+
let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
4545

4646
match program.as_str() {
4747
"tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
@@ -94,10 +94,10 @@ fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
9494
let result =
9595
inner
9696
.map(|(x,y)| (y,x))
97-
.arrange::<ValSpine<_,_,_,_>>()
97+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
9898
.join_core(&edges, |_y,&x,&z| Some((x, z)))
9999
.concat(&edges.as_collection(|&k,&v| (k,v)))
100-
.arrange::<KeySpine<_,_,_>>()
100+
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
101101
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
102102
;
103103

@@ -121,12 +121,12 @@ fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
121121

122122
let result =
123123
inner
124-
.arrange::<ValSpine<_,_,_,_>>()
124+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
125125
.join_core(&edges, |_,&x,&z| Some((x, z)))
126-
.arrange::<ValSpine<_,_,_,_>>()
126+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
127127
.join_core(&edges, |_,&x,&z| Some((x, z)))
128128
.concat(&peers)
129-
.arrange::<KeySpine<_,_,_>>()
129+
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
130130
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
131131
;
132132

experiments/src/bin/graspan1.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use timely::order::Product;
66

77
use differential_dataflow::difference::Present;
88
use differential_dataflow::input::Input;
9-
use differential_dataflow::trace::implementations::ValSpine;
9+
use differential_dataflow::trace::implementations::{ValBatcher, ValSpine};
1010
use differential_dataflow::operators::*;
1111
use differential_dataflow::operators::arrange::Arrange;
1212
use differential_dataflow::operators::iterate::SemigroupVariable;
@@ -31,7 +31,7 @@ fn main() {
3131
let (n_handle, nodes) = scope.new_collection();
3232
let (e_handle, edges) = scope.new_collection();
3333

34-
let edges = edges.arrange::<ValSpine<_,_,_,_>>();
34+
let edges = edges.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
3535

3636
// a N c <- a N b && b E c
3737
// N(a,c) <- N(a,b), E(b, c)
@@ -46,7 +46,7 @@ fn main() {
4646
let next =
4747
labels.join_core(&edges, |_b, a, c| Some((*c, *a)))
4848
.concat(&nodes)
49-
.arrange::<ValSpine<_,_,_,_>>()
49+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
5050
// .distinct_total_core::<Diff>();
5151
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });
5252

experiments/src/bin/graspan2.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use differential_dataflow::Collection;
1010
use differential_dataflow::input::Input;
1111
use differential_dataflow::operators::*;
1212
use differential_dataflow::operators::arrange::Arrange;
13-
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
13+
use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher};
1414
use differential_dataflow::difference::Present;
1515

1616
type Node = u32;
@@ -47,7 +47,7 @@ fn unoptimized() {
4747
.flat_map(|(a,b)| vec![a,b])
4848
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));
4949

50-
let dereference = dereference.arrange::<ValSpine<_,_,_,_>>();
50+
let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
5151

5252
let (value_flow, memory_alias, value_alias) =
5353
scope
@@ -60,14 +60,14 @@ fn unoptimized() {
6060
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
6161
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
6262

63-
let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_>>();
64-
let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_>>();
63+
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
64+
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
6565

6666
// VA(a,b) <- VF(x,a),VF(x,b)
6767
// VA(a,b) <- VF(x,a),MA(x,y),VF(y,b)
6868
let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)));
6969
let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
70-
.arrange::<ValSpine<_,_,_,_>>()
70+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
7171
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
7272
.concat(&value_alias_next);
7373

@@ -77,16 +77,16 @@ fn unoptimized() {
7777
let value_flow_next =
7878
assignment
7979
.map(|(a,b)| (b,a))
80-
.arrange::<ValSpine<_,_,_,_>>()
80+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
8181
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
8282
.concat(&assignment.map(|(a,b)| (b,a)))
83-
.arrange::<ValSpine<_,_,_,_>>()
83+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
8484
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
8585
.concat(&nodes.map(|n| (n,n)));
8686

8787
let value_flow_next =
8888
value_flow_next
89-
.arrange::<KeySpine<_,_,_>>()
89+
.arrange::<ValBatcher<_,_,_,_>, KeySpine<_,_,_>>()
9090
// .distinct_total_core::<Diff>()
9191
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
9292
;
@@ -95,12 +95,12 @@ fn unoptimized() {
9595
let memory_alias_next: Collection<_,_,Present> =
9696
value_alias_next
9797
.join_core(&dereference, |_x,&y,&a| Some((y,a)))
98-
.arrange::<ValSpine<_,_,_,_>>()
98+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
9999
.join_core(&dereference, |_y,&a,&b| Some((a,b)));
100100

101101
let memory_alias_next: Collection<_,_,Present> =
102102
memory_alias_next
103-
.arrange::<KeySpine<_,_,_>>()
103+
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
104104
// .distinct_total_core::<Diff>()
105105
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
106106
;
@@ -172,7 +172,7 @@ fn optimized() {
172172
.flat_map(|(a,b)| vec![a,b])
173173
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));
174174

175-
let dereference = dereference.arrange::<ValSpine<_,_,_,_>>();
175+
let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
176176

177177
let (value_flow, memory_alias) =
178178
scope
@@ -185,22 +185,22 @@ fn optimized() {
185185
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
186186
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
187187

188-
let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_>>();
189-
let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_>>();
188+
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
189+
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
190190

191191
// VF(a,a) <-
192192
// VF(a,b) <- A(a,x),VF(x,b)
193193
// VF(a,b) <- A(a,x),MA(x,y),VF(y,b)
194194
let value_flow_next =
195195
assignment
196196
.map(|(a,b)| (b,a))
197-
.arrange::<ValSpine<_,_,_,_>>()
197+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
198198
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
199199
.concat(&assignment.map(|(a,b)| (b,a)))
200-
.arrange::<ValSpine<_,_,_,_>>()
200+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
201201
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
202202
.concat(&nodes.map(|n| (n,n)))
203-
.arrange::<KeySpine<_,_,_>>()
203+
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
204204
// .distinct_total_core::<Diff>()
205205
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
206206
;
@@ -209,9 +209,9 @@ fn optimized() {
209209
let value_flow_deref =
210210
value_flow
211211
.map(|(a,b)| (b,a))
212-
.arrange::<ValSpine<_,_,_,_>>()
212+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
213213
.join_core(&dereference, |_x,&a,&b| Some((a,b)))
214-
.arrange::<ValSpine<_,_,_,_>>();
214+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
215215

216216
// MA(a,b) <- VFD(x,a),VFD(y,b)
217217
// MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b)
@@ -222,10 +222,10 @@ fn optimized() {
222222
let memory_alias_next =
223223
memory_alias_arranged
224224
.join_core(&value_flow_deref, |_x,&y,&a| Some((y,a)))
225-
.arrange::<ValSpine<_,_,_,_>>()
225+
.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
226226
.join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
227227
.concat(&memory_alias_next)
228-
.arrange::<KeySpine<_,_,_>>()
228+
.arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
229229
// .distinct_total_core::<Diff>()
230230
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
231231
;

0 commit comments

Comments
 (0)