Skip to content

Commit d23a5bf

Browse files
authored
Simplify type arguments to reduce, make Upds fields pub (#632)
The type arguments to reduce still listed K and V, although we have the same types as KeyOwn and ValOwn on the trace. Change it to use the types defined by the trace instead. Make the Upds fields public, in line with Vals. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent e2c188b commit d23a5bf

File tree

4 files changed

+29
-35
lines changed

4 files changed

+29
-35
lines changed

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -283,22 +283,20 @@ where
283283
T1: TraceReader + Clone + 'static,
284284
{
285285
/// A direct implementation of `ReduceCore::reduce_abelian`.
286-
pub fn reduce_abelian<L, K, V, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
286+
pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
287287
where
288-
T1: TraceReader<KeyOwn = K>,
288+
T1: TraceReader<KeyOwn: Ord>,
289289
T2: for<'a> Trace<
290290
Key<'a>= T1::Key<'a>,
291-
KeyOwn = K,
292-
ValOwn = V,
291+
KeyOwn=T1::KeyOwn,
292+
ValOwn: Data,
293293
Time=T1::Time,
294294
Diff: Abelian,
295295
>+'static,
296-
K: Ord + 'static,
297-
V: Data,
298-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>>,
299-
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
296+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
297+
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
300298
{
301-
self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| {
299+
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
302300
if !input.is_empty() {
303301
logic(key, input, change);
304302
}
@@ -308,22 +306,20 @@ where
308306
}
309307

310308
/// A direct implementation of `ReduceCore::reduce_core`.
311-
pub fn reduce_core<L, K, V, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
309+
pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
312310
where
313-
T1: TraceReader<KeyOwn = K>,
311+
T1: TraceReader<KeyOwn: Ord>,
314312
T2: for<'a> Trace<
315313
Key<'a>=T1::Key<'a>,
316-
KeyOwn = K,
317-
ValOwn = V,
314+
KeyOwn=T1::KeyOwn,
315+
ValOwn: Data,
318316
Time=T1::Time,
319317
>+'static,
320-
K: Ord + 'static,
321-
V: Data,
322-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>>,
323-
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
318+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319+
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
324320
{
325321
use crate::operators::reduce::reduce_trace;
326-
reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic)
322+
reduce_trace::<_,_,Bu,_,_>(self, name, logic)
327323
}
328324
}
329325

differential-dataflow/src/operators/reduce.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ where
8989
{
9090
fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
9191
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
92-
self.reduce_abelian::<_,K,V2,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(name, logic)
92+
self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
9393
.as_collection(|k,v| (k.clone(), v.clone()))
9494
}
9595
}
@@ -164,7 +164,7 @@ where
164164
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static,
165165
{
166166
fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
167-
self.reduce_abelian::<_,K,(),KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
167+
self.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
168168
.as_collection(|k,_| k.clone())
169169
}
170170
}
@@ -211,7 +211,7 @@ where
211211
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static,
212212
{
213213
fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
214-
self.reduce_abelian::<_,K,R,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
214+
self.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
215215
.as_collection(|k,c| (k.clone(), c.clone()))
216216
}
217217
}
@@ -303,22 +303,20 @@ where
303303
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
304304
{
305305
self.arrange_by_key_named(&format!("Arrange: {}", name))
306-
.reduce_core::<_,_,_,Bu,_>(name, logic)
306+
.reduce_core::<_,Bu,_>(name, logic)
307307
}
308308
}
309309

310310
/// A key-wise reduction of values in an input trace.
311311
///
312312
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
313-
pub fn reduce_trace<G, T1, Bu, T2, K, V, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
313+
pub fn reduce_trace<G, T1, Bu, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
314314
where
315315
G: Scope<Timestamp=T1::Time>,
316-
T1: for<'a> TraceReader<KeyOwn = K> + Clone + 'static,
317-
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, ValOwn = V, Time=T1::Time> + 'static,
318-
K: Ord + 'static,
319-
V: Data,
320-
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>>,
321-
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
316+
T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
317+
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
318+
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319+
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
322320
{
323321
let mut result_trace = None;
324322

@@ -352,7 +350,7 @@ where
352350

353351
// Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
354352
// as well as capabilities for these times (or their lower envelope, at least).
355-
let mut interesting = Vec::<(K, G::Timestamp)>::new();
353+
let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new();
356354
let mut capabilities = Vec::<Capability<G::Timestamp>>::new();
357355

358356
// buffers and logic for computing per-key interesting times "efficiently".
@@ -451,7 +449,7 @@ where
451449
//
452450
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
453451
// this as long as it requires that there is only one capability for each message.
454-
let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
452+
let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new();
455453
let mut builders = Vec::new();
456454
for cap in capabilities.iter() {
457455
buffers.push((cap.time().clone(), Vec::new()));

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,11 @@ pub mod layers {
135135
#[derive(Debug, Serialize, Deserialize)]
136136
pub struct Upds<O, T, D> {
137137
/// Offsets used to provide indexes from values to updates.
138-
offs: O,
138+
pub offs: O,
139139
/// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
140-
times: T,
140+
pub times: T,
141141
/// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
142-
diffs: D,
142+
pub diffs: D,
143143
}
144144

145145
impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {

interactive/src/plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
170170
input_arrangement
171171
};
172172

173-
let output = input.reduce_abelian::<_,_,_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
173+
let output = input.reduce_abelian::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
174174

175175
arrangements.set_unkeyed(&self, &output.trace);
176176
output.as_collection(|k,&()| k.clone())

0 commit comments

Comments
 (0)