Skip to content

Commit 670eff3

Browse files
committed
test: demonstrate EnforceSorting can remove a needed coalesce
1 parent abb7669 commit 670eff3

File tree

4 files changed

+139
-4
lines changed

4 files changed

+139
-4
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSource
220220
.new_exec()
221221
}
222222

223-
fn projection_exec_with_alias(
223+
pub(crate) fn projection_exec_with_alias(
224224
input: Arc<dyn ExecutionPlan>,
225225
alias_pairs: Vec<(String, String)>,
226226
) -> Arc<dyn ExecutionPlan> {

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717

1818
use std::sync::Arc;
1919

20+
use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
21+
use crate::physical_optimizer::sanity_checker::{
22+
assert_sanity_check, assert_sanity_check_err,
23+
};
2024
use crate::physical_optimizer::test_utils::{
2125
aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
2226
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
2327
coalesce_partitions_exec, create_test_schema, create_test_schema2,
2428
create_test_schema3, create_test_schema4, filter_exec, global_limit_exec,
2529
hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
26-
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
30+
parquet_exec_with_stats, repartition_exec, schema, single_partitioned_aggregate,
31+
sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
2732
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
2833
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
2934
};
@@ -2280,3 +2285,62 @@ async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()>
22802285
assert_optimized!(expected_input, expected_no_change, physical_plan, true);
22812286
Ok(())
22822287
}
2288+
2289+
#[tokio::test]
2290+
async fn test_preserve_needed_coalesce() -> Result<()> {
2291+
// Input to EnforceSorting, from our test case.
2292+
let plan = projection_exec_with_alias(
2293+
union_exec(vec![parquet_exec_with_stats(); 2]),
2294+
vec![
2295+
("a".to_string(), "a".to_string()),
2296+
("b".to_string(), "value".to_string()),
2297+
],
2298+
);
2299+
let plan = Arc::new(CoalescePartitionsExec::new(plan));
2300+
let schema = schema();
2301+
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
2302+
expr: col("a", &schema).unwrap(),
2303+
options: SortOptions::default(),
2304+
}]);
2305+
let plan: Arc<dyn ExecutionPlan> =
2306+
single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
2307+
let plan = sort_exec(sort_key, plan);
2308+
2309+
// Starting plan: as in our test case.
2310+
assert_eq!(
2311+
get_plan_string(&plan),
2312+
vec![
2313+
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2314+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2315+
" CoalescePartitionsExec",
2316+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2317+
" UnionExec",
2318+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2319+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2320+
],
2321+
);
2322+
// Test: plan is valid.
2323+
assert_sanity_check(&plan, true);
2324+
2325+
// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2326+
let optimizer = EnforceSorting::new();
2327+
let optimized = optimizer.optimize(plan, &Default::default())?;
2328+
assert_eq!(
2329+
get_plan_string(&optimized),
2330+
vec![
2331+
"SortPreservingMergeExec: [a@0 ASC]",
2332+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
2333+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2334+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2335+
" UnionExec",
2336+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2337+
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
2338+
],
2339+
);
2340+
2341+
// Bug: Plan is now invalid.
2342+
let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
2343+
assert_sanity_check_err(&optimized, err);
2344+
2345+
Ok(())
2346+
}

datafusion/core/tests/physical_optimizer/sanity_checker.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
388388
}
389389

390390
/// Check if sanity checker should accept or reject plans.
391-
fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
391+
pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
392392
let sanity_checker = SanityCheckPlan::new();
393393
let opts = ConfigOptions::default();
394394
assert_eq!(
@@ -397,6 +397,14 @@ fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
397397
);
398398
}
399399

400+
/// Assert reason for sanity check failure.
401+
pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
402+
let sanity_checker = SanityCheckPlan::new();
403+
let opts = ConfigOptions::default();
404+
let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
405+
assert!(error.message().contains(err));
406+
}
407+
400408
/// Check if the plan we created is as expected by comparing the plan
401409
/// formatted as a string.
402410
fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ use arrow::record_batch::RecordBatch;
2828
use datafusion::datasource::listing::PartitionedFile;
2929
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
3030
use datafusion_common::config::ConfigOptions;
31+
use datafusion_common::stats::Precision;
3132
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3233
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
33-
use datafusion_common::{JoinType, Result};
34+
use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
3435
use datafusion_execution::object_store::ObjectStoreUrl;
3536
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3637
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
@@ -102,6 +103,44 @@ pub fn schema() -> SchemaRef {
102103
]))
103104
}
104105

106+
fn int64_stats() -> ColumnStatistics {
107+
ColumnStatistics {
108+
null_count: Precision::Absent,
109+
sum_value: Precision::Absent,
110+
max_value: Precision::Exact(1_000_000.into()),
111+
min_value: Precision::Exact(0.into()),
112+
distinct_count: Precision::Absent,
113+
}
114+
}
115+
116+
fn column_stats() -> Vec<ColumnStatistics> {
117+
vec![
118+
int64_stats(), // a
119+
int64_stats(), // b
120+
int64_stats(), // c
121+
ColumnStatistics::default(),
122+
ColumnStatistics::default(),
123+
]
124+
}
125+
126+
/// Create parquet datasource exec using schema from [`schema`].
127+
pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
128+
let mut statistics = Statistics::new_unknown(&schema());
129+
statistics.num_rows = Precision::Inexact(10);
130+
statistics.column_statistics = column_stats();
131+
132+
let config = FileScanConfig::new(
133+
ObjectStoreUrl::parse("test:///").unwrap(),
134+
schema(),
135+
Arc::new(ParquetSource::new(Default::default())),
136+
)
137+
.with_file(PartitionedFile::new("x".to_string(), 10000))
138+
.with_statistics(statistics);
139+
assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
140+
141+
config.new_exec()
142+
}
143+
105144
pub fn create_test_schema() -> Result<SchemaRef> {
106145
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
107146
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -575,6 +614,30 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> Physica
575614
PhysicalGroupBy::new_single(group_by_expr.clone())
576615
}
577616

617+
pub(crate) fn single_partitioned_aggregate(
618+
input: Arc<dyn ExecutionPlan>,
619+
alias_pairs: Vec<(String, String)>,
620+
) -> Arc<dyn ExecutionPlan> {
621+
let schema = schema();
622+
let group_by = alias_pairs
623+
.iter()
624+
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
625+
.collect::<Vec<_>>();
626+
let group_by = PhysicalGroupBy::new_single(group_by);
627+
628+
Arc::new(
629+
AggregateExec::try_new(
630+
AggregateMode::SinglePartitioned,
631+
group_by,
632+
vec![],
633+
vec![],
634+
input,
635+
schema,
636+
)
637+
.unwrap(),
638+
)
639+
}
640+
578641
pub fn assert_plan_matches_expected(
579642
plan: &Arc<dyn ExecutionPlan>,
580643
expected: &[&str],

0 commit comments

Comments
 (0)