Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,22 +283,20 @@ where
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, K, V, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T1: TraceReader<KeyOwn = K>,
T1: TraceReader<KeyOwn: Ord>,
T2: for<'a> Trace<
Key<'a>= T1::Key<'a>,
KeyOwn = K,
ValOwn = V,
KeyOwn=T1::KeyOwn,
ValOwn: Data,
Time=T1::Time,
Diff: Abelian,
>+'static,
K: Ord + 'static,
V: Data,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| {
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
Expand All @@ -308,22 +306,20 @@ where
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, K, V, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T1: TraceReader<KeyOwn = K>,
T1: TraceReader<KeyOwn: Ord>,
T2: for<'a> Trace<
Key<'a>=T1::Key<'a>,
KeyOwn = K,
ValOwn = V,
KeyOwn=T1::KeyOwn,
ValOwn: Data,
Time=T1::Time,
>+'static,
K: Ord + 'static,
V: Data,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic)
reduce_trace::<_,_,Bu,_,_>(self, name, logic)
}
}

Expand Down
24 changes: 11 additions & 13 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
{
fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
self.reduce_abelian::<_,K,V2,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(name, logic)
self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
.as_collection(|k,v| (k.clone(), v.clone()))
}
}
Expand Down Expand Up @@ -164,7 +164,7 @@ where
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static,
{
fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
self.reduce_abelian::<_,K,(),KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
self.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
.as_collection(|k,_| k.clone())
}
}
Expand Down Expand Up @@ -211,7 +211,7 @@ where
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static,
{
fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
self.reduce_abelian::<_,K,R,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
self.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
.as_collection(|k,c| (k.clone(), c.clone()))
}
}
Expand Down Expand Up @@ -303,22 +303,20 @@ where
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core::<_,_,_,Bu,_>(name, logic)
.reduce_core::<_,Bu,_>(name, logic)
}
}

/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
pub fn reduce_trace<G, T1, Bu, T2, K, V, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_trace<G, T1, Bu, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
G: Scope<Timestamp=T1::Time>,
T1: for<'a> TraceReader<KeyOwn = K> + Clone + 'static,
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, ValOwn = V, Time=T1::Time> + 'static,
K: Ord + 'static,
V: Data,
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
let mut result_trace = None;

Expand Down Expand Up @@ -352,7 +350,7 @@ where

// Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
// as well as capabilities for these times (or their lower envelope, at least).
let mut interesting = Vec::<(K, G::Timestamp)>::new();
let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new();
let mut capabilities = Vec::<Capability<G::Timestamp>>::new();

// buffers and logic for computing per-key interesting times "efficiently".
Expand Down Expand Up @@ -451,7 +449,7 @@ where
//
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
// this as long as it requires that there is only one capability for each message.
let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new();
let mut builders = Vec::new();
for cap in capabilities.iter() {
buffers.push((cap.time().clone(), Vec::new()));
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ pub mod layers {
#[derive(Debug, Serialize, Deserialize)]
pub struct Upds<O, T, D> {
/// Offsets used to provide indexes from values to updates.
offs: O,
pub offs: O,
/// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
times: T,
pub times: T,
/// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
diffs: D,
pub diffs: D,
}

impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
Expand Down
2 changes: 1 addition & 1 deletion interactive/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
input_arrangement
};

let output = input.reduce_abelian::<_,_,_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
let output = input.reduce_abelian::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));

arrangements.set_unkeyed(&self, &output.trace);
output.as_collection(|k,&()| k.clone())
Expand Down
Loading