Skip to content

Commit 6d8f9dd

Browse files
committed
Use GroupsAccumulator exclusively in grouped hash aggregation
Makes the other AggregateExprs use GroupsAccumulatorFlatAdapter, and also uses a GroupsAccumulator implementation that wraps a Box<dyn Accumulator> as a fallback accumulator for any AggregateExpr implementation that does not support that. This fully removes an iteration over the batch keys and the hash table, and brings the performance benefit already seen with Sum and Avg to the other aggregation types.
1 parent a8f045a commit 6d8f9dd

File tree

7 files changed

+142
-140
lines changed

7 files changed

+142
-140
lines changed

datafusion/src/physical_plan/distinct_expressions.rs

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,9 @@ use smallvec::SmallVec;
3434
use std::collections::hash_map::RandomState;
3535
use std::collections::HashSet;
3636

37+
use super::groups_accumulator::GroupsAccumulator;
38+
use super::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
39+
3740
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
3841
struct DistinctScalarValues(Vec<GroupByScalar>);
3942

@@ -122,6 +125,26 @@ impl AggregateExpr for DistinctCount {
122125
}))
123126
}
124127

128+
fn uses_groups_accumulator(&self) -> bool {
129+
return true;
130+
}
131+
132+
fn create_groups_accumulator(
133+
&self,
134+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
135+
let state_data_types = self.state_data_types.clone();
136+
let count_data_type = self.data_type.clone();
137+
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
138+
DistinctCountAccumulator,
139+
>::new(move || {
140+
Ok(DistinctCountAccumulator {
141+
values: HashSet::default(),
142+
state_data_types: state_data_types.clone(),
143+
count_data_type: count_data_type.clone(),
144+
})
145+
}))))
146+
}
147+
125148
fn name(&self) -> &str {
126149
&self.name
127150
}

datafusion/src/physical_plan/expressions/average.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ impl AggregateExpr for Avg {
118118
return true;
119119
}
120120

121-
/// the groups accumulator used to accumulate values from the expression. If this returns None,
122-
/// create_accumulator must be used.
123121
fn create_groups_accumulator(
124122
&self,
125123
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {

datafusion/src/physical_plan/expressions/count.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::any::Any;
2121
use std::sync::Arc;
2222

2323
use crate::error::Result;
24+
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
25+
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
2426
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
2527
use crate::scalar::ScalarValue;
2628
use arrow::compute;
@@ -90,6 +92,20 @@ impl AggregateExpr for Count {
9092
Ok(Box::new(CountAccumulator::new()))
9193
}
9294

95+
fn uses_groups_accumulator(&self) -> bool {
96+
return true;
97+
}
98+
99+
fn create_groups_accumulator(
100+
&self,
101+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
102+
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
103+
CountAccumulator,
104+
>::new(move || {
105+
Ok(CountAccumulator::new())
106+
}))))
107+
}
108+
93109
fn name(&self) -> &str {
94110
&self.name
95111
}

datafusion/src/physical_plan/expressions/min_max.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::convert::TryFrom;
2222
use std::sync::Arc;
2323

2424
use crate::error::{DataFusionError, Result};
25+
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
26+
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
2527
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
2628
use crate::scalar::ScalarValue;
2729
use arrow::compute;
@@ -99,6 +101,23 @@ impl AggregateExpr for Max {
99101
Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
100102
}
101103

104+
fn uses_groups_accumulator(&self) -> bool {
105+
return true;
106+
}
107+
108+
/// the groups accumulator used to accumulate values from the expression. If this returns None,
109+
/// create_accumulator must be used.
110+
fn create_groups_accumulator(
111+
&self,
112+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
113+
let data_type = self.data_type.clone();
114+
Ok(Some(Box::new(
115+
GroupsAccumulatorFlatAdapter::<MaxAccumulator>::new(move || {
116+
MaxAccumulator::try_new(&data_type)
117+
}),
118+
)))
119+
}
120+
102121
fn name(&self) -> &str {
103122
&self.name
104123
}
@@ -523,6 +542,21 @@ impl AggregateExpr for Min {
523542
Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
524543
}
525544

545+
fn uses_groups_accumulator(&self) -> bool {
546+
return true;
547+
}
548+
549+
fn create_groups_accumulator(
550+
&self,
551+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
552+
let data_type = self.data_type.clone();
553+
Ok(Some(Box::new(
554+
GroupsAccumulatorFlatAdapter::<MinAccumulator>::new(move || {
555+
MinAccumulator::try_new(&data_type)
556+
}),
557+
)))
558+
}
559+
526560
fn name(&self) -> &str {
527561
&self.name
528562
}

datafusion/src/physical_plan/expressions/sum.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ impl AggregateExpr for Sum {
124124
return true;
125125
}
126126

127-
/// the groups accumulator used to accumulate values from the expression. If this returns None,
128-
/// create_accumulator must be used.
129127
fn create_groups_accumulator(
130128
&self,
131129
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {

0 commit comments

Comments
 (0)