Skip to content

Commit b786b9a

Browse files
authored
Eliminate all redundant aggregations (#17139)
* Regression test
* Eliminate all redundant aggregations

  Before this change, an aggregation with neither GROUP BY expressions nor aggregate functions was disallowed. This prevented the optimizer from removing every redundant aggregation when all of them were redundant: the first one was always retained. This commit removes the limitation, allowing queries to be optimized further.

* fixup! Eliminate all redundant aggregations
1 parent b8bf7c5 commit b786b9a

File tree

9 files changed

+88
-73
lines changed

9 files changed

+88
-73
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2744,23 +2744,20 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
27442744

27452745
assert_snapshot!(
27462746
pretty_format_batches(&sql_results).unwrap(),
2747-
@r###"
2748-
+---------------+---------------------------------------------------------+
2749-
| plan_type | plan |
2750-
+---------------+---------------------------------------------------------+
2751-
| logical_plan | LeftSemi Join: |
2752-
| | TableScan: t1 projection=[a, b] |
2753-
| | SubqueryAlias: __correlated_sq_1 |
2754-
| | Projection: |
2755-
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
2756-
| | TableScan: t2 projection=[] |
2757-
| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
2758-
| | ProjectionExec: expr=[] |
2759-
| | PlaceholderRowExec |
2760-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2761-
| | |
2762-
+---------------+---------------------------------------------------------+
2763-
"###
2747+
@r"
2748+
+---------------+-----------------------------------------------------+
2749+
| plan_type | plan |
2750+
+---------------+-----------------------------------------------------+
2751+
| logical_plan | LeftSemi Join: |
2752+
| | TableScan: t1 projection=[a, b] |
2753+
| | SubqueryAlias: __correlated_sq_1 |
2754+
| | EmptyRelation |
2755+
| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
2756+
| | PlaceholderRowExec |
2757+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2758+
| | |
2759+
+---------------+-----------------------------------------------------+
2760+
"
27642761
);
27652762

27662763
let df_results = ctx
@@ -2783,23 +2780,20 @@ async fn test_count_wildcard_on_where_exist() -> Result<()> {
27832780

27842781
assert_snapshot!(
27852782
pretty_format_batches(&df_results).unwrap(),
2786-
@r###"
2787-
+---------------+---------------------------------------------------------------------+
2788-
| plan_type | plan |
2789-
+---------------+---------------------------------------------------------------------+
2790-
| logical_plan | LeftSemi Join: |
2791-
| | TableScan: t1 projection=[a, b] |
2792-
| | SubqueryAlias: __correlated_sq_1 |
2793-
| | Projection: |
2794-
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] |
2795-
| | TableScan: t2 projection=[] |
2796-
| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
2797-
| | ProjectionExec: expr=[] |
2798-
| | PlaceholderRowExec |
2799-
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2800-
| | |
2801-
+---------------+---------------------------------------------------------------------+
2802-
"###
2783+
@r"
2784+
+---------------+-----------------------------------------------------+
2785+
| plan_type | plan |
2786+
+---------------+-----------------------------------------------------+
2787+
| logical_plan | LeftSemi Join: |
2788+
| | TableScan: t1 projection=[a, b] |
2789+
| | SubqueryAlias: __correlated_sq_1 |
2790+
| | EmptyRelation |
2791+
| physical_plan | NestedLoopJoinExec: join_type=RightSemi |
2792+
| | PlaceholderRowExec |
2793+
| | DataSourceExec: partitions=1, partition_sizes=[1] |
2794+
| | |
2795+
+---------------+-----------------------------------------------------+
2796+
"
28032797
);
28042798

28052799
Ok(())

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3522,7 +3522,10 @@ impl Aggregate {
35223522
) -> Result<Self> {
35233523
if group_expr.is_empty() && aggr_expr.is_empty() {
35243524
return plan_err!(
3525-
"Aggregate requires at least one grouping or aggregate expression"
3525+
"Aggregate requires at least one grouping or aggregate expression. \
3526+
Aggregate without grouping expressions nor aggregate expressions is \
3527+
logically equivalent to, but less efficient than, VALUES producing \
3528+
single row. Please use VALUES instead."
35263529
);
35273530
}
35283531
let group_expr_count = grouping_set_expr_count(&group_expr)?;

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ use std::sync::Arc;
2626

2727
use datafusion_common::{
2828
get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
29-
HashMap, JoinType, Result,
29+
DFSchema, HashMap, JoinType, Result,
3030
};
3131
use datafusion_expr::expr::Alias;
3232
use datafusion_expr::{
33-
logical_plan::LogicalPlan, Aggregate, Distinct, Expr, Projection, TableScan, Unnest,
34-
Window,
33+
logical_plan::LogicalPlan, Aggregate, Distinct, EmptyRelation, Expr, Projection,
34+
TableScan, Unnest, Window,
3535
};
3636

3737
use crate::optimize_projections::required_indices::RequiredIndices;
@@ -153,23 +153,16 @@ fn optimize_projections(
153153

154154
// Only use the absolutely necessary aggregate expressions required
155155
// by the parent:
156-
let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
157-
158-
// Aggregations always need at least one aggregate expression.
159-
// With a nested count, we don't require any column as input, but
160-
// still need to create a correct aggregate, which may be optimized
161-
// out later. As an example, consider the following query:
162-
//
163-
// SELECT count(*) FROM (SELECT count(*) FROM [...])
164-
//
165-
// which always returns 1.
166-
if new_aggr_expr.is_empty()
167-
&& new_group_bys.is_empty()
168-
&& !aggregate.aggr_expr.is_empty()
169-
{
170-
// take the old, first aggregate expression
171-
new_aggr_expr = aggregate.aggr_expr;
172-
new_aggr_expr.resize_with(1, || unreachable!());
156+
let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
157+
158+
if new_group_bys.is_empty() && new_aggr_expr.is_empty() {
159+
// Global aggregation with no aggregate functions always produces 1 row and no columns.
160+
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
161+
EmptyRelation {
162+
produce_one_row: true,
163+
schema: Arc::new(DFSchema::empty()),
164+
},
165+
)));
173166
}
174167

175168
let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
@@ -1146,9 +1139,7 @@ mod tests {
11461139
plan,
11471140
@r"
11481141
Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
1149-
Projection:
1150-
Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
1151-
TableScan: ?table? projection=[]
1142+
EmptyRelation
11521143
"
11531144
)
11541145
}

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -429,14 +429,11 @@ logical_plan
429429
01)LeftSemi Join:
430430
02)--TableScan: t1 projection=[a]
431431
03)--SubqueryAlias: __correlated_sq_1
432-
04)----Projection:
433-
05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
434-
06)--------TableScan: t2 projection=[]
432+
04)----EmptyRelation
435433
physical_plan
436434
01)NestedLoopJoinExec: join_type=LeftSemi
437435
02)--DataSourceExec: partitions=1, partition_sizes=[0]
438-
03)--ProjectionExec: expr=[]
439-
04)----PlaceholderRowExec
436+
03)--PlaceholderRowExec
440437

441438
statement ok
442439
drop table t1;

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,14 +1263,11 @@ physical_plan
12631263
04)│ join_type: LeftSemi │ │
12641264
05)└─────────────┬─────────────┘ │
12651265
06)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
1266-
07)│ DataSourceExec ││ ProjectionExec
1266+
07)│ DataSourceExec ││ PlaceholderRowExec
12671267
08)│ -------------------- ││ │
12681268
09)│ files: 1 ││ │
12691269
10)│ format: csv ││ │
1270-
11)└───────────────────────────┘└─────────────┬─────────────┘
1271-
12)-----------------------------┌─────────────┴─────────────┐
1272-
13)-----------------------------│ PlaceholderRowExec │
1273-
14)-----------------------------└───────────────────────────┘
1270+
11)└───────────────────────────┘└───────────────────────────┘
12741271

12751272
# Query with cross join.
12761273
query TT

datafusion/sqllogictest/test_files/expr/date_part.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1089,4 +1089,4 @@ SELECT EXTRACT("isodow" FROM to_timestamp('2020-09-08T12:00:00+00:00'))
10891089
query I
10901090
SELECT EXTRACT('isodow' FROM to_timestamp('2020-09-08T12:00:00+00:00'))
10911091
----
1092-
1
1092+
1
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
statement ok
2+
CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER)
3+
4+
statement ok
5+
INSERT INTO tab1 VALUES(51,14,96)
6+
7+
query R
8+
SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1
9+
----
10+
NULL
11+
12+
query TT
13+
EXPLAIN SELECT NULL * AVG(DISTINCT 4) + SUM(col1) AS col0 FROM tab1
14+
----
15+
logical_plan
16+
01)Projection: Float64(NULL) AS col0
17+
02)--EmptyRelation
18+
physical_plan
19+
01)ProjectionExec: expr=[NULL as col0]
20+
02)--PlaceholderRowExec
21+
22+
# Similar, with a few more arithmetic operations
23+
query R
24+
SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1
25+
----
26+
NULL
27+
28+
query TT
29+
EXPLAIN SELECT + CAST ( NULL AS INTEGER ) * + + AVG ( DISTINCT 4 ) + - SUM ( ALL + col1 ) AS col0 FROM tab1
30+
----
31+
logical_plan
32+
01)Projection: Float64(NULL) AS col0
33+
02)--EmptyRelation
34+
physical_plan
35+
01)ProjectionExec: expr=[NULL as col0]
36+
02)--PlaceholderRowExec

datafusion/sqllogictest/test_files/spark/bitwise/getbit.slt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,3 @@ query I
7373
SELECT getbit(11, NULL);
7474
----
7575
NULL
76-

datafusion/sqllogictest/test_files/subquery.slt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,9 +1453,7 @@ logical_plan
14531453
01)LeftSemi Join:
14541454
02)--TableScan: t1 projection=[a]
14551455
03)--SubqueryAlias: __correlated_sq_1
1456-
04)----Projection:
1457-
05)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
1458-
06)--------TableScan: t2 projection=[]
1456+
04)----EmptyRelation
14591457

14601458
statement count 0
14611459
drop table t1;

0 commit comments

Comments
 (0)