From 3775b230ad64dd1cd21ee73dab164e5cbc175de0 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Sun, 25 Dec 2022 11:52:38 +0100 Subject: [PATCH 1/2] aggregation: allow aggregate to emit multiple values Signed-off-by: Petros Angelatos --- .../operators/aggregation/aggregate.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index 55f7d50a4..0ac267ede 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -16,7 +16,7 @@ pub trait Aggregate { /// /// The `aggregate` method is implemented for streams of `(K, V)` data, /// and takes functions `fold`, `emit`, and `hash`; used to combine new `V` - /// data with existing `D` state, to produce `R` output from `D` state, and + /// data with existing `D` state, to produce `I` output from `D` state, and /// to route `K` keys, respectively. 
/// /// Aggregation happens within each time, and results are produced once the @@ -33,7 +33,7 @@ pub trait Aggregate { /// .map(|x| (x % 2, x)) /// .aggregate( /// |_key, val, agg| { *agg += val; }, - /// |key, agg: i32| (key, agg), + /// |key, agg: i32| [(key, agg)], /// |key| *key as u64 /// ) /// .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25))); @@ -54,26 +54,31 @@ pub trait Aggregate { /// .map(|x| (x % 2, x)) /// .aggregate::<_,Vec,_,_,_>( /// |_key, val, agg| { agg.push(val); }, - /// |key, agg| (key, agg.len()), + /// |key, agg| [(key, agg.len())], /// |key| *key as u64 /// ) /// .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5))); /// }); /// ``` - fn aggregateR+'static, H: Fn(&K)->u64+'static>( + fn aggregateI+'static, H: Fn(&K)->u64+'static>( &self, fold: F, emit: E, - hash: H) -> Stream where S::Timestamp: Eq; + hash: H) -> Stream + where S::Timestamp: Eq, + I::Item: Data; } impl Aggregate for Stream { - fn aggregateR+'static, H: Fn(&K)->u64+'static>( + fn aggregateI+'static, H: Fn(&K)->u64+'static>( &self, fold: F, emit: E, - hash: H) -> Stream where S::Timestamp: Eq { + hash: H) -> Stream + where S::Timestamp: Eq, + I::Item: Data + { let mut aggregates = HashMap::new(); let mut vector = Vec::new(); @@ -95,7 +100,7 @@ impl Aggregate for if let Some(aggs) = aggregates.remove(time.time()) { let mut session = output.session(&time); for (key, agg) in aggs { - session.give(emit(key, agg)); + session.give_iterator(emit(key, agg).into_iter()); } } }); From 610a2c26de507cb55e2bcc797943342247e949d2 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Tue, 27 Dec 2022 13:41:15 +0100 Subject: [PATCH 2/2] aggregation: use closure as the default constructor This allows users to produce default values that carry over some state from the dataflow construction time. 
Signed-off-by: Petros Angelatos --- .../dataflow/operators/aggregation/aggregate.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index 0ac267ede..6443a250e 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -32,8 +32,9 @@ pub trait Aggregate { /// (0..10).to_stream(scope) /// .map(|x| (x % 2, x)) /// .aggregate( + /// i32::default, /// |_key, val, agg| { *agg += val; }, - /// |key, agg: i32| [(key, agg)], + /// |key, agg| [(key, agg)], /// |key| *key as u64 /// ) /// .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25))); @@ -52,7 +53,8 @@ pub trait Aggregate { /// /// (0..10).to_stream(scope) /// .map(|x| (x % 2, x)) - /// .aggregate::<_,Vec,_,_,_>( + /// .aggregate( + /// Vec::default, /// |_key, val, agg| { agg.push(val); }, /// |key, agg| [(key, agg.len())], /// |key| *key as u64 @@ -60,8 +62,9 @@ pub trait Aggregate { /// .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5))); /// }); /// ``` - fn aggregateI+'static, H: Fn(&K)->u64+'static>( + fn aggregate D+ 'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->I+'static, H: Fn(&K)->u64+'static>( &self, + make_default: M, fold: F, emit: E, hash: H) -> Stream @@ -71,8 +74,9 @@ pub trait Aggregate { impl Aggregate for Stream { - fn aggregateI+'static, H: Fn(&K)->u64+'static>( + fn aggregate D + 'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->I+'static, H: Fn(&K)->u64+'static>( &self, + make_default: M, fold: F, emit: E, hash: H) -> Stream @@ -89,7 +93,7 @@ impl Aggregate for data.swap(&mut vector); let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new); for (key, val) in vector.drain(..) 
{ - let agg = agg_time.entry(key.clone()).or_insert_with(Default::default); + let agg = agg_time.entry(key.clone()).or_insert_with(&make_default); fold(&key, val, agg); } notificator.notify_at(time.retain());