From 221645eeb80b0fc5369707ee7b6eed2272f3c81b Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 8 Oct 2025 12:11:53 +0100 Subject: [PATCH 1/6] Move `test_replace_multiple_input_repartition_1` --- .../replace_with_order_preserving_variants.rs | 191 +++++++++++++----- 1 file changed, 144 insertions(+), 47 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index a1134c3d83ed..c77ebb77d749 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -29,6 +29,7 @@ use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use insta::{allow_duplicates, assert_snapshot}; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::{assert_contains, NullEquality, Result}; use datafusion_common::config::ConfigOptions; @@ -54,6 +55,97 @@ use object_store::ObjectStore; use rstest::rstest; use url::Url; +struct ReplaceTest { + plan: Arc, + source_unbounded: bool, + prefer_existing_sort: bool, +} + +impl ReplaceTest { + fn new(plan: Arc) -> Self { + Self { + plan, + source_unbounded: false, + prefer_existing_sort: false, + } + } + + // Set whether the source is unbounded + fn with_source_unbounded(mut self, source_unbounded: bool) -> Self { + self.source_unbounded = source_unbounded; + self + } + + fn with_prefer_existing_sort(mut self, prefer_existing_sort: bool) -> Self { + self.prefer_existing_sort = prefer_existing_sort; + self + } + + async fn execute_plan(&self) -> String { + let mut config = ConfigOptions::new(); + config.optimizer.prefer_existing_sort = self.prefer_existing_sort; + + let plan_with_pipeline_fixer = OrderPreservationContext::new_default( + self.plan.clone().reset_state().unwrap(), + ); + + let parallel = plan_with_pipeline_fixer + .transform_up(|plan_with_pipeline_fixer| { + replace_with_order_preserving_variants( + plan_with_pipeline_fixer, + false, + false, + &config, + ) + }) + .data() + .and_then(check_integrity) + .unwrap(); + + let optimized_physical_plan = parallel.plan; + let optimized_plan_string = displayable(optimized_physical_plan.as_ref()) + .indent(true) + .to_string(); + + if !self.source_unbounded { + let ctx = SessionContext::new(); + let object_store = InMemory::new(); + object_store + .put( + &object_store::path::Path::from("file_path"), + bytes::Bytes::from("").into(), + ) + .await + .expect("could not create object store"); + ctx.register_object_store( + &Url::parse("test://").unwrap(), + Arc::new(object_store), + ); + let task_ctx = Arc::new(TaskContext::from(&ctx)); + let res = collect(optimized_physical_plan, task_ctx).await; + assert!( + res.is_ok(), + "Some errors occurred while executing the optimized physical plan: {:?}\nPlan: {}", + res.unwrap_err(), optimized_plan_string + ); + } + + optimized_plan_string + } + + async fn run(&self) -> String { + let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string(); + + let optimized = self.execute_plan().await; + + if input_plan_string == optimized { + format!("Input / Optimized:\n{}", input_plan_string) + } else { + format!("Input:\n{}\nOptimized:\n{}", input_plan_string, optimized) + } + } +} + /// Runs the `replace_with_order_preserving_variants` sub-rule and asserts /// the plan against the original and expected plans. /// @@ -200,54 +292,59 @@ async fn test_replace_multiple_input_repartition_1( let sort = sort_exec_with_preserve_partitioning(sort_exprs.clone(), repartition); let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); Ok(()) } From 5b4c483686ef334099952527246b155e5bf251f0 Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 8 Oct 2025 12:27:57 +0100 Subject: [PATCH 2/6] Migrate replace_with_order_preserving_variants tests to snapshot-based testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Converted all tests in replace_with_order_preserving_variants.rs from macro-based assertions to snapshot-based testing using the insta crate. Key changes: - Replaced assert_optimized_in_all_boundedness_situations! and related macros with ReplaceTest helper struct - Used allow_duplicates! and assert_snapshot! for cleaner test assertions - Match order follows original test parameter ordering: (true, _), (false, false), (false, true) - All snapshots match the original expected plans from the macros - Removed unused imports and macros 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../replace_with_order_preserving_variants.rs | 1315 ++++++++--------- 1 file changed, 593 insertions(+), 722 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index c77ebb77d749..b93cde997f1f 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -47,7 +47,7 @@ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::{ - collect, displayable, get_plan_string, ExecutionPlan, Partitioning, + collect, displayable, ExecutionPlan, Partitioning, }; use object_store::memory::InMemory; @@ -146,134 +146,6 @@ impl ReplaceTest { } } -/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts -/// the plan against the original and expected plans. -/// -/// # Parameters -/// -/// * `$EXPECTED_PLAN_LINES`: Expected input plan. -/// * `EXPECTED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag -/// `prefer_existing_sort` is `false`. -/// * `EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan when -/// the flag `prefer_existing_sort` is `true`. -/// * `$PLAN`: The plan to optimize. -macro_rules! assert_optimized_prefer_sort_on_off { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { - if $PREFER_EXISTING_SORT { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } else { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } - }; -} - -/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts -/// the plan against the original and expected plans for both bounded and -/// unbounded cases. -/// -/// # Parameters -/// -/// * `EXPECTED_UNBOUNDED_PLAN_LINES`: Expected input unbounded plan. -/// * `EXPECTED_BOUNDED_PLAN_LINES`: Expected input bounded plan. -/// * `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan, which is -/// the same regardless of the value of the `prefer_existing_sort` flag. -/// * `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag -/// `prefer_existing_sort` is `false` for bounded cases. -/// * `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan -/// when the flag `prefer_existing_sort` is `true` for bounded cases. -/// * `$PLAN`: The plan to optimize. -/// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source. -macro_rules! assert_optimized_in_all_boundedness_situations { - ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => { - if $SOURCE_UNBOUNDED { - assert_optimized_prefer_sort_on_off!( - $EXPECTED_UNBOUNDED_PLAN_LINES, - $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } else { - assert_optimized_prefer_sort_on_off!( - $EXPECTED_BOUNDED_PLAN_LINES, - $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES, - $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } - }; -} - -/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts -/// the plan against the original and expected plans. -/// -/// # Parameters -/// -/// * `$EXPECTED_PLAN_LINES`: Expected input plan. -/// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Expected optimized plan. -/// * `$PLAN`: The plan to optimize. -/// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag. -#[macro_export] -macro_rules! assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { - let physical_plan = $PLAN; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - - let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES - .iter().map(|s| *s).collect(); - - assert_eq!( - expected_plan_lines, actual, - "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect(); - - // Run the rule top-down - let mut config = ConfigOptions::new(); - config.optimizer.prefer_existing_sort=$PREFER_EXISTING_SORT; - let plan_with_pipeline_fixer = OrderPreservationContext::new_default(physical_plan); - let parallel = plan_with_pipeline_fixer.transform_up(|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, &config)).data().and_then(check_integrity)?; - let optimized_physical_plan = parallel.plan; - - // Get string representation of the plan - let actual = get_plan_string(&optimized_physical_plan); - assert_eq!( - expected_optimized_lines, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - if !$SOURCE_UNBOUNDED { - let ctx = SessionContext::new(); - let object_store = InMemory::new(); - object_store.put(&object_store::path::Path::from("file_path"), bytes::Bytes::from("").into()).await?; - ctx.register_object_store(&Url::parse("test://").unwrap(), Arc::new(object_store)); - let task_ctx = Arc::new(TaskContext::from(&ctx)); - let res = collect(optimized_physical_plan, task_ctx).await; - assert!( - res.is_ok(), - "Some errors occurred while executing the optimized physical plan: {:?}", res.unwrap_err() - ); - } - }; - } - #[rstest] #[tokio::test] // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected @@ -372,77 +244,82 @@ async fn test_with_inter_children_change_only( let physical_plan = sort_preserving_merge_exec(ordering, sort2); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] + + Optimized: + SortPreservingMergeExec: [a@0 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortPreservingMergeExec: [a@0 ASC] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC + + Optimized: + SortPreservingMergeExec: [a@0 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortPreservingMergeExec: [a@0 ASC] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC + "); + } + } + } + Ok(()) } @@ -465,59 +342,64 @@ async fn test_replace_multiple_input_repartition_2( let sort = sort_exec_with_preserve_partitioning(ordering.clone(), repartition_hash); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -542,64 +424,69 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( sort_exec_with_preserve_partitioning(ordering.clone(), coalesce_batches_exec); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -625,69 +512,74 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( sort_exec_with_preserve_partitioning(ordering.clone(), coalesce_batches_exec_2); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -710,55 +602,50 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( let coalesce_batches_exec = coalesce_batches_exec(filter, 8192); let physical_plan = coalesce_partitions_exec(coalesce_batches_exec); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results same with and without flag, because there is no executor with ordering requirement - let expected_optimized_bounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + CoalescePartitionsExec + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + CoalescePartitionsExec + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + CoalescePartitionsExec + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -783,69 +670,74 @@ async fn test_with_multiple_replaceable_repartitions( let sort = sort_exec_with_preserve_partitioning(ordering.clone(), repartition_hash_2); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -871,51 +763,47 @@ async fn test_not_replace_with_different_orderings( let sort = sort_exec_with_preserve_partitioning(ordering_c.clone(), repartition_hash); let physical_plan = sort_preserving_merge_exec(ordering_c, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results same with and without flag, because ordering requirement of the executor is different than the existing ordering. - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -937,54 +825,59 @@ async fn test_with_lost_ordering( let coalesce_partitions = coalesce_partitions_exec(repartition_hash); let physical_plan = sort_exec(ordering, coalesce_partitions); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -1015,79 +908,84 @@ async fn test_with_lost_and_kept_ordering( let sort2 = sort_exec_with_preserve_partitioning(ordering_c.clone(), filter); let physical_plan = sort_preserving_merge_exec(ordering_c, sort2); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [c@1 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [c@1 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, false) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (false, true) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [c@1 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } @@ -1127,76 +1025,49 @@ async fn test_with_multiple_child_trees( let sort = sort_exec_with_preserve_partitioning(ordering.clone(), hash_join_exec); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. Hence no need to preserve - // existing ordering. - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_source_unbounded(source_unbounded) + .with_prefer_existing_sort(prefer_existing_sort); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (source_unbounded, prefer_existing_sort) { + (true, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (false, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } From 56602fc665ea6caef6f87fb8be28773bd876ce9c Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 8 Oct 2025 13:13:32 +0100 Subject: [PATCH 3/6] refactor: replace bool test parameters with enums in replace_with_order_preserving_variants tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace boolean test parameters with more descriptive enums: - Add `Boundedness` enum with `Unbounded` and `Bounded` variants - Add `SortPreference` enum with `PreserveOrder` and `MaximizeParallelism` variants - Update `ReplaceTest` struct to use these enums instead of bool fields - Rename methods from `with_source_unbounded`/`with_prefer_existing_sort` to `with_boundedness`/`with_sort_preference` - Update all test functions to use enum parameters instead of bool - Update all match patterns to use enum variants This makes the tests more self-documenting and easier to understand, as the enum variant names clearly indicate what behavior is being tested. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../replace_with_order_preserving_variants.rs | 243 ++++++++++-------- 1 file changed, 133 insertions(+), 110 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index b93cde997f1f..2853e8acc5b0 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -55,35 +55,47 @@ use object_store::ObjectStore; use rstest::rstest; use url::Url; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Boundedness { + Unbounded, + Bounded, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SortPreference { + PreserveOrder, + MaximizeParallelism, +} + struct ReplaceTest { plan: Arc, - source_unbounded: bool, - prefer_existing_sort: bool, + boundedness: Boundedness, + sort_preference: SortPreference, } impl ReplaceTest { fn new(plan: Arc) -> Self { Self { plan, - source_unbounded: false, - prefer_existing_sort: false, + boundedness: Boundedness::Bounded, + sort_preference: SortPreference::MaximizeParallelism, } } - // Set whether the source is unbounded - fn with_source_unbounded(mut self, source_unbounded: bool) -> Self { - self.source_unbounded = source_unbounded; + fn with_boundedness(mut self, boundedness: Boundedness) -> Self { + self.boundedness = boundedness; self } - fn with_prefer_existing_sort(mut self, prefer_existing_sort: bool) -> Self { - self.prefer_existing_sort = prefer_existing_sort; + fn with_sort_preference(mut self, sort_preference: SortPreference) -> Self { + self.sort_preference = sort_preference; self } async fn execute_plan(&self) -> String { let mut config = ConfigOptions::new(); - config.optimizer.prefer_existing_sort = self.prefer_existing_sort; + config.optimizer.prefer_existing_sort = + self.sort_preference == SortPreference::PreserveOrder; let plan_with_pipeline_fixer = OrderPreservationContext::new_default( self.plan.clone().reset_state().unwrap(), @@ -107,7 +119,7 @@ impl ReplaceTest { .indent(true) .to_string(); - if !self.source_unbounded { + if self.boundedness == Boundedness::Bounded { let ctx = SessionContext::new(); let object_store = InMemory::new(); object_store @@ -150,12 +162,13 @@ impl ReplaceTest { #[tokio::test] // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected async fn test_replace_multiple_input_repartition_1( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, sort_exprs.clone()) } else { memory_exec_sorted(&schema, sort_exprs.clone()) @@ -165,14 +178,14 @@ async fn test_replace_multiple_input_repartition_1( let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (false, false) => { + match (boundedness, sort_pref) { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -182,7 +195,7 @@ async fn test_replace_multiple_input_repartition_1( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (true, _) => { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -198,7 +211,7 @@ async fn test_replace_multiple_input_repartition_1( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -223,12 +236,13 @@ async fn test_replace_multiple_input_repartition_1( #[rstest] #[tokio::test] async fn test_with_inter_children_change_only( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr_default("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering.clone()) } else { memory_exec_sorted(&schema, ordering.clone()) @@ -245,14 +259,14 @@ async fn test_with_inter_children_change_only( let physical_plan = sort_preserving_merge_exec(ordering, sort2); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC] @@ -277,7 +291,7 @@ async fn test_with_inter_children_change_only( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC] @@ -292,7 +306,7 @@ async fn test_with_inter_children_change_only( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC] @@ -326,12 +340,13 @@ async fn test_with_inter_children_change_only( #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_2( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering.clone()) } else { memory_exec_sorted(&schema, ordering.clone()) @@ -343,14 +358,14 @@ async fn test_replace_multiple_input_repartition_2( let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -368,7 +383,7 @@ async fn test_replace_multiple_input_repartition_2( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -379,7 +394,7 @@ async fn test_replace_multiple_input_repartition_2( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -406,12 +421,13 @@ async fn test_replace_multiple_input_repartition_2( #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering.clone()) } else { memory_exec_sorted(&schema, ordering.clone()) @@ -425,14 +441,14 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -452,7 +468,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -464,7 +480,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -493,12 +509,13 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps_2( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering.clone()) } else { memory_exec_sorted(&schema, ordering.clone()) @@ -513,14 +530,14 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -542,7 +559,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -555,7 +572,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -586,12 +603,13 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( #[rstest] #[tokio::test] async fn test_not_replacing_when_no_need_to_preserve_sorting( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering) } else { memory_exec_sorted(&schema, ordering) @@ -603,14 +621,14 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( let physical_plan = coalesce_partitions_exec(coalesce_batches_exec); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input / Optimized: CoalescePartitionsExec @@ -621,7 +639,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: CoalescePartitionsExec @@ -632,7 +650,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input / Optimized: CoalescePartitionsExec @@ -652,12 +670,13 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( #[rstest] #[tokio::test] async fn test_with_multiple_replaceable_repartitions( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering.clone()) } else { memory_exec_sorted(&schema, ordering.clone()) @@ -671,14 +690,14 @@ async fn test_with_multiple_replaceable_repartitions( let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -700,7 +719,7 @@ async fn test_with_multiple_replaceable_repartitions( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -713,7 +732,7 @@ async fn test_with_multiple_replaceable_repartitions( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] @@ -744,14 +763,15 @@ async fn test_with_multiple_replaceable_repartitions( #[rstest] #[tokio::test] async fn test_not_replace_with_different_orderings( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { use datafusion_physical_expr::LexOrdering; let schema = create_test_schema()?; let ordering_a = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering_a) } else { memory_exec_sorted(&schema, ordering_a) @@ -764,14 +784,14 @@ async fn test_not_replace_with_different_orderings( let physical_plan = sort_preserving_merge_exec(ordering_c, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [c@1 ASC] @@ -781,7 +801,7 @@ async fn test_not_replace_with_different_orderings( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [c@1 ASC] @@ -791,7 +811,7 @@ async fn test_not_replace_with_different_orderings( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [c@1 ASC] @@ -810,12 +830,13 @@ async fn test_not_replace_with_different_orderings( #[rstest] #[tokio::test] async fn test_with_lost_ordering( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering.clone()) } else { memory_exec_sorted(&schema, ordering.clone()) @@ -826,14 +847,14 @@ async fn test_with_lost_ordering( let physical_plan = sort_exec(ordering, coalesce_partitions); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -849,7 +870,7 @@ async fn test_with_lost_ordering( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -859,7 +880,7 @@ async fn test_with_lost_ordering( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -884,14 +905,15 @@ async fn test_with_lost_ordering( #[rstest] #[tokio::test] async fn test_with_lost_and_kept_ordering( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { use datafusion_physical_expr::LexOrdering; let schema = create_test_schema()?; let ordering_a = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { + let source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, ordering_a) } else { memory_exec_sorted(&schema, ordering_a) @@ -909,14 +931,14 @@ async fn test_with_lost_and_kept_ordering( let physical_plan = sort_preserving_merge_exec(ordering_c, sort2); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [c@1 ASC] @@ -942,7 +964,7 @@ async fn test_with_lost_and_kept_ordering( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, false) => { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [c@1 ASC] @@ -957,7 +979,7 @@ async fn test_with_lost_and_kept_ordering( DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, - (false, true) => { + (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" Input: SortPreservingMergeExec: [c@1 ASC] @@ -992,13 +1014,14 @@ async fn test_with_lost_and_kept_ordering( #[rstest] #[tokio::test] async fn test_with_multiple_child_trees( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let left_ordering = [sort_expr("a", &schema)].into(); - let left_source = if source_unbounded { + let left_source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, left_ordering) } else { memory_exec_sorted(&schema, left_ordering) @@ -1009,7 +1032,7 @@ async fn test_with_multiple_child_trees( Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_ordering = [sort_expr("a", &schema)].into(); - let right_source = if source_unbounded { + let right_source = if boundedness == Boundedness::Unbounded { stream_exec_ordered_with_projection(&schema, right_ordering) } else { memory_exec_sorted(&schema, right_ordering) @@ -1026,14 +1049,14 @@ async fn test_with_multiple_child_trees( let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) - .with_source_unbounded(source_unbounded) - .with_prefer_existing_sort(prefer_existing_sort); + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); let physical_plan = run.run().await; allow_duplicates! { - match (source_unbounded, prefer_existing_sort) { - (true, _) => { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC] @@ -1049,7 +1072,7 @@ async fn test_with_multiple_child_trees( StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, - (false, _) => { + (Boundedness::Bounded, _) => { assert_snapshot!(physical_plan, @r" Input / Optimized: SortPreservingMergeExec: [a@0 ASC] From 1e0495a0afba6f66719437042677e3f4f7ef116d Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 8 Oct 2025 17:53:22 +0100 Subject: [PATCH 4/6] Clippy --- .../replace_with_order_preserving_variants.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index 2853e8acc5b0..c8593068d37a 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -151,9 +151,9 @@ impl ReplaceTest { let optimized = self.execute_plan().await; if input_plan_string == optimized { - format!("Input / Optimized:\n{}", input_plan_string) + format!("Input / Optimized:\n{input_plan_string}") } else { - format!("Input:\n{}\nOptimized:\n{}", input_plan_string, optimized) + format!("Input:\n{input_plan_string}\nOptimized:\n{optimized}") } } } From d37e14c8b5e1c14c4e3fc5285a00a9b472a47862 Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 8 Oct 2025 17:59:42 +0100 Subject: [PATCH 5/6] refactor: replace if expressions with match for boundedness checks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace all `if boundedness == Boundedness::Unbounded` checks with cleaner match expressions. This improves code readability and makes the pattern matching more explicit and idiomatic. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../replace_with_order_preserving_variants.rs | 106 ++++++++++-------- 1 file changed, 58 insertions(+), 48 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index c8593068d37a..5d40b876efdc 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -168,10 +168,11 @@ async fn test_replace_multiple_input_repartition_1( ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, sort_exprs.clone()) - } else { - memory_exec_sorted(&schema, sort_exprs.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, sort_exprs.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, sort_exprs.clone()), }; let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec_with_preserve_partitioning(sort_exprs.clone(), repartition); @@ -242,10 +243,11 @@ async fn test_with_inter_children_change_only( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr_default("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -346,10 +348,11 @@ async fn test_replace_multiple_input_repartition_2( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); @@ -427,10 +430,11 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -515,10 +519,11 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr, 8192); @@ -609,10 +614,9 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering) - } else { - memory_exec_sorted(&schema, ordering) + let source = match boundedness { + Boundedness::Unbounded => stream_exec_ordered_with_projection(&schema, ordering), + Boundedness::Bounded => memory_exec_sorted(&schema, ordering), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -676,10 +680,11 @@ async fn test_with_multiple_replaceable_repartitions( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -771,10 +776,11 @@ async fn test_not_replace_with_different_orderings( let schema = create_test_schema()?; let ordering_a = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering_a) - } else { - memory_exec_sorted(&schema, ordering_a) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering_a) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering_a), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -836,10 +842,11 @@ async fn test_with_lost_ordering( ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -913,10 +920,11 @@ async fn test_with_lost_and_kept_ordering( let schema = create_test_schema()?; let ordering_a = [sort_expr("a", &schema)].into(); - let source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, ordering_a) - } else { - memory_exec_sorted(&schema, ordering_a) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering_a) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering_a), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -1021,10 +1029,11 @@ async fn test_with_multiple_child_trees( let schema = create_test_schema()?; let left_ordering = [sort_expr("a", &schema)].into(); - let left_source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, left_ordering) - } else { - memory_exec_sorted(&schema, left_ordering) + let left_source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, left_ordering) + } + Boundedness::Bounded => memory_exec_sorted(&schema, left_ordering), }; let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); @@ -1032,10 +1041,11 @@ async fn test_with_multiple_child_trees( Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_ordering = [sort_expr("a", &schema)].into(); - let right_source = if boundedness == Boundedness::Unbounded { - stream_exec_ordered_with_projection(&schema, right_ordering) - } else { - memory_exec_sorted(&schema, right_ordering) + let right_source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, right_ordering) + } + Boundedness::Bounded => memory_exec_sorted(&schema, right_ordering), }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); From 258ab63bcbd469278b79430d87848108bcf3dadb Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 8 Oct 2025 18:34:44 +0100 Subject: [PATCH 6/6] Missing comments --- .../replace_with_order_preserving_variants.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index 5d40b876efdc..066e52614a12 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -653,6 +653,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); + // Expected bounded results same with and without flag, because there is no executor with ordering requirement }, (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" @@ -816,6 +817,8 @@ async fn test_not_replace_with_different_orderings( RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); + // Expected bounded results same with and without flag, because ordering requirement of the executor is + // different from the existing ordering. }, (Boundedness::Bounded, SortPreference::PreserveOrder) => { assert_snapshot!(physical_plan, @r" @@ -1097,6 +1100,8 @@ async fn test_with_multiple_child_trees( RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); + // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. + // Hence, no need to preserve existing ordering. } } }