diff --git a/datafusion/src/cube_ext/joinagg.rs b/datafusion/src/cube_ext/joinagg.rs
index 8953aa5cfafe..2324398bcb46 100644
--- a/datafusion/src/cube_ext/joinagg.rs
+++ b/datafusion/src/cube_ext/joinagg.rs
@@ -259,7 +259,6 @@ impl ExecutionPlan for CrossJoinAggExec {
accumulators = hash_aggregate::group_aggregate_batch(
&AggregateMode::Full,
&group_expr,
- &self.agg_expr,
joined,
std::mem::take(&mut accumulators),
&aggs,
diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs
index 778e603c60c3..384ce8680540 100644
--- a/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/datafusion/src/physical_plan/distinct_expressions.rs
@@ -27,6 +27,8 @@ use arrow::datatypes::{DataType, Field};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::group_scalar::GroupByScalar;
+use crate::physical_plan::groups_accumulator::GroupsAccumulator;
+use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use itertools::Itertools;
@@ -122,6 +124,26 @@ impl AggregateExpr for DistinctCount {
}))
}
+ fn uses_groups_accumulator(&self) -> bool {
+ return true;
+ }
+
+ fn create_groups_accumulator(
+ &self,
+ ) -> arrow::error::Result