diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 868f69a93..05cab5328 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -283,22 +283,20 @@ where T1: TraceReader + Clone + 'static, { /// A direct implementation of `ReduceCore::reduce_abelian`. - pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> + pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where - T1: TraceReader, + T1: TraceReader, T2: for<'a> Trace< Key<'a>= T1::Key<'a>, - KeyOwn = K, - ValOwn = V, + KeyOwn=T1::KeyOwn, + ValOwn: Data, Time=T1::Time, Diff: Abelian, >+'static, - K: Ord + 'static, - V: Data, - Bu: Builder>, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, + Bu: Builder>, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, { - self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| { + self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { if !input.is_empty() { logic(key, input, change); } @@ -308,22 +306,20 @@ where } /// A direct implementation of `ReduceCore::reduce_core`. - pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> + pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - T1: TraceReader, + T1: TraceReader, T2: for<'a> Trace< Key<'a>=T1::Key<'a>, - KeyOwn = K, - ValOwn = V, + KeyOwn=T1::KeyOwn, + ValOwn: Data, Time=T1::Time, >+'static, - K: Ord + 'static, - V: Data, - Bu: Builder>, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, + Bu: Builder>, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; - reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic) + reduce_trace::<_,_,Bu,_,_>(self, name, logic) } } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 317b908f6..8d1880d87 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -89,7 +89,7 @@ where { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,K,V2,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(name, logic) + self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine>(name, logic) .as_collection(|k,v| (k.clone(), v.clone())) } } @@ -164,7 +164,7 @@ where T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,K,(),KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -211,7 +211,7 @@ where T1: for<'a> TraceReader=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static, { fn count_core + 'static>(&self) -> Collection { - self.reduce_abelian::<_,K,R,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } @@ -303,22 +303,20 @@ where L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) - .reduce_core::<_,_,_,Bu,_>(name, logic) + .reduce_core::<_,Bu,_>(name, logic) } } /// A key-wise reduction of values in an input trace. /// /// This method exists to provide reduce functionality without opinions about qualifying trace types. -pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> +pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, - T1: for<'a> TraceReader + Clone + 'static, - T2: for<'a> Trace=T1::Key<'a>, ValOwn = V, Time=T1::Time> + 'static, - K: Ord + 'static, - V: Data, - Bu: Builder>, - L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, + T1: TraceReader + Clone + 'static, + T2: for<'a> Trace=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static, + Bu: Builder>, + L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, { let mut result_trace = None; @@ -352,7 +350,7 @@ where // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, // as well as capabilities for these times (or their lower envelope, at least). - let mut interesting = Vec::<(K, G::Timestamp)>::new(); + let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new(); let mut capabilities = Vec::>::new(); // buffers and logic for computing per-key interesting times "efficiently". @@ -451,7 +449,7 @@ where // // TODO: It would be better if all updates went into one batch, but timely dataflow prevents // this as long as it requires that there is only one capability for each message. - let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new(); + let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new(); let mut builders = Vec::new(); for cap in capabilities.iter() { buffers.push((cap.time().clone(), Vec::new())); diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 0c01c3378..218fc612a 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -135,11 +135,11 @@ pub mod layers { #[derive(Debug, Serialize, Deserialize)] pub struct Upds { /// Offsets used to provide indexes from values to updates. - offs: O, + pub offs: O, /// Concatenated ordered lists of update times, bracketed by offsets in `offs`. - times: T, + pub times: T, /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`. - diffs: D, + pub diffs: D, } impl BatchContainer = usize>, T: BatchContainer, D: BatchContainer> Default for Upds { diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index faad5e562..a8cbadf2e 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -170,7 +170,7 @@ impl Render for Plan { input_arrangement }; - let output = input.reduce_abelian::<_,_,_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); + let output = input.reduce_abelian::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); arrangements.set_unkeyed(&self, &output.trace); output.as_collection(|k,&()| k.clone())