Skip to content

Commit 9a73407

Browse files
committed
Refactor enforce_distribution tests: remove redundant tests and clarify repartitioning logic
1 parent 6461c8e commit 9a73407

File tree

1 file changed

+11
-182
lines changed

1 file changed

+11
-182
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 11 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use std::ops::Deref;
2020
use std::sync::Arc;
2121

2222
use crate::physical_optimizer::test_utils::{
23-
bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec,
24-
memory_exec, parquet_exec_with_sort, parquet_exec_with_stats, repartition_exec,
25-
schema, sort_exec, sort_exec_with_preserve_partitioning, sort_merge_join_exec,
23+
check_integrity, coalesce_partitions_exec, parquet_exec_with_sort,
24+
parquet_exec_with_stats, repartition_exec, schema, sort_exec,
25+
sort_exec_with_preserve_partitioning, sort_merge_join_exec,
2626
sort_preserving_merge_exec, union_exec,
2727
};
2828

@@ -37,16 +37,16 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
3737
use datafusion::datasource::physical_plan::{CsvSource, ParquetSource};
3838
use datafusion::datasource::source::DataSourceExec;
3939
use datafusion::prelude::{SessionConfig, SessionContext};
40+
use datafusion_common::ScalarValue;
4041
use datafusion_common::config::CsvOptions;
4142
use datafusion_common::error::Result;
4243
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
43-
use datafusion_common::{NullEquality, ScalarValue};
4444
use datafusion_datasource::file_groups::FileGroup;
4545
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
4646
use datafusion_expr::{JoinType, Operator};
4747
use datafusion_physical_expr::PartitioningSatisfaction;
4848
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, binary, lit};
49-
use datafusion_physical_expr::{Distribution, Partitioning, physical_exprs_equal};
49+
use datafusion_physical_expr::{Distribution, Partitioning};
5050
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
5151
use datafusion_physical_expr_common::sort_expr::{
5252
LexOrdering, OrderingRequirements, PhysicalSortExpr,
@@ -62,12 +62,10 @@ use datafusion_physical_plan::aggregates::{
6262
};
6363
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
6464
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
65-
use datafusion_physical_plan::empty::EmptyExec;
6665
use datafusion_physical_plan::execution_plan::ExecutionPlan;
6766
use datafusion_physical_plan::expressions::col;
6867
use datafusion_physical_plan::filter::FilterExec;
6968
use datafusion_physical_plan::joins::utils::JoinOn;
70-
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
7169
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
7270
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
7371
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -1113,94 +1111,6 @@ fn multi_hash_join_key_ordering() -> Result<()> {
11131111
Ok(())
11141112
}
11151113

1116-
#[test]
1117-
fn enforce_distribution_repartitions_superset_hash_keys() -> Result<()> {
1118-
let schema = Arc::new(Schema::new(vec![
1119-
Field::new("region", DataType::Utf8, true),
1120-
Field::new("ts", DataType::Int64, true),
1121-
]));
1122-
let left = memory_exec(&schema);
1123-
let right = memory_exec(&schema);
1124-
1125-
let left_superset = Arc::new(RepartitionExec::try_new(
1126-
Arc::clone(&left),
1127-
Partitioning::Hash(
1128-
vec![
1129-
Arc::new(Column::new_with_schema("region", &schema).unwrap()),
1130-
Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
1131-
],
1132-
4,
1133-
),
1134-
)?);
1135-
1136-
let right_partitioned = Arc::new(RepartitionExec::try_new(
1137-
right,
1138-
Partitioning::Hash(
1139-
vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap())],
1140-
4,
1141-
),
1142-
)?);
1143-
1144-
let on: JoinOn = vec![(
1145-
Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
1146-
Arc::new(Column::new_with_schema("ts", &schema).unwrap()),
1147-
)];
1148-
1149-
let join = Arc::new(HashJoinExec::try_new(
1150-
left_superset,
1151-
right_partitioned,
1152-
on,
1153-
None,
1154-
&JoinType::Inner,
1155-
None,
1156-
PartitionMode::Partitioned,
1157-
NullEquality::NullEqualsNothing,
1158-
)?);
1159-
1160-
let mut config = ConfigOptions::new();
1161-
config.execution.target_partitions = 4;
1162-
1163-
let optimized = EnforceDistribution::new().optimize(join, &config)?;
1164-
let join = optimized
1165-
.as_any()
1166-
.downcast_ref::<HashJoinExec>()
1167-
.expect("expected hash join");
1168-
1169-
// The optimizer should recognize that the left side has a superset partitioning
1170-
// (partition on both "region" and "ts") when the join only requires "ts".
1171-
// The optimizer should replace the superset repartition with one that matches
1172-
// the join requirement exactly (just "ts").
1173-
let repartition = join
1174-
.left()
1175-
.as_any()
1176-
.downcast_ref::<RepartitionExec>()
1177-
.expect("left side should be repartitioned");
1178-
1179-
match repartition.partitioning() {
1180-
Partitioning::Hash(exprs, partitions) => {
1181-
assert_eq!(*partitions, 4);
1182-
let expected =
1183-
vec![Arc::new(Column::new_with_schema("ts", &schema).unwrap()) as _];
1184-
assert!(
1185-
physical_exprs_equal(exprs, &expected),
1186-
"expected repartitioning on [ts]"
1187-
);
1188-
}
1189-
other => panic!("expected hash repartitioning, got {other:?}"),
1190-
}
1191-
1192-
// The optimizer should have removed the unnecessary superset repartition
1193-
// and directly partitioned the source on the required keys.
1194-
// This is a better optimization than keeping the superset.
1195-
let input = repartition.input();
1196-
assert!(
1197-
input.as_any().downcast_ref::<RepartitionExec>().is_none(),
1198-
"optimizer should remove the superset repartition, not stack another on top"
1199-
);
1200-
1201-
Ok(())
1202-
}
1203-
12041114
#[test]
12051115
fn reorder_join_keys_to_left_input() -> Result<()> {
12061116
let left = parquet_exec();
@@ -3746,90 +3656,6 @@ fn distribution_satisfaction_superset_hash_matches_sanity_check() -> Result<()>
37463656
)
37473657
}
37483658

3749-
#[test]
3750-
fn single_partition_join_requires_repartition() -> Result<()> {
3751-
let left = parquet_exec();
3752-
let right = parquet_exec();
3753-
let join_on = vec![(
3754-
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
3755-
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
3756-
)];
3757-
3758-
let plan = hash_join_exec(left, right, &join_on, &JoinType::Inner);
3759-
let config = TestConfig::default().with_query_execution_partitions(16);
3760-
let optimized = config.to_plan(plan, &[Run::Distribution]);
3761-
3762-
assert_plan!(
3763-
optimized,
3764-
@r"
3765-
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
3766-
RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1
3767-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
3768-
RepartitionExec: partitioning=Hash([a@0], 16), input_partitions=1
3769-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
3770-
"
3771-
);
3772-
3773-
Ok(())
3774-
}
3775-
3776-
#[test]
3777-
fn single_partition_window_partition_skips_repartition() -> Result<()> {
3778-
let schema = schema();
3779-
let sort_exprs = vec![PhysicalSortExpr {
3780-
expr: col("a", &schema)?,
3781-
options: SortOptions::default(),
3782-
}];
3783-
let partition_by = vec![col("b", &schema)?];
3784-
3785-
let window_plan = bounded_window_exec_with_partition(
3786-
"c",
3787-
sort_exprs.clone(),
3788-
&partition_by,
3789-
parquet_exec(),
3790-
);
3791-
let config = TestConfig::default().with_query_execution_partitions(12);
3792-
let optimized = config.to_plan(window_plan, &[Run::Distribution, Run::Sorting]);
3793-
3794-
assert_plan!(
3795-
optimized,
3796-
@r#"
3797-
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3798-
SortExec: expr=[b@1 ASC NULLS LAST, a@0 ASC], preserve_partitioning=[false]
3799-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
3800-
"#
3801-
);
3802-
3803-
Ok(())
3804-
}
3805-
3806-
#[test]
3807-
fn grouped_union_from_single_partition_requires_repartition() -> Result<()> {
3808-
let union = union_exec(vec![
3809-
parquet_exec(),
3810-
Arc::new(EmptyExec::new(schema()).with_partitions(0)),
3811-
]);
3812-
let aggregated =
3813-
aggregate_exec_with_alias(union, vec![("a".to_string(), "group_a".to_string())]);
3814-
let mut config = TestConfig::default().with_query_execution_partitions(10);
3815-
config.config.optimizer.enable_round_robin_repartition = false;
3816-
let optimized = config.to_plan(aggregated, &[Run::Distribution]);
3817-
3818-
assert_plan!(
3819-
optimized,
3820-
@r"
3821-
AggregateExec: mode=FinalPartitioned, gby=[group_a@0 as group_a], aggr=[]
3822-
RepartitionExec: partitioning=Hash([group_a@0], 10), input_partitions=1
3823-
AggregateExec: mode=Partial, gby=[a@0 as group_a], aggr=[]
3824-
UnionExec
3825-
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
3826-
EmptyExec
3827-
"
3828-
);
3829-
3830-
Ok(())
3831-
}
3832-
38333659
fn assert_hash_satisfaction_alignment(
38343660
partitioning_exprs: Vec<Arc<dyn PhysicalExpr>>,
38353661
required_exprs: Vec<Arc<dyn PhysicalExpr>>,
@@ -3927,8 +3753,12 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
39273753

39283754
#[tokio::test]
39293755
async fn repartition_between_chained_aggregates() -> Result<()> {
3930-
// Build a two-partition, empty MemTable with the expected schema to mimic the
3931-
// reported failing plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts).
3756+
// Regression test for issue #18989: Sanity Check Failure in Multi-Partitioned Tables
3757+
// with Aggregations. This test ensures the optimizer properly handles chained aggregations
3758+
// with different grouping keys (first aggregation groups by [ts, region], second by [ts] only).
3759+
// The plan: Sort -> Aggregate(ts, region) -> Sort -> Aggregate(ts).
3760+
// The optimizer must either insert a repartition between aggregates or maintain a single
3761+
// partition stream to avoid distribution requirement mismatches.
39323762
use arrow::datatypes::{DataType, Field, Schema};
39333763
use arrow::record_batch::RecordBatch;
39343764
use datafusion_expr::col;
@@ -3967,7 +3797,6 @@ async fn repartition_between_chained_aggregates() -> Result<()> {
39673797
// The optimizer should either keep the stream single-partitioned via the
39683798
// sort-preserving merge, or insert a repartition between the two aggregates
39693799
// so that the second grouping sees a consistent hash distribution.
3970-
// This test is similar to the reproducer case in #18989
39713800
let plan_display = displayable(physical_plan.as_ref()).indent(true).to_string();
39723801
let has_repartition =
39733802
plan_display.contains("RepartitionExec: partitioning=Hash([ts@0], 2)");

0 commit comments

Comments
 (0)