diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index a1134c3d83ed..066e52614a12 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -29,6 +29,7 @@ use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use insta::{allow_duplicates, assert_snapshot}; use datafusion_common::tree_node::{TransformedResult, TreeNode}; use datafusion_common::{assert_contains, NullEquality, Result}; use datafusion_common::config::ConfigOptions; @@ -46,7 +47,7 @@ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::{ - collect, displayable, get_plan_string, ExecutionPlan, Partitioning, + collect, displayable, ExecutionPlan, Partitioning, }; use object_store::memory::InMemory; @@ -54,215 +55,199 @@ use object_store::ObjectStore; use rstest::rstest; use url::Url; -/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts -/// the plan against the original and expected plans. -/// -/// # Parameters -/// -/// * `$EXPECTED_PLAN_LINES`: Expected input plan. -/// * `EXPECTED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag -/// `prefer_existing_sort` is `false`. -/// * `EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan when -/// the flag `prefer_existing_sort` is `true`. -/// * `$PLAN`: The plan to optimize. -macro_rules! 
assert_optimized_prefer_sort_on_off { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { - if $PREFER_EXISTING_SORT { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } else { - assert_optimized!( - $EXPECTED_PLAN_LINES, - $EXPECTED_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } - }; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Boundedness { + Unbounded, + Bounded, } -/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts -/// the plan against the original and expected plans for both bounded and -/// unbounded cases. -/// -/// # Parameters -/// -/// * `EXPECTED_UNBOUNDED_PLAN_LINES`: Expected input unbounded plan. -/// * `EXPECTED_BOUNDED_PLAN_LINES`: Expected input bounded plan. -/// * `EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan, which is -/// the same regardless of the value of the `prefer_existing_sort` flag. -/// * `EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES`: Optimized plan when the flag -/// `prefer_existing_sort` is `false` for bounded cases. -/// * `EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES`: Optimized plan -/// when the flag `prefer_existing_sort` is `true` for bounded cases. -/// * `$PLAN`: The plan to optimize. -/// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source. -macro_rules! 
assert_optimized_in_all_boundedness_situations { - ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => { - if $SOURCE_UNBOUNDED { - assert_optimized_prefer_sort_on_off!( - $EXPECTED_UNBOUNDED_PLAN_LINES, - $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } else { - assert_optimized_prefer_sort_on_off!( - $EXPECTED_BOUNDED_PLAN_LINES, - $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES, - $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES, - $PLAN, - $PREFER_EXISTING_SORT, - $SOURCE_UNBOUNDED - ); - } - }; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SortPreference { + PreserveOrder, + MaximizeParallelism, } -/// Runs the `replace_with_order_preserving_variants` sub-rule and asserts -/// the plan against the original and expected plans. -/// -/// # Parameters -/// -/// * `$EXPECTED_PLAN_LINES`: Expected input plan. -/// * `$EXPECTED_OPTIMIZED_PLAN_LINES`: Expected optimized plan. -/// * `$PLAN`: The plan to optimize. -/// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag. -#[macro_export] -macro_rules! 
assert_optimized { - ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => { - let physical_plan = $PLAN; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - - let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES - .iter().map(|s| *s).collect(); - - assert_eq!( - expected_plan_lines, actual, - "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" - ); +struct ReplaceTest { + plan: Arc<dyn ExecutionPlan>, + boundedness: Boundedness, + sort_preference: SortPreference, +} - let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES.iter().map(|s| *s).collect(); +impl ReplaceTest { + fn new(plan: Arc<dyn ExecutionPlan>) -> Self { + Self { + plan, + boundedness: Boundedness::Bounded, + sort_preference: SortPreference::MaximizeParallelism, + } + } + + fn with_boundedness(mut self, boundedness: Boundedness) -> Self { + self.boundedness = boundedness; + self + } - // Run the rule top-down - let mut config = ConfigOptions::new(); - config.optimizer.prefer_existing_sort=$PREFER_EXISTING_SORT; - let plan_with_pipeline_fixer = OrderPreservationContext::new_default(physical_plan); - let parallel = plan_with_pipeline_fixer.transform_up(|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, &config)).data().and_then(check_integrity)?; - let optimized_physical_plan = parallel.plan; + fn with_sort_preference(mut self, sort_preference: SortPreference) -> Self { + self.sort_preference = sort_preference; + self + } - // Get string representation of the plan - let actual = get_plan_string(&optimized_physical_plan); - assert_eq!( - expected_optimized_lines, actual, - "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n" + async fn execute_plan(&self) -> String { + let mut config = 
ConfigOptions::new(); + config.optimizer.prefer_existing_sort = + self.sort_preference == SortPreference::PreserveOrder; + + let plan_with_pipeline_fixer = OrderPreservationContext::new_default( + self.plan.clone().reset_state().unwrap(), + ); + + let parallel = plan_with_pipeline_fixer + .transform_up(|plan_with_pipeline_fixer| { + replace_with_order_preserving_variants( + plan_with_pipeline_fixer, + false, + false, + &config, + ) + }) + .data() + .and_then(check_integrity) + .unwrap(); + + let optimized_physical_plan = parallel.plan; + let optimized_plan_string = displayable(optimized_physical_plan.as_ref()) + .indent(true) + .to_string(); + + if self.boundedness == Boundedness::Bounded { + let ctx = SessionContext::new(); + let object_store = InMemory::new(); + object_store + .put( + &object_store::path::Path::from("file_path"), + bytes::Bytes::from("").into(), + ) + .await + .expect("could not create object store"); + ctx.register_object_store( + &Url::parse("test://").unwrap(), + Arc::new(object_store), + ); + let task_ctx = Arc::new(TaskContext::from(&ctx)); + let res = collect(optimized_physical_plan, task_ctx).await; + assert!( + res.is_ok(), + "Some errors occurred while executing the optimized physical plan: {:?}\nPlan: {}", + res.unwrap_err(), optimized_plan_string ); + } + + optimized_plan_string + } + + async fn run(&self) -> String { + let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string(); + + let optimized = self.execute_plan().await; - if !$SOURCE_UNBOUNDED { - let ctx = SessionContext::new(); - let object_store = InMemory::new(); - object_store.put(&object_store::path::Path::from("file_path"), bytes::Bytes::from("").into()).await?; - ctx.register_object_store(&Url::parse("test://").unwrap(), Arc::new(object_store)); - let task_ctx = Arc::new(TaskContext::from(&ctx)); - let res = collect(optimized_physical_plan, task_ctx).await; - assert!( - res.is_ok(), - "Some errors occurred while executing the optimized physical plan: 
{:?}", res.unwrap_err() - ); - } - }; + if input_plan_string == optimized { + format!("Input / Optimized:\n{input_plan_string}") + } else { + format!("Input:\n{input_plan_string}\nOptimized:\n{optimized}") + } } +} #[rstest] #[tokio::test] // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected async fn test_replace_multiple_input_repartition_1( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let sort_exprs: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, sort_exprs.clone()) - } else { - memory_exec_sorted(&schema, sort_exprs.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, sort_exprs.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, sort_exprs.clone()), }; let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec_with_preserve_partitioning(sort_exprs.clone(), repartition); let physical_plan = sort_preserving_merge_exec(sort_exprs, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + 
.with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (boundedness, sort_pref) { + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS 
LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_with_inter_children_change_only( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr_default("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -275,92 +260,99 @@ async fn test_with_inter_children_change_only( let physical_plan = sort_preserving_merge_exec(ordering, sort2); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! 
{ + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] + + Optimized: + SortPreservingMergeExec: [a@0 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortPreservingMergeExec: [a@0 ASC] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC + "); + }, + 
(Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC + + Optimized: + SortPreservingMergeExec: [a@0 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortPreservingMergeExec: [a@0 ASC] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_2( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, 
ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); @@ -368,74 +360,81 @@ async fn test_replace_multiple_input_repartition_2( let sort = sort_exec_with_preserve_partitioning(ordering.clone(), repartition_hash); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! 
{ + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: 
partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -445,79 +444,86 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( sort_exec_with_preserve_partitioning(ordering.clone(), coalesce_batches_exec); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: 
partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: 
partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + 
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_replace_multiple_input_repartition_with_extra_steps_2( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; 
let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr, 8192); @@ -528,84 +534,89 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( sort_exec_with_preserve_partitioning(ordering.clone(), coalesce_batches_exec_2); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " 
StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! 
{ + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + 
RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_not_replacing_when_no_need_to_preserve_sorting( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering) - } else { - memory_exec_sorted(&schema, ordering) + let source = match boundedness { + Boundedness::Unbounded => stream_exec_ordered_with_projection(&schema, ordering), + Boundedness::Bounded => memory_exec_sorted(&schema, ordering), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -613,70 +624,68 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( let coalesce_batches_exec = coalesce_batches_exec(filter, 8192); let physical_plan = coalesce_partitions_exec(coalesce_batches_exec); - // Expected inputs unbounded and bounded - let expected_input_unbounded = 
[ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results same with and without flag, because there is no executor with ordering requirement - let expected_optimized_bounded = [ - "CoalescePartitionsExec", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - 
expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + CoalescePartitionsExec + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + CoalescePartitionsExec + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + // Expected bounded results same with and without flag, because there is no executor with ordering requirement + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + CoalescePartitionsExec + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_with_multiple_replaceable_repartitions( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + 
#[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -686,86 +695,93 @@ async fn test_with_multiple_replaceable_repartitions( let sort = sort_exec_with_preserve_partitioning(ordering.clone(), repartition_hash_2); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 
8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " 
DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + 
RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_not_replace_with_different_orderings( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { use datafusion_physical_expr::LexOrdering; let schema = create_test_schema()?; let ordering_a = 
[sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering_a) - } else { - memory_exec_sorted(&schema, ordering_a) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering_a) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering_a), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -774,137 +790,144 @@ async fn test_not_replace_with_different_orderings( let sort = sort_exec_with_preserve_partitioning(ordering_c.clone(), repartition_hash); let physical_plan = sort_preserving_merge_exec(ordering_c, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], 
infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results same with and without flag, because ordering requirement of the executor is different than the existing ordering. - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! 
{ + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + // Expected bounded results same with and without flag, because ordering requirement of the executor is + // different from the existing ordering. 
+ }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_with_lost_ordering( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let ordering: LexOrdering = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering.clone()) - } else { - memory_exec_sorted(&schema, ordering.clone()) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering.clone()) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); let physical_plan = sort_exec(ordering, coalesce_partitions); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, 
output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - 
prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! { + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC 
NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_with_lost_and_kept_ordering( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { use datafusion_physical_expr::LexOrdering; let schema = create_test_schema()?; let ordering_a = [sort_expr("a", &schema)].into(); - let source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, ordering_a) - } else { - memory_exec_sorted(&schema, ordering_a) + let source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, ordering_a) + } + Boundedness::Bounded => memory_exec_sorted(&schema, ordering_a), }; let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); @@ -918,95 +941,102 @@ async fn test_with_lost_and_kept_ordering( let sort2 = sort_exec_with_preserve_partitioning(ordering_c.clone(), filter); let physical_plan = sort_preserving_merge_exec(ordering_c, sort2); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 
8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results with and without flag - let expected_optimized_bounded = [ - "SortPreservingMergeExec: [c@1 ASC]", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[true]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], 
preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = [ - "SortPreservingMergeExec: [c@1 ASC]", - " FilterExec: c@1 > 3", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " SortExec: expr=[c@1 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! 
{ + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [c@1 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 
ASC NULLS LAST + "); + }, + (Boundedness::Bounded, SortPreference::PreserveOrder) => { + assert_snapshot!(physical_plan, @r" + Input: + SortPreservingMergeExec: [c@1 ASC] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [c@1 ASC] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + SortExec: expr=[c@1 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + } + } + } + Ok(()) } #[rstest] #[tokio::test] async fn test_with_multiple_child_trees( - #[values(false, true)] source_unbounded: bool, - #[values(false, true)] prefer_existing_sort: bool, + #[values(Boundedness::Unbounded, Boundedness::Bounded)] boundedness: Boundedness, + #[values(SortPreference::PreserveOrder, SortPreference::MaximizeParallelism)] + sort_pref: SortPreference, ) -> Result<()> { let schema = create_test_schema()?; let left_ordering = [sort_expr("a", &schema)].into(); - let left_source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, left_ordering) - } else { - memory_exec_sorted(&schema, left_ordering) + let left_source = match boundedness { + Boundedness::Unbounded => { + 
stream_exec_ordered_with_projection(&schema, left_ordering) + } + Boundedness::Bounded => memory_exec_sorted(&schema, left_ordering), }; let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); @@ -1014,10 +1044,11 @@ async fn test_with_multiple_child_trees( Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_ordering = [sort_expr("a", &schema)].into(); - let right_source = if source_unbounded { - stream_exec_ordered_with_projection(&schema, right_ordering) - } else { - memory_exec_sorted(&schema, right_ordering) + let right_source = match boundedness { + Boundedness::Unbounded => { + stream_exec_ordered_with_projection(&schema, right_ordering) + } + Boundedness::Bounded => memory_exec_sorted(&schema, right_ordering), }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); @@ -1030,76 +1061,51 @@ async fn test_with_multiple_child_trees( let sort = sort_exec_with_preserve_partitioning(ordering.clone(), hash_join_exec); let physical_plan = sort_preserving_merge_exec(ordering, sort); - // Expected inputs unbounded and bounded - let expected_input_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: 
partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - let expected_input_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - - // Expected unbounded result (same for with and without flag) - let expected_optimized_unbounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]", - ]; - - // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. Hence no need to preserve - // existing ordering. 
- let expected_optimized_bounded = [ - "SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST", - ]; - let expected_optimized_bounded_sort_preserve = expected_optimized_bounded; - - assert_optimized_in_all_boundedness_situations!( - expected_input_unbounded, - expected_input_bounded, - expected_optimized_unbounded, - expected_optimized_bounded, - expected_optimized_bounded_sort_preserve, - physical_plan, - source_unbounded, - prefer_existing_sort - ); + let run = ReplaceTest::new(physical_plan) + .with_boundedness(boundedness) + .with_sort_preference(sort_pref); + + let physical_plan = run.run().await; + + allow_duplicates! 
{ + match (boundedness, sort_pref) { + (Boundedness::Unbounded, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + "); + }, + (Boundedness::Bounded, _) => { + assert_snapshot!(physical_plan, @r" + Input / Optimized: + SortPreservingMergeExec: [a@0 ASC] + SortExec: expr=[a@0 ASC], preserve_partitioning=[true] + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + CoalesceBatchesExec: target_batch_size=4096 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + "); + // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. + // Hence, no need to preserve existing ordering. + } + } + } + Ok(()) }