Skip to content

Commit c2e7c33

Browse files
committed
fix merge issue
1 parent 5fffc64 commit c2e7c33

File tree

3 files changed

+20
-24
lines changed

3 files changed

+20
-24
lines changed

datafusion/core/benches/topk_aggregate.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use std::hint::black_box;
2828
use std::sync::Arc;
2929
use tokio::runtime::Runtime;
3030

31+
use crate::data_utils::make_distinct_data;
32+
3133
const LIMIT: usize = 10;
3234

3335
async fn create_context(

datafusion/physical-optimizer/src/topk_aggregation.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,16 @@ impl TopKAggregation {
4848
order_desc: bool,
4949
limit: usize,
5050
) -> Option<Arc<dyn ExecutionPlan>> {
51-
let (field, _) = aggr.get_minmax_desc()?;
52-
let group_key = aggr.group_expr().expr().iter().exactly_one().ok()?;
53-
let kt = group_key.0.data_type(&aggr.input().schema()).ok()?;
54-
let vt = field.data_type();
55-
if !topk_types_supported(&kt, vt) {
51+
// Currently only a single group key is supported
52+
let (group_key, group_key_alias) =
53+
aggr.group_expr().expr().iter().exactly_one().ok()?;
54+
let kt = group_key.data_type(&aggr.input().schema()).ok()?;
55+
let vt = if let Some((field, _)) = aggr.get_minmax_desc() {
56+
field.data_type().clone()
57+
} else {
58+
kt.clone()
59+
};
60+
if !topk_types_supported(&kt, &vt) {
5661
return None;
5762
}
5863
if aggr.filter_expr().iter().any(|e| e.is_some()) {
@@ -70,31 +75,20 @@ impl TopKAggregation {
7075
return None;
7176
}
7277
} else if aggr.aggr_expr().is_empty() {
73-
// TODO: remove this after https://github.com/apache/datafusion/issues/19219
74-
if !kt.is_primitive() {
75-
return None;
76-
}
77-
// This is a GROUP BY without aggregates (DISTINCT-like operation)
78-
// Check if ordering is on the group key itself
78+
// This is a GROUP BY without aggregates; check that the ordering is on the group key itself
7979
if order_by != group_key_alias {
8080
return None;
8181
}
8282
} else {
83-
// Has aggregates but not MIN/MAX, or doesn't match our patterns
83+
// Has aggregates but not MIN/MAX, or doesn't DISTINCT
8484
return None;
8585
}
8686

8787
// We found what we want: clone, copy the limit down, and return modified node
88-
let new_aggr = AggregateExec::try_new(
89-
*aggr.mode(),
90-
aggr.group_expr().clone(),
91-
aggr.aggr_expr().to_vec(),
92-
aggr.filter_expr().to_vec(),
93-
Arc::clone(aggr.input()),
94-
aggr.input_schema(),
95-
)
96-
.expect("Unable to copy Aggregate!")
97-
.with_limit_options(Some(LimitOptions::new_with_order(limit, order_desc)));
88+
let new_aggr = AggregateExec::with_new_limit_options(
89+
aggr,
90+
Some(LimitOptions::new_with_order(limit, order_desc)),
91+
);
9892
Some(Arc::new(new_aggr))
9993
}
10094

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,9 +603,9 @@ impl AggregateExec {
603603
}
604604

605605
/// Clone this exec, overriding only the limit options.
606-
pub fn with_new_limit(&self, limit: Option<usize>) -> Self {
606+
pub fn with_new_limit_options(&self, limit_options: Option<LimitOptions>) -> Self {
607607
Self {
608-
limit,
608+
limit_options,
609609
// clone the rest of the fields
610610
required_input_ordering: self.required_input_ordering.clone(),
611611
metrics: ExecutionPlanMetricsSet::new(),

0 commit comments

Comments
 (0)