From 6d8f9dd0988a320c98e7caa5b2cc3d2d8d04029d Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Mon, 4 Nov 2024 13:57:30 -0800 Subject: [PATCH 1/2] Use GroupsAccumulator exclusively in grouped hash aggregation Makes the other AggregateExprs use GroupsAccumulatorFlatAdapter, and also uses a GroupsAccumulator implementation that has Box<dyn Accumulator> inside as a fallback accumulator if some AggregateExpr implementation does not provide a GroupsAccumulator of its own. This fully removes the batch-keys and hash-table iteration and brings the performance benefit that Sum and Avg already had to the other aggregation types. --- .../src/physical_plan/distinct_expressions.rs | 23 +++ .../src/physical_plan/expressions/average.rs | 2 - .../src/physical_plan/expressions/count.rs | 16 ++ .../src/physical_plan/expressions/min_max.rs | 34 ++++ .../src/physical_plan/expressions/sum.rs | 2 - .../src/physical_plan/hash_aggregate.rs | 174 ++++-------------- datafusion/src/physical_plan/mod.rs | 31 ++++ 7 files changed, 142 insertions(+), 140 deletions(-) diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 778e603c60c3..8fd8c144956d 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -34,6 +34,9 @@ use smallvec::SmallVec; use std::collections::hash_map::RandomState; use std::collections::HashSet; +use super::groups_accumulator::GroupsAccumulator; +use super::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; + #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct DistinctScalarValues(Vec); @@ -122,6 +125,26 @@ impl AggregateExpr for DistinctCount { })) } + fn uses_groups_accumulator(&self) -> bool { + return true; + } + + fn create_groups_accumulator( + &self, + ) -> arrow::error::Result>> { + let state_data_types = self.state_data_types.clone(); + let count_data_type = self.data_type.clone(); + Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::< + DistinctCountAccumulator, + >::new(move 
|| { + Ok(DistinctCountAccumulator { + values: HashSet::default(), + state_data_types: state_data_types.clone(), + count_data_type: count_data_type.clone(), + }) + })))) + } + fn name(&self) -> &str { &self.name } diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index 96f908a06d24..37ad5aa5ae29 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -118,8 +118,6 @@ impl AggregateExpr for Avg { return true; } - /// the groups accumulator used to accumulate values from the expression. If this returns None, - /// create_accumulator must be used. fn create_groups_accumulator( &self, ) -> arrow::error::Result>> { diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs index 97d69735abdf..0ea4353da766 100644 --- a/datafusion/src/physical_plan/expressions/count.rs +++ b/datafusion/src/physical_plan/expressions/count.rs @@ -21,6 +21,8 @@ use std::any::Any; use std::sync::Arc; use crate::error::Result; +use crate::physical_plan::groups_accumulator::GroupsAccumulator; +use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::compute; @@ -90,6 +92,20 @@ impl AggregateExpr for Count { Ok(Box::new(CountAccumulator::new())) } + fn uses_groups_accumulator(&self) -> bool { + return true; + } + + fn create_groups_accumulator( + &self, + ) -> arrow::error::Result>> { + Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::< + CountAccumulator, + >::new(move || { + Ok(CountAccumulator::new()) + })))) + } + fn name(&self) -> &str { &self.name } diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index ff76d526fd36..4df399d05c6b 100644 --- 
a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -22,6 +22,8 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::error::{DataFusionError, Result}; +use crate::physical_plan::groups_accumulator::GroupsAccumulator; +use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::compute; @@ -99,6 +101,23 @@ impl AggregateExpr for Max { Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?)) } + fn uses_groups_accumulator(&self) -> bool { + return true; + } + + /// the groups accumulator used to accumulate values from the expression. If this returns None, + /// create_accumulator must be used. + fn create_groups_accumulator( + &self, + ) -> arrow::error::Result>> { + let data_type = self.data_type.clone(); + Ok(Some(Box::new( + GroupsAccumulatorFlatAdapter::::new(move || { + MaxAccumulator::try_new(&data_type) + }), + ))) + } + fn name(&self) -> &str { &self.name } @@ -523,6 +542,21 @@ impl AggregateExpr for Min { Ok(Box::new(MinAccumulator::try_new(&self.data_type)?)) } + fn uses_groups_accumulator(&self) -> bool { + return true; + } + + fn create_groups_accumulator( + &self, + ) -> arrow::error::Result>> { + let data_type = self.data_type.clone(); + Ok(Some(Box::new( + GroupsAccumulatorFlatAdapter::::new(move || { + MinAccumulator::try_new(&data_type) + }), + ))) + } + fn name(&self) -> &str { &self.name } diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs index a3a9a5c73c6d..958817ccef6d 100644 --- a/datafusion/src/physical_plan/expressions/sum.rs +++ b/datafusion/src/physical_plan/expressions/sum.rs @@ -124,8 +124,6 @@ impl AggregateExpr for Sum { return true; } - /// the groups accumulator used to accumulate values from the expression. If this returns None, - /// create_accumulator must be used. 
fn create_groups_accumulator( &self, ) -> arrow::error::Result>> { diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index bd8e5e93f100..9e128694ed86 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -72,6 +72,7 @@ use arrow::array::{ use async_trait::async_trait; use super::groups_accumulator::GroupsAccumulator; +use super::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; use super::{ expressions::Column, group_scalar::GroupByScalar, RecordBatchStream, SendableRecordBatchStream, @@ -407,10 +408,11 @@ pin_project! { } } +// TODO: _aggr_expr is currently unused; it's kept for perhaps, debugging usage, but probably just remove it. pub(crate) fn group_aggregate_batch( mode: &AggregateMode, group_expr: &[Arc], - aggr_expr: &[Arc], + _aggr_expr: &[Arc], batch: RecordBatch, mut accumulation_state: AccumulationState, aggregate_expressions: &[Vec>], @@ -435,12 +437,6 @@ pub(crate) fn group_aggregate_batch( // 1.2 construct the mapping key if it does not exist // 1.3 add the row' index to `indices` - // Make sure we can create the accumulators or otherwise return an error - create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?; - - let all_groups_accumulators: bool = - aggr_expr.iter().all(|expr| expr.uses_groups_accumulator()); - // Keys received in this batch let mut batch_keys = BinaryBuilder::new(0); @@ -465,8 +461,6 @@ pub(crate) fn group_aggregate_batch( }) // 1.2 .or_insert_with(|| { - // We can safely unwrap here as we checked we can create an accumulator before - let accumulator_set = create_spotty_accumulators(aggr_expr).unwrap(); batch_keys.append_value(&key).expect("must not fail"); let _ = create_group_by_values(&group_values, row, &mut group_by_values); let mut taken_values = @@ -478,7 +472,6 @@ pub(crate) fn group_aggregate_batch( key.clone(), AccumulationGroupState { group_by_values: 
taken_values, - accumulator_set, indices: smallvec![row as u32], group_index, }, @@ -526,83 +519,29 @@ pub(crate) fn group_aggregate_batch( }) .collect(); - if !all_groups_accumulators { - // 2.1 for each key in this batch - // 2.2 for each aggregation - // 2.3 `slice` from each of its arrays the keys' values - // 2.4 update / merge the accumulator with the values - // 2.5 clear indices - batch_keys - .iter() - .zip(offsets.windows(2)) - .try_for_each(|(key, offsets)| { - let AccumulationGroupState { - accumulator_set, .. - } = accumulation_state - .accumulators - .get_mut(key.unwrap()) - .unwrap(); - // 2.2 - accumulator_set - .iter_mut() - .zip(values.iter()) - .map(|(accumulator, aggr_array)| { - ( - accumulator, - aggr_array - .iter() - .map(|array| { - // 2.3 - array.slice(offsets[0], offsets[1] - offsets[0]) - }) - .collect::>(), - ) - }) - .try_for_each(|(accumulator, values)| { - if let Some(accumulator) = accumulator { - match mode { - AggregateMode::Partial | AggregateMode::Full => { - accumulator.update_batch(&values) - } - AggregateMode::FinalPartitioned - | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - } - } else { - // We do groups accumulator separately. - Ok(()) - } - }) - })?; - } - for (accumulator_index, accumulator) in accumulation_state .groups_accumulators .iter_mut() .enumerate() { - if let Some(accumulator) = accumulator { - match mode { - AggregateMode::Partial | AggregateMode::Full => accumulator - .update_batch_preordered( - &values[accumulator_index], - &all_group_indices, - &offsets, - None, - accumulation_state.next_group_index, - )?, - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch_preordered( - &values[accumulator_index], - &all_group_indices, - &offsets, - None, - accumulation_state.next_group_index, - )? 
- } + match mode { + AggregateMode::Partial | AggregateMode::Full => accumulator + .update_batch_preordered( + &values[accumulator_index], + &all_group_indices, + &offsets, + None, + accumulation_state.next_group_index, + )?, + AggregateMode::FinalPartitioned | AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch_preordered( + &values[accumulator_index], + &all_group_indices, + &offsets, + None, + accumulation_state.next_group_index, + )? } } } @@ -940,19 +879,12 @@ pub type KeyVec = SmallVec<[u8; 64]>; type AccumulatorItem = Box; #[allow(missing_docs)] pub type AccumulatorSet = SmallVec<[AccumulatorItem; 2]>; -/// Not really a set. Order matters, as this is a parallel array with some GroupsAccumulator array. -/// There are Nones in place where there is (in some AccumulationState, presumably) a groups -/// accumulator in the parallel array. -pub type SpottyAccumulatorSet = SmallVec<[Option; 2]>; #[allow(missing_docs)] pub type Accumulators = HashMap; #[allow(missing_docs)] pub struct AccumulationGroupState { group_by_values: SmallVec<[GroupByScalar; 2]>, - // Each aggregate either has an Accumulator or a GroupsAccumulator. For each i, - // accumulator_set[i].is_some() != groups_accumulators[i].is_some(). - accumulator_set: SpottyAccumulatorSet, indices: SmallVec<[u32; 4]>, group_index: usize, } @@ -961,7 +893,7 @@ pub struct AccumulationGroupState { #[derive(Default)] pub struct AccumulationState { accumulators: HashMap, - groups_accumulators: Vec>>, + groups_accumulators: Vec>, // For now, always equal to accumulators.len() next_group_index: usize, } @@ -969,7 +901,7 @@ pub struct AccumulationState { impl AccumulationState { /// Constructs an initial AccumulationState. 
pub fn new( - groups_accumulators: Vec>>, + groups_accumulators: Vec>, ) -> AccumulationState { AccumulationState { accumulators: HashMap::new(), @@ -1243,7 +1175,6 @@ pub(crate) fn create_batch_from_map( _, AccumulationGroupState { group_by_values, - accumulator_set, group_index, .. }, @@ -1253,7 +1184,6 @@ pub(crate) fn create_batch_from_map( write_group_result_row_with_groups_accumulator( *mode, group_by_values, - accumulator_set, &accumulation_state.groups_accumulators, *group_index, &output_schema.fields()[0..num_group_expr], @@ -1325,8 +1255,7 @@ pub fn write_group_result_row( pub fn write_group_result_row_with_groups_accumulator( mode: AggregateMode, group_by_values: &[GroupByScalar], - accumulator_set: &SpottyAccumulatorSet, - groups_accumulators: &[Option>], + groups_accumulators: &[Box], group_index: usize, key_fields: &[Field], key_columns: &mut Vec>, @@ -1356,7 +1285,6 @@ pub fn write_group_result_row_with_groups_accumulator( } } finalize_aggregation_into_with_groups_accumulators( - &accumulator_set, groups_accumulators, group_index, &mode, @@ -1374,34 +1302,23 @@ pub fn create_accumulators( .collect::>>() } -// TODO: Name? (Spotty) -#[allow(missing_docs)] -pub fn create_spotty_accumulators( - aggr_expr: &[Arc], -) -> Result { - aggr_expr - .iter() - .map(|expr| { - Ok(if expr.uses_groups_accumulator() { - None - } else { - Some(expr.create_accumulator()?) - }) - }) - .collect::>>() -} - #[allow(missing_docs)] pub fn create_accumulation_state( aggr_expr: &[Arc], ) -> ArrowResult { let mut groups_accumulators = - Vec::>>::with_capacity(aggr_expr.len()); + Vec::>::with_capacity(aggr_expr.len()); for expr in aggr_expr { if let Some(groups_acc) = expr.create_groups_accumulator()? 
{ - groups_accumulators.push(Some(groups_acc)); + groups_accumulators.push(groups_acc); } else { - groups_accumulators.push(None); + let expr: Arc = expr.clone(); + + groups_accumulators.push(Box::new(GroupsAccumulatorFlatAdapter::< + Box, + >::new(move || { + expr.create_accumulator() + }))); } } @@ -1547,8 +1464,7 @@ fn finalize_aggregation_into( /// adds aggregation results into columns, creating the required builders when necessary. /// final value (mode = Final) or states (mode = Partial) fn finalize_aggregation_into_with_groups_accumulators( - accumulators: &SpottyAccumulatorSet, - groups_accumulators: &[Option>], + groups_accumulators: &[Box], group_index: usize, mode: &AggregateMode, columns: &mut Vec>, @@ -1557,15 +1473,8 @@ fn finalize_aggregation_into_with_groups_accumulators( match mode { AggregateMode::Partial => { let mut col_i = 0; - for (i, a) in accumulators.iter().enumerate() { - let state = if let Some(a) = a { - a.state() - } else { - groups_accumulators[i] - .as_ref() - .unwrap() - .peek_state(group_index) - }?; + for ga in groups_accumulators.iter() { + let state = ga.peek_state(group_index)?; // build the vector of states for v in state { if add_columns { @@ -1578,16 +1487,9 @@ fn finalize_aggregation_into_with_groups_accumulators( } } AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::Full => { - for i in 0..accumulators.len() { + for (i, ga) in groups_accumulators.iter().enumerate() { // merge the state to the final value - let v: ScalarValue = if let Some(accumulator) = &accumulators[i] { - accumulator.evaluate()? - } else { - groups_accumulators[i] - .as_ref() - .unwrap() - .peek_evaluate(group_index)? 
- }; + let v: ScalarValue = ga.peek_evaluate(group_index)?; if add_columns { columns.push(create_builder(&v)); assert_eq!(i + 1, columns.len()); diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 23e667781109..a9ed176b3642 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -634,6 +634,37 @@ pub trait Accumulator: Send + Sync + Debug { fn evaluate(&self) -> Result; } +// Used for AggregateExpr implementations that still have uses_groups_accumulator being false. +impl Accumulator for Box { + fn reset(&mut self) { + self.as_mut().reset() + } + + fn state(&self) -> Result> { + self.as_ref().state() + } + + fn update(&mut self, values: &[ScalarValue]) -> Result<()> { + self.as_mut().update(values) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + self.as_mut().update_batch(values) + } + + fn merge(&mut self, states: &[ScalarValue]) -> Result<()> { + self.as_mut().merge(states) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.as_mut().merge_batch(states) + } + + fn evaluate(&self) -> Result { + self.as_ref().evaluate() + } +} + pub mod aggregates; pub mod array_expressions; pub mod coalesce_batches; From 9b7a8d3d95821ccbdbc36450545b730006a1b6a8 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Mon, 18 Nov 2024 10:55:55 -0800 Subject: [PATCH 2/2] chore: Minor cleanups --- datafusion/src/cube_ext/joinagg.rs | 1 - datafusion/src/physical_plan/distinct_expressions.rs | 5 ++--- datafusion/src/physical_plan/expressions/min_max.rs | 2 -- datafusion/src/physical_plan/hash_aggregate.rs | 6 +++--- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/datafusion/src/cube_ext/joinagg.rs b/datafusion/src/cube_ext/joinagg.rs index 8953aa5cfafe..2324398bcb46 100644 --- a/datafusion/src/cube_ext/joinagg.rs +++ b/datafusion/src/cube_ext/joinagg.rs @@ -259,7 +259,6 @@ impl ExecutionPlan for CrossJoinAggExec { accumulators = 
hash_aggregate::group_aggregate_batch( &AggregateMode::Full, &group_expr, - &self.agg_expr, joined, std::mem::take(&mut accumulators), &aggs, diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 8fd8c144956d..384ce8680540 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -27,6 +27,8 @@ use arrow::datatypes::{DataType, Field}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::group_scalar::GroupByScalar; +use crate::physical_plan::groups_accumulator::GroupsAccumulator; +use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use itertools::Itertools; @@ -34,9 +36,6 @@ use smallvec::SmallVec; use std::collections::hash_map::RandomState; use std::collections::HashSet; -use super::groups_accumulator::GroupsAccumulator; -use super::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; - #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct DistinctScalarValues(Vec); diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index 4df399d05c6b..9d5bb3f9db4a 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -105,8 +105,6 @@ impl AggregateExpr for Max { return true; } - /// the groups accumulator used to accumulate values from the expression. If this returns None, - /// create_accumulator must be used. 
fn create_groups_accumulator( &self, ) -> arrow::error::Result>> { diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 9e128694ed86..a27fe6ec6a2c 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -408,16 +408,17 @@ pin_project! { } } -// TODO: _aggr_expr is currently unused; it's kept for perhaps, debugging usage, but probably just remove it. pub(crate) fn group_aggregate_batch( mode: &AggregateMode, group_expr: &[Arc], - _aggr_expr: &[Arc], batch: RecordBatch, mut accumulation_state: AccumulationState, aggregate_expressions: &[Vec>], skip_row: impl Fn(&RecordBatch, /*row_index*/ usize) -> bool, ) -> Result { + // Note: There is some parallel array &[Arc] that simply isn't passed to this + // function, but which exists and might be useful. + // evaluate the grouping expressions let group_values = evaluate(group_expr, &batch)?; @@ -813,7 +814,6 @@ async fn compute_grouped_hash_aggregate( accumulators = group_aggregate_batch( &mode, &group_expr, - &aggr_expr, batch, accumulators, &aggregate_expressions,