Skip to content

Commit 0bf2255

Browse files
committed
Fix group by sorting bug
1 parent a95cef3 commit 0bf2255

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{
3535
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
3636
PhysicalSortRequirement,
3737
};
38+
use datafusion_physical_plan::aggregates::AggregateExec;
3839
use datafusion_physical_plan::execution_plan::CardinalityEffect;
3940
use datafusion_physical_plan::filter::FilterExec;
4041
use datafusion_physical_plan::joins::utils::{
@@ -355,6 +356,10 @@ fn pushdown_requirement_to_children(
355356
}
356357
} else if maintains_input_order.is_empty()
357358
|| !maintains_input_order.iter().any(|o| *o)
359+
// Aggregate output columns can be computed expressions that are not
360+
// order-preserving wrt input columns. The generic index-based rewrite
361+
// in handle_custom_pushdown is not safe for AggregateExec.
362+
|| plan.as_any().is::<AggregateExec>()
358363
|| plan.as_any().is::<RepartitionExec>()
359364
|| plan.as_any().is::<FilterExec>()
360365
// TODO: Add support for Projection push down

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,69 @@ LIMIT 3;
851851
5 4
852852
2 -3
853853

854+
# Test 3.7: Aggregate ORDER BY expression should keep SortExec
855+
# Source pattern declared on parquet scan: [x ASC, y ASC].
856+
# Requested pattern in ORDER BY: [x ASC, CAST(y AS BIGINT) % 2 ASC].
857+
# Example for x=1 input y order 1,2,3 gives bucket order 1,0,1, which does not
858+
# match requested bucket ASC order. SortExec is required above AggregateExec.
859+
statement ok
860+
SET datafusion.execution.target_partitions = 1;
861+
862+
statement ok
863+
CREATE TABLE agg_expr_data(x INT, y INT, v INT) AS VALUES
864+
(1, 1, 10),
865+
(1, 2, 20),
866+
(1, 3, 30),
867+
(2, 1, 40),
868+
(2, 2, 50),
869+
(2, 3, 60);
870+
871+
query I
872+
COPY (SELECT * FROM agg_expr_data ORDER BY x, y)
873+
TO 'test_files/scratch/sort_pushdown/agg_expr_sorted.parquet';
874+
----
875+
6
876+
877+
statement ok
878+
CREATE EXTERNAL TABLE agg_expr_parquet(x INT, y INT, v INT)
879+
STORED AS PARQUET
880+
LOCATION 'test_files/scratch/sort_pushdown/agg_expr_sorted.parquet'
881+
WITH ORDER (x ASC, y ASC);
882+
883+
query TT
884+
EXPLAIN SELECT
885+
x,
886+
CAST(y AS BIGINT) % 2,
887+
SUM(v)
888+
FROM agg_expr_parquet
889+
GROUP BY x, CAST(y AS BIGINT) % 2
890+
ORDER BY x, CAST(y AS BIGINT) % 2;
891+
----
892+
logical_plan
893+
01)Sort: agg_expr_parquet.x ASC NULLS LAST, agg_expr_parquet.y % Int64(2) ASC NULLS LAST
894+
02)--Aggregate: groupBy=[[agg_expr_parquet.x, CAST(agg_expr_parquet.y AS Int64) % Int64(2)]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
895+
03)----TableScan: agg_expr_parquet projection=[x, y, v]
896+
physical_plan
897+
01)SortExec: expr=[x@0 ASC NULLS LAST, agg_expr_parquet.y % Int64(2)@1 ASC NULLS LAST], preserve_partitioning=[false]
898+
02)--AggregateExec: mode=Single, gby=[x@0 as x, CAST(y@1 AS Int64) % 2 as agg_expr_parquet.y % Int64(2)], aggr=[sum(agg_expr_parquet.v)], ordering_mode=PartiallySorted([0])
899+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]}, projection=[x, y, v], output_ordering=[x@0 ASC NULLS LAST, y@1 ASC NULLS LAST], file_type=parquet
900+
901+
# Expected output pattern from ORDER BY [x, bucket]:
902+
# rows grouped by x, and within each x bucket appears as 0 then 1.
903+
query III
904+
SELECT
905+
x,
906+
CAST(y AS BIGINT) % 2,
907+
SUM(v)
908+
FROM agg_expr_parquet
909+
GROUP BY x, CAST(y AS BIGINT) % 2
910+
ORDER BY x, CAST(y AS BIGINT) % 2;
911+
----
912+
1 0 20
913+
1 1 40
914+
2 0 50
915+
2 1 100
916+
854917
# Cleanup
855918
statement ok
856919
DROP TABLE timestamp_data;
@@ -882,5 +945,11 @@ DROP TABLE signed_data;
882945
statement ok
883946
DROP TABLE signed_parquet;
884947

948+
statement ok
949+
DROP TABLE agg_expr_data;
950+
951+
statement ok
952+
DROP TABLE agg_expr_parquet;
953+
885954
statement ok
886955
SET datafusion.optimizer.enable_sort_pushdown = true;

0 commit comments

Comments
 (0)