diff --git a/datafusion-examples/examples/udf/async_udf.rs b/datafusion-examples/examples/udf/async_udf.rs index c31e8290ccce5..3d8faf623d439 100644 --- a/datafusion-examples/examples/udf/async_udf.rs +++ b/datafusion-examples/examples/udf/async_udf.rs @@ -102,8 +102,7 @@ pub async fn async_udf() -> Result<()> { "| physical_plan | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |", "| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |", "| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |", - "| | CoalesceBatchesExec: target_batch_size=8192 |", - "| | DataSourceExec: partitions=1, partition_sizes=[1] |", + "| | DataSourceExec: partitions=1, partition_sizes=[1] |", "| | |", "+---------------+------------------------------------------------------------------------------------------------------------------------------+", ], diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs index 27dacf598c2c0..380a47505ac2d 100644 --- a/datafusion/core/tests/execution/coop.rs +++ b/datafusion/core/tests/execution/coop.rs @@ -41,7 +41,6 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::ensure_coop::EnsureCooperative; -use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coop::make_cooperative; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; @@ -425,10 +424,7 @@ async fn filter_reject_all_batches_yields( )); let filtered = Arc::new(FilterExec::try_new(false_predicate, Arc::new(infinite))?); - // Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch - let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192)); - - query_yields(coalesced, session_ctx.task_ctx()).await + query_yields(filtered, session_ctx.task_ctx()).await } #[rstest] @@ -584,17 +580,18 @@ async fn join_yields( let left_keys: Vec> = vec![Arc::new(Column::new("value", 0))]; let right_keys: Vec> = vec![Arc::new(Column::new("value", 0))]; - // Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition - let coalesced_left = - Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192)); - let coalesced_right = - Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192)); - let part_left = Partitioning::Hash(left_keys, 1); let part_right = Partitioning::Hash(right_keys, 1); - let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?); - let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?); + // Wrap each side in Repartition so they are both hashed into 1 partition + let hashed_left = Arc::new(RepartitionExec::try_new( + Arc::new(infinite_left), + part_left, + )?); + let hashed_right = Arc::new(RepartitionExec::try_new( + Arc::new(infinite_right), + part_right, + )?); // Build an Inner HashJoinExec → left.value = right.value let join = Arc::new(HashJoinExec::try_new( @@ -632,17 +629,18 @@ async fn join_agg_yields( let left_keys: Vec> = vec![Arc::new(Column::new("value", 0))]; let right_keys: Vec> = vec![Arc::new(Column::new("value", 0))]; - // Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition - let coalesced_left = - Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192)); - let coalesced_right = - Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192)); - let part_left = Partitioning::Hash(left_keys, 1); let part_right = Partitioning::Hash(right_keys, 1); - let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?); - let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?); + // Wrap each side in Repartition so they are both hashed into 1 partition + let hashed_left = Arc::new(RepartitionExec::try_new( + Arc::new(infinite_left), + part_left, + )?); + let hashed_right = Arc::new(RepartitionExec::try_new( + Arc::new(infinite_right), + part_right, + )?); // Build an Inner HashJoinExec → left.value = right.value let join = Arc::new(HashJoinExec::try_new( diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 7cedaf86cb52f..94ae82a9ad755 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -56,7 +56,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirements; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; -use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; + use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::ExecutionPlan; use datafusion_physical_plan::expressions::col; @@ -1741,9 +1741,6 @@ fn merge_does_not_need_sort() -> Result<()> { // Scan some sorted parquet files let exec = parquet_exec_multiple_sorted(vec![sort_key.clone()]); - // CoalesceBatchesExec to mimic behavior after a filter - let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096)); - // Merge from multiple parquet files and keep the data sorted let exec: Arc = Arc::new(SortPreservingMergeExec::new(sort_key, exec)); @@ -1757,8 +1754,7 @@ fn merge_does_not_need_sort() -> Result<()> { assert_plan!(plan_distrib, @r" SortPreservingMergeExec: [a@0 ASC] - CoalesceBatchesExec: target_batch_size=4096 - DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet "); // Test: result IS DIFFERENT, if EnforceSorting is run first: @@ -1772,8 +1768,7 @@ fn merge_does_not_need_sort() -> Result<()> { @r" SortExec: expr=[a@0 ASC], preserve_partitioning=[false] CoalescePartitionsExec - CoalesceBatchesExec: target_batch_size=4096 - DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet "); Ok(()) diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 47e3adb455117..4b74aebdf5deb 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -20,14 +20,13 @@ use std::sync::Arc; use crate::memory_limit::DummyStreamPartition; use crate::physical_optimizer::test_utils::{ RequirementsTestExec, aggregate_exec, bounded_window_exec, - bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec, - coalesce_partitions_exec, create_test_schema, create_test_schema2, - create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, - local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec, - repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options, - sort_merge_join_exec, sort_preserving_merge_exec, - sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, - union_exec, + bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec, + create_test_schema, create_test_schema2, create_test_schema3, filter_exec, + global_limit_exec, hash_join_exec, local_limit_exec, memory_exec, parquet_exec, + parquet_exec_with_sort, projection_exec, repartition_exec, sort_exec, + sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec, + sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, + spr_repartition_exec, stream_exec_ordered, union_exec, }; use arrow::compute::SortOptions; @@ -1845,9 +1844,7 @@ async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { )] .into(); let sort = sort_exec(ordering.clone(), source); - // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before - let coalesce_batches = coalesce_batches_exec(sort, 128); - let window_agg = bounded_window_exec("non_nullable_col", ordering, coalesce_batches); + let window_agg = bounded_window_exec("non_nullable_col", ordering, sort); let ordering2: LexOrdering = [sort_expr_options( "non_nullable_col", &window_agg.schema(), @@ -1873,17 +1870,15 @@ async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { FilterExec: NOT non_nullable_col@1 SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false] BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] - CoalesceBatchesExec: target_batch_size=128 - SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false] - DataSourceExec: partitions=1, partition_sizes=[0] + SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] Optimized Plan: WindowAggExec: wdw=[count: Ok(Field { name: "count", data_type: Int64 }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }] FilterExec: NOT non_nullable_col@1 BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] - CoalesceBatchesExec: target_batch_size=128 - SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false] - DataSourceExec: partitions=1, partition_sizes=[0] + SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[0] "#); Ok(()) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d6357fdf6bc7d..d12739658c400 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -56,7 +56,6 @@ use datafusion_physical_optimizer::{ use datafusion_physical_plan::{ ExecutionPlan, aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, - coalesce_batches::CoalesceBatchesExec, coalesce_partitions::CoalescePartitionsExec, collect, filter::FilterExec, @@ -527,9 +526,8 @@ fn test_filter_with_projection() { fn test_push_down_through_transparent_nodes() { // expect the predicate to be pushed down into the DataSource let scan = TestScanBuilder::new(schema()).with_support(true).build(); - let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1)); let predicate = col_lit_predicate("a", "foo", &schema()); - let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + let filter = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); let repartition = Arc::new( RepartitionExec::try_new(filter, Partitioning::RoundRobinBatch(1)).unwrap(), ); @@ -545,13 +543,11 @@ fn test_push_down_through_transparent_nodes() { - FilterExec: b@1 = bar - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1 - FilterExec: a@0 = foo - - CoalesceBatchesExec: target_batch_size=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true output: Ok: - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=1 - - CoalesceBatchesExec: target_batch_size=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar " ); } @@ -564,10 +560,11 @@ fn test_pushdown_through_aggregates_on_grouping_columns() { // 2. An outer filter (b@1 = bar) above the aggregate - also gets pushed through because 'b' is a grouping column let scan = TestScanBuilder::new(schema()).with_support(true).build(); - let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10)); - let filter = Arc::new( - FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), coalesce).unwrap(), + FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), scan) + .unwrap() + .with_batch_size(10) + .unwrap(), ); let aggregate_expr = vec![ @@ -594,10 +591,13 @@ fn test_pushdown_through_aggregates_on_grouping_columns() { .unwrap(), ); - let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100)); - let predicate = col_lit_predicate("b", "bar", &schema()); - let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap()); + let plan = Arc::new( + FilterExec::try_new(predicate, aggregate) + .unwrap() + .with_batch_size(100) + .unwrap(), + ); // Both filters should be pushed down to the DataSource since both reference grouping columns insta::assert_snapshot!( @@ -606,17 +606,13 @@ fn test_pushdown_through_aggregates_on_grouping_columns() { OptimizationTest: input: - FilterExec: b@1 = bar - - CoalesceBatchesExec: target_batch_size=100 - - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0]) - - FilterExec: a@0 = foo - - CoalesceBatchesExec: target_batch_size=10 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0]) + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true output: Ok: - - CoalesceBatchesExec: target_batch_size=100 - - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=Sorted - - CoalesceBatchesExec: target_batch_size=10 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar + - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=Sorted + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar " ); } @@ -921,61 +917,6 @@ async fn test_topk_filter_passes_through_coalesce_partitions() { ); } -#[tokio::test] -async fn test_topk_filter_passes_through_coalesce_batches() { - let batches = vec![ - record_batch!( - ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["bd", "bc"]), - ("c", Float64, [1.0, 2.0]) - ) - .unwrap(), - record_batch!( - ("a", Utf8, ["ac", "ad"]), - ("b", Utf8, ["bb", "ba"]), - ("c", Float64, [2.0, 1.0]) - ) - .unwrap(), - ]; - - let scan = TestScanBuilder::new(schema()) - .with_support(true) - .with_batches(batches) - .build(); - - let coalesce_batches = - Arc::new(CoalesceBatchesExec::new(scan, 1024)) as Arc; - - // Add SortExec with TopK - let plan = Arc::new( - SortExec::new( - LexOrdering::new(vec![PhysicalSortExpr::new( - col("b", &schema()).unwrap(), - SortOptions::new(true, false), - )]) - .unwrap(), - coalesce_batches, - ) - .with_fetch(Some(1)), - ) as Arc; - - insta::assert_snapshot!( - OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), - @r" - OptimizationTest: - input: - - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - output: - Ok: - - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false] - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] - " - ); -} - #[tokio::test] async fn test_hashjoin_dynamic_filter_pushdown() { use datafusion_common::JoinType; @@ -1118,23 +1059,11 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { // | plan_type | plan | // +---------------+------------------------------------------------------------+ // | physical_plan | ┌───────────────────────────┐ | - // | | │ CoalesceBatchesExec │ | - // | | │ -------------------- │ | - // | | │ target_batch_size: │ | - // | | │ 8192 │ | - // | | └─────────────┬─────────────┘ | - // | | ┌─────────────┴─────────────┐ | // | | │ HashJoinExec │ | // | | │ -------------------- ├──────────────┐ | // | | │ on: (k = k) │ │ | // | | └─────────────┬─────────────┘ │ | // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | - // | | │ CoalesceBatchesExec ││ CoalesceBatchesExec │ | - // | | │ -------------------- ││ -------------------- │ | - // | | │ target_batch_size: ││ target_batch_size: │ | - // | | │ 8192 ││ 8192 │ | - // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | - // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | // | | │ RepartitionExec ││ RepartitionExec │ | // | | │ -------------------- ││ -------------------- │ | // | | │ partition_count(in->out): ││ partition_count(in->out): │ | @@ -1194,7 +1123,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { // Create RepartitionExec nodes for both sides with hash partitioning on join keys let partition_count = 12; - // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + // Build side: DataSource -> RepartitionExec (Hash) let build_hash_exprs = vec![ col("a", &build_side_schema).unwrap(), col("b", &build_side_schema).unwrap(), @@ -1206,9 +1135,8 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ) .unwrap(), ); - let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192)); - // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + // Probe side: DataSource -> RepartitionExec (Hash) let probe_hash_exprs = vec![ col("a", &probe_side_schema).unwrap(), col("b", &probe_side_schema).unwrap(), @@ -1220,7 +1148,6 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ) .unwrap(), ); - let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); // Create HashJoinExec with partitioned inputs let on = vec![ @@ -1235,8 +1162,8 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ]; let hash_join = Arc::new( HashJoinExec::try_new( - build_coalesce, - probe_coalesce, + build_repartition, + probe_repartition, on, None, &JoinType::Inner, @@ -1247,11 +1174,8 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { .unwrap(), ); - // Top-level CoalesceBatchesExec - let cb = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; // Top-level CoalescePartitionsExec - let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc; // Add a sort for deterministic output let plan = Arc::new(SortExec::new( LexOrdering::new(vec![PhysicalSortExpr::new( @@ -1270,26 +1194,20 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { input: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true output: Ok: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] " ); @@ -1319,14 +1237,11 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { @r" - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ] " ); @@ -1340,14 +1255,11 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { @r" - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] " ); @@ -1418,7 +1330,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { // Create RepartitionExec nodes for both sides with hash partitioning on join keys let partition_count = 12; - // Probe side: DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec + // Probe side: DataSource -> RepartitionExec(Hash) let probe_hash_exprs = vec![ col("a", &probe_side_schema).unwrap(), col("b", &probe_side_schema).unwrap(), @@ -1430,7 +1342,6 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { ) .unwrap(), ); - let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); let on = vec![ ( @@ -1445,7 +1356,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { let hash_join = Arc::new( HashJoinExec::try_new( build_scan, - probe_coalesce, + probe_repartition, on, None, &JoinType::Inner, @@ -1456,11 +1367,8 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { .unwrap(), ); - // Top-level CoalesceBatchesExec - let cb = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; // Top-level CoalescePartitionsExec - let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc; // Add a sort for deterministic output let plan = Arc::new(SortExec::new( LexOrdering::new(vec![PhysicalSortExpr::new( @@ -1479,22 +1387,18 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { input: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true output: Ok: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] " ); @@ -1523,12 +1427,10 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() { @r" - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] " ); @@ -2835,7 +2737,6 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { ) .unwrap(), ); - let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192)); let probe_hash_exprs = vec![ col("a", &probe_side_schema).unwrap(), @@ -2848,7 +2749,6 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { ) .unwrap(), ); - let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); // Create HashJoinExec let on = vec![ @@ -2861,10 +2761,10 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { col("b", &probe_side_schema).unwrap(), ), ]; - let hash_join = Arc::new( + let plan = Arc::new( HashJoinExec::try_new( - build_coalesce, - probe_coalesce, + build_repartition, + probe_repartition, on, None, &JoinType::Inner, @@ -2875,9 +2775,6 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { .unwrap(), ); - let plan = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; - // Apply the filter pushdown optimizer let mut config = SessionConfig::new(); config.options_mut().execution.parquet.pushdown_filters = true; @@ -2887,14 +2784,11 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { insta::assert_snapshot!( format_plan_for_test(&plan), @r" - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] " ); @@ -2915,14 +2809,11 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() { insta::assert_snapshot!( format_plan_for_test(&plan), @r" - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true + - RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ] " ); } @@ -2998,7 +2889,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { col("b", &probe_side_schema).unwrap(), ), ]; - let hash_join = Arc::new( + let plan = Arc::new( HashJoinExec::try_new( build_scan, Arc::clone(&probe_scan), @@ -3012,9 +2903,6 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { .unwrap(), ); - let plan = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; - // Apply the filter pushdown optimizer let mut config = SessionConfig::new(); config.options_mut().execution.parquet.pushdown_filters = true; @@ -3024,10 +2912,9 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { insta::assert_snapshot!( format_plan_for_test(&plan), @r" - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] " ); @@ -3048,10 +2935,9 @@ async fn test_hashjoin_dynamic_filter_with_nulls() { insta::assert_snapshot!( format_plan_for_test(&plan), @r" - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ] " ); @@ -3116,7 +3002,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { // Create RepartitionExec nodes for both sides with hash partitioning on join keys let partition_count = 12; - // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + // Build side: DataSource -> RepartitionExec (Hash) let build_hash_exprs = vec![ col("a", &build_side_schema).unwrap(), col("b", &build_side_schema).unwrap(), @@ -3128,9 +3014,8 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { ) .unwrap(), ); - let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192)); - // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + // Probe side: DataSource -> RepartitionExec (Hash) let probe_hash_exprs = vec![ col("a", &probe_side_schema).unwrap(), col("b", &probe_side_schema).unwrap(), @@ -3142,7 +3027,6 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { ) .unwrap(), ); - let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); // Create HashJoinExec with partitioned inputs let on = vec![ @@ -3157,8 +3041,8 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { ]; let hash_join = Arc::new( HashJoinExec::try_new( - build_coalesce, - probe_coalesce, + build_repartition, + probe_repartition, on, None, &JoinType::Inner, @@ -3169,11 +3053,8 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() { .unwrap(), ); - // Top-level CoalesceBatchesExec - let cb = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; // Top-level CoalescePartitionsExec - let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc; // Add a sort for deterministic output let plan = Arc::new(SortExec::new( LexOrdering::new(vec![PhysicalSortExpr::new( @@ -3285,7 +3166,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { // Create RepartitionExec nodes for both sides with hash partitioning on join keys let partition_count = 12; - // Probe side: DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec + // Probe side: DataSource -> RepartitionExec(Hash) let probe_hash_exprs = vec![ col("a", &probe_side_schema).unwrap(), col("b", &probe_side_schema).unwrap(), @@ -3297,7 +3178,6 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { ) .unwrap(), ); - let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); let on = vec![ ( @@ -3312,7 +3192,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { let hash_join = Arc::new( HashJoinExec::try_new( build_scan, - probe_coalesce, + probe_repartition, on, None, &JoinType::Inner, @@ -3323,11 +3203,8 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() { .unwrap(), ); - // Top-level CoalesceBatchesExec - let cb = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; // Top-level CoalescePartitionsExec - let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc; // Add a sort for deterministic output let plan = Arc::new(SortExec::new( LexOrdering::new(vec![PhysicalSortExpr::new( @@ -3446,7 +3323,7 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() { col("id2", &probe_side_schema).unwrap(), ), ]; - let hash_join = Arc::new( + let plan = Arc::new( HashJoinExec::try_new( build_scan, Arc::clone(&probe_scan), @@ -3460,9 +3337,6 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() { .unwrap(), ); - let plan = - Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; - // Apply optimization with forced HashTable strategy let session_config = SessionConfig::default() .with_batch_size(10) diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs index b32a9bbd25432..e8d06d69df414 100644 --- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - coalesce_batches_exec, coalesce_partitions_exec, global_limit_exec, local_limit_exec, - sort_exec, sort_preserving_merge_exec, stream_exec, + coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec, + sort_preserving_merge_exec, stream_exec, }; use arrow::compute::SortOptions; @@ -138,45 +138,6 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li Ok(()) } -#[test] -fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limit() --> Result<()> { - let schema = create_schema(); - let streaming_table = stream_exec(&schema); - let repartition = repartition_exec(streaming_table)?; - let filter = filter_exec(schema, repartition)?; - let coalesce_batches = coalesce_batches_exec(filter, 8192); - let local_limit = local_limit_exec(coalesce_batches, 5); - let coalesce_partitions = coalesce_partitions_exec(local_limit); - let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " CoalescePartitionsExec", - " LocalLimitExec: fetch=5", - " CoalesceBatchesExec: target_batch_size=8192", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "CoalescePartitionsExec: fetch=5", - " CoalesceBatchesExec: target_batch_size=8192, fetch=5", - " FilterExec: c3@2 > 0", - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) -} - #[test] fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { let schema = create_schema(); @@ -207,44 +168,11 @@ fn pushes_global_limit_exec_through_projection_exec() -> Result<()> { Ok(()) } -#[test] -fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batches_exec_into_fetching_version() --> Result<()> { - let schema = create_schema(); - let streaming_table = stream_exec(&schema); - let coalesce_batches = coalesce_batches_exec(streaming_table, 8192); - let projection = projection_exec(schema, coalesce_batches)?; - let global_limit = global_limit_exec(projection, 0, Some(5)); - - let initial = get_plan_string(&global_limit); - let expected_initial = [ - "GlobalLimitExec: skip=0, fetch=5", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - - assert_eq!(initial, expected_initial); - - let after_optimize = - LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?; - - let expected = [ - "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192, fetch=5", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", - ]; - assert_eq!(get_plan_string(&after_optimize), expected); - - Ok(()) -} - #[test] fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { let schema = create_schema(); let streaming_table = stream_exec(&schema); - let coalesce_batches = coalesce_batches_exec(streaming_table, 8192); - let projection = projection_exec(Arc::clone(&schema), coalesce_batches)?; + let projection = projection_exec(Arc::clone(&schema), streaming_table)?; let repartition = repartition_exec(projection)?; let ordering: LexOrdering = [PhysicalSortExpr { expr: col("c1", &schema)?, @@ -262,8 +190,7 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { " SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", ]; assert_eq!(initial, expected_initial); @@ -276,8 +203,7 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> { " SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]", - " CoalesceBatchesExec: target_batch_size=8192", - " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", + " StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true", ]; assert_eq!(get_plan_string(&after_optimize), expected); diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index ba53d079e3059..b33305c23ede2 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -41,7 +41,6 @@ mod test { use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; - use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::common::compute_record_batch_statistics; use datafusion_physical_plan::empty::EmptyExec; @@ -713,43 +712,6 @@ mod test { Ok(()) } - #[tokio::test] - async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> { - let scan = create_scan_exec_with_statistics(None, Some(2)).await; - let coalesce_batches: Arc = - Arc::new(CoalesceBatchesExec::new(scan, 2)); - // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] - let expected_statistic_partition_1 = create_partition_statistics( - 2, - 16, - 3, - 4, - Some((DATE_2025_03_01, DATE_2025_03_02)), - ); - // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04] - let expected_statistic_partition_2 = create_partition_statistics( - 2, - 16, - 1, - 2, - Some((DATE_2025_03_03, DATE_2025_03_04)), - ); - let statistics = (0..coalesce_batches.output_partitioning().partition_count()) - .map(|idx| coalesce_batches.partition_statistics(Some(idx))) - .collect::>>()?; - assert_eq!(statistics.len(), 2); - assert_eq!(statistics[0], expected_statistic_partition_1); - assert_eq!(statistics[1], expected_statistic_partition_2); - - // Check the statistics_by_partition with real results - let expected_stats = vec![ - ExpectedStatistics::NonEmpty(3, 4, 2), - ExpectedStatistics::NonEmpty(1, 2, 2), - ]; - validate_statistics_with_data(coalesce_batches, expected_stats, 0).await?; - Ok(()) - } - #[tokio::test] async fn test_statistic_by_partition_of_coalesce_partitions() -> Result<()> { let scan = create_scan_exec_with_statistics(None, Some(2)).await; diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 480f5c8cc97b1..ff87ad7212967 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -45,7 +45,6 @@ use datafusion_physical_expr_common::sort_expr::{ use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown; -use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::coop::CooperativeExec; use datafusion_physical_plan::filter::FilterExec; @@ -1680,61 +1679,6 @@ fn test_partition_col_projection_pushdown_expr() -> Result<()> { Ok(()) } -#[test] -fn test_coalesce_batches_after_projection() -> Result<()> { - let csv = create_simple_csv_exec(); - let filter = Arc::new(FilterExec::try_new( - Arc::new(BinaryExpr::new( - Arc::new(Column::new("c", 2)), - Operator::Gt, - Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), - )), - csv, - )?); - let coalesce_batches: Arc = - Arc::new(CoalesceBatchesExec::new(filter, 8192)); - let projection: Arc = Arc::new(ProjectionExec::try_new( - vec![ - ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"), - ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), - ], - coalesce_batches, - )?); - - let initial = displayable(projection.as_ref()).indent(true).to_string(); - let actual = initial.trim(); - - assert_snapshot!( - actual, - @r" - ProjectionExec: expr=[a@0 as a, b@1 as b] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@2 > 0 - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false - " - ); - - let after_optimize = - ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - - let after_optimize_string = displayable(after_optimize.as_ref()) - .indent(true) - .to_string(); - let actual = after_optimize_string.trim(); - - // Projection should be pushed down through CoalesceBatchesExec - assert_snapshot!( - actual, - @r" - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@2 > 0, projection=[a@0, b@1] - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false - " - ); - - Ok(()) -} - #[test] fn test_cooperative_exec_after_projection() -> Result<()> { let csv = create_simple_csv_exec(); diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs index caef0fba052cb..d6fd4d8d00ae4 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -32,10 +32,10 @@ use datafusion_physical_optimizer::pushdown_sort::PushdownSort; use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec, - parquet_exec_with_sort, projection_exec, projection_exec_with_alias, - repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch, - sort_expr, sort_expr_named, test_scan_with_ordering, + OptimizationTest, coalesce_partitions_exec, parquet_exec, parquet_exec_with_sort, + projection_exec, projection_exec_with_alias, repartition_exec, schema, + simple_projection_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, + test_scan_with_ordering, }; #[test] @@ -231,8 +231,7 @@ fn test_prefix_match_through_transparent_nodes() { let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b, c.reverse()]).unwrap(); let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - let coalesce = coalesce_batches_exec(source, 1024); - let repartition = repartition_exec(coalesce); + let repartition = repartition_exec(source); // Request only [a ASC NULLS FIRST] - prefix of reversed ordering let prefix_ordering = LexOrdering::new(vec![a.clone().asc().nulls_first()]).unwrap(); @@ -245,14 +244,12 @@ fn test_prefix_match_through_transparent_nodes() { input: - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet output: Ok: - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true " ); } @@ -322,35 +319,6 @@ fn test_no_prefix_match_longer_than_source() { // ORIGINAL TESTS // ============================================================================ -#[test] -fn test_sort_through_coalesce_batches() { - // Sort pushes through CoalesceBatchesExec - let schema = schema(); - let a = sort_expr("a", &schema); - let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); - let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - let coalesce = coalesce_batches_exec(source, 1024); - - let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); - let plan = sort_exec(desc_ordering, coalesce); - - insta::assert_snapshot!( - OptimizationTest::new(plan, PushdownSort::new(), true), - @r" - OptimizationTest: - input: - - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet - output: - Ok: - - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true - " - ); -} - #[test] fn test_sort_through_repartition() { // Sort should push through RepartitionExec @@ -416,20 +384,17 @@ fn test_nested_sorts() { fn test_non_sort_plans_unchanged() { // Plans without SortExec should pass through unchanged let schema = schema(); - let source = parquet_exec(schema.clone()); - let plan = coalesce_batches_exec(source, 1024); + let plan = parquet_exec(schema.clone()); insta::assert_snapshot!( OptimizationTest::new(plan, PushdownSort::new(), true), @r" OptimizationTest: input: - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet output: Ok: - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet " ); } @@ -482,8 +447,7 @@ fn test_complex_plan_with_multiple_operators() { let a = sort_expr("a", &schema); let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - let coalesce_batches = coalesce_batches_exec(source, 1024); - let repartition = repartition_exec(coalesce_batches); + let repartition = repartition_exec(source); let coalesce_parts = coalesce_partitions_exec(repartition); let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); @@ -497,15 +461,13 @@ fn test_complex_plan_with_multiple_operators() { - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet output: Ok: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - CoalescePartitionsExec - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true " ); } @@ -870,7 +832,7 @@ fn test_sort_pushdown_projection_with_limit() { } #[test] -fn test_sort_pushdown_through_projection_and_coalesce() { +fn test_sort_pushdown_through_projection() { // Sort pushes through both projection and coalesce batches let schema = schema(); @@ -879,10 +841,8 @@ fn test_sort_pushdown_through_projection_and_coalesce() { let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); - let coalesce = coalesce_batches_exec(source, 1024); - // Projection: SELECT a, b - let projection = simple_projection_exec(coalesce, vec![0, 1]); + let projection = simple_projection_exec(source, vec![0, 1]); // Request [a DESC] let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); @@ -895,14 +855,12 @@ fn test_sort_pushdown_through_projection_and_coalesce() { input: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a, b@1 as b] - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet output: Ok: - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] - ProjectionExec: expr=[a@0 as a, b@1 as b] - - CoalesceBatchesExec: target_batch_size=1024 - - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true " ); } diff --git a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs index d93081f5ceb80..84534b4fd833d 100644 --- a/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/tests/physical_optimizer/replace_with_order_preserving_variants.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ - check_integrity, coalesce_batches_exec, coalesce_partitions_exec, - create_test_schema3, parquet_exec_with_sort, sort_exec, - sort_exec_with_preserve_partitioning, sort_preserving_merge_exec, - sort_preserving_merge_exec_with_fetch, stream_exec_ordered_with_projection, + check_integrity, coalesce_partitions_exec, create_test_schema3, + parquet_exec_with_sort, sort_exec, sort_exec_with_preserve_partitioning, + sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, + stream_exec_ordered_with_projection, }; use datafusion::prelude::SessionContext; @@ -41,7 +41,6 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{ plan_with_order_breaking_variants, plan_with_order_preserving_variants, replace_with_order_preserving_variants, OrderPreservationContext }; -use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::datasource::memory::MemorySourceConfig; @@ -440,9 +439,7 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); - let coalesce_batches_exec = coalesce_batches_exec(filter, 8192); - let sort = - sort_exec_with_preserve_partitioning(ordering.clone(), coalesce_batches_exec); + let sort = sort_exec_with_preserve_partitioning(ordering.clone(), filter); let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) @@ -458,19 +455,17 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] - - Optimized: - SortPreservingMergeExec: [a@0 ASC NULLS LAST] - CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { @@ -478,11 +473,10 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, (Boundedness::Bounded, SortPreference::PreserveOrder) => { @@ -490,19 +484,17 @@ async fn test_replace_multiple_input_repartition_with_extra_steps( Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST - - Optimized: - SortPreservingMergeExec: [a@0 ASC NULLS LAST] - CoalesceBatchesExec: target_batch_size=8192 FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + + Optimized: + SortPreservingMergeExec: [a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } } @@ -527,12 +519,9 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( Boundedness::Bounded => memory_exec_sorted(&schema, ordering.clone()), }; let repartition_rr = repartition_exec_round_robin(source); - let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr, 8192); - let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1); + let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); - let coalesce_batches_exec_2 = coalesce_batches_exec(filter, 8192); - let sort = - sort_exec_with_preserve_partitioning(ordering.clone(), coalesce_batches_exec_2); + let sort = sort_exec_with_preserve_partitioning(ordering.clone(), filter); let physical_plan = sort_preserving_merge_exec(ordering, sort); let run = ReplaceTest::new(physical_plan) @@ -548,21 +537,17 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { @@ -570,12 +555,10 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( Input / Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, (Boundedness::Bounded, SortPreference::PreserveOrder) => { @@ -583,21 +566,17 @@ async fn test_replace_multiple_input_repartition_with_extra_steps_2( Input: SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } } @@ -622,8 +601,7 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); - let coalesce_batches_exec = coalesce_batches_exec(filter, 8192); - let physical_plan = coalesce_partitions_exec(coalesce_batches_exec); + let physical_plan = coalesce_partitions_exec(filter); let run = ReplaceTest::new(physical_plan) .with_boundedness(boundedness) @@ -637,22 +615,20 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( assert_snapshot!(physical_plan, @r" Input / Optimized: CoalescePartitionsExec - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { assert_snapshot!(physical_plan, @r" Input / Optimized: CoalescePartitionsExec - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because there is no executor with ordering requirement }, @@ -660,11 +636,10 @@ async fn test_not_replacing_when_no_need_to_preserve_sorting( assert_snapshot!(physical_plan, @r" Input / Optimized: CoalescePartitionsExec - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } } @@ -691,8 +666,7 @@ async fn test_with_multiple_replaceable_repartitions( let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); - let coalesce_batches = coalesce_batches_exec(filter, 8192); - let repartition_hash_2 = repartition_exec_hash(coalesce_batches); + let repartition_hash_2 = repartition_exec_hash(filter); let sort = sort_exec_with_preserve_partitioning(ordering.clone(), repartition_hash_2); let physical_plan = sort_preserving_merge_exec(ordering, sort); @@ -710,20 +684,18 @@ async fn test_with_multiple_replaceable_repartitions( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, (Boundedness::Bounded, SortPreference::MaximizeParallelism) => { @@ -732,11 +704,10 @@ async fn test_with_multiple_replaceable_repartitions( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); }, (Boundedness::Bounded, SortPreference::PreserveOrder) => { @@ -745,20 +716,18 @@ async fn test_with_multiple_replaceable_repartitions( SortPreservingMergeExec: [a@0 ASC NULLS LAST] SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST Optimized: SortPreservingMergeExec: [a@0 ASC NULLS LAST] RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - CoalesceBatchesExec: target_batch_size=8192 - FilterExec: c@1 > 3 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + FilterExec: c@1 > 3 + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); } } @@ -1041,8 +1010,6 @@ async fn test_with_multiple_child_trees( }; let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); - let left_coalesce_partitions = - Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_ordering = [sort_expr("a", &schema)].into(); let right_source = match boundedness { @@ -1053,11 +1020,8 @@ async fn test_with_multiple_child_trees( }; let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); - let right_coalesce_partitions = - Arc::new(CoalesceBatchesExec::new(right_repartition_hash, 4096)); - let hash_join_exec = - hash_join_exec(left_coalesce_partitions, right_coalesce_partitions); + let hash_join_exec = hash_join_exec(left_repartition_hash, right_repartition_hash); let ordering: LexOrdering = [sort_expr_default("a", &hash_join_exec.schema())].into(); let sort = sort_exec_with_preserve_partitioning(ordering.clone(), hash_join_exec); let physical_plan = sort_preserving_merge_exec(ordering, sort); @@ -1076,14 +1040,12 @@ async fn test_with_multiple_child_trees( SortPreservingMergeExec: [a@0 ASC] SortExec: expr=[a@0 ASC], preserve_partitioning=[true] HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] - CoalesceBatchesExec: target_batch_size=4096 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] - CoalesceBatchesExec: target_batch_size=4096 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] "); }, (Boundedness::Bounded, _) => { @@ -1092,14 +1054,12 @@ async fn test_with_multiple_child_trees( SortPreservingMergeExec: [a@0 ASC] SortExec: expr=[a@0 ASC], preserve_partitioning=[true] HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)] - CoalesceBatchesExec: target_batch_size=4096 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST - CoalesceBatchesExec: target_batch_size=4096 - RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 - RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true - DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST + RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8 + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, maintains_sort_order=true + DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST "); // Expected bounded results same with and without flag, because ordering get lost during intermediate executor anyway. // Hence, no need to preserve existing ordering. diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 5b50181d7fd3e..40beb12d48cdb 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -53,7 +53,6 @@ use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinct use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; -use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::filter::FilterExec; @@ -360,13 +359,6 @@ pub fn aggregate_exec(input: Arc) -> Arc { ) } -pub fn coalesce_batches_exec( - input: Arc, - batch_size: usize, -) -> Arc { - Arc::new(CoalesceBatchesExec::new(input, batch_size)) -} - pub fn sort_exec( ordering: LexOrdering, input: Arc, diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 75cd78e47aff5..fa248c448683b 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -144,7 +144,6 @@ async fn explain_analyze_baseline_metrics() { || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs b/datafusion/physical-optimizer/src/coalesce_batches.rs deleted file mode 100644 index bedb7f6be0493..0000000000000 --- a/datafusion/physical-optimizer/src/coalesce_batches.rs +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! CoalesceBatches optimizer that groups batches together rows -//! in bigger batches to avoid overhead with small batches - -use crate::PhysicalOptimizerRule; - -use std::sync::Arc; - -use datafusion_common::assert_eq_or_internal_err; -use datafusion_common::config::ConfigOptions; -use datafusion_common::error::Result; -use datafusion_physical_plan::{ - ExecutionPlan, async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec, -}; - -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; - -/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that -/// are produced by highly selective filters -#[derive(Default, Debug)] -pub struct CoalesceBatches {} - -impl CoalesceBatches { - #[expect(missing_docs)] - pub fn new() -> Self { - Self::default() - } -} -impl PhysicalOptimizerRule for CoalesceBatches { - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result> { - if !config.execution.coalesce_batches { - return Ok(plan); - } - - let target_batch_size = config.execution.batch_size; - plan.transform_up(|plan| { - let plan_any = plan.as_any(); - if let Some(async_exec) = plan_any.downcast_ref::() { - // Coalesce inputs to async functions to reduce number of async function invocations - let children = async_exec.children(); - assert_eq_or_internal_err!( - children.len(), - 1, - "Expected AsyncFuncExec to have exactly one child" - ); - - let coalesce_exec = Arc::new(CoalesceBatchesExec::new( - Arc::clone(children[0]), - target_batch_size, - )); - let new_plan = plan.with_new_children(vec![coalesce_exec])?; - Ok(Transformed::yes(new_plan)) - } else { - Ok(Transformed::no(plan)) - } - }) - .data() - } - - fn name(&self) -> &str { - "coalesce_batches" - } - - fn schema_check(&self) -> bool { - true - } -} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 1b45f02ebd511..e98772291cbeb 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -28,7 +28,6 @@ #![deny(clippy::allow_attributes)] pub mod aggregate_statistics; -pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index aa1975d98d48b..ff71c9ec64385 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -21,7 +21,6 @@ use std::fmt::Debug; use std::sync::Arc; use crate::aggregate_statistics::AggregateStatistics; -use crate::coalesce_batches::CoalesceBatches; use crate::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; @@ -120,9 +119,6 @@ impl PhysicalOptimizer { Arc::new(OptimizeAggregateOrder::new()), // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future. Arc::new(ProjectionPushdown::new()), - // The CoalesceBatches rule will not influence the distribution and ordering of the - // whole plan tree. Therefore, to avoid influencing other rules, it should run last. - Arc::new(CoalesceBatches::new()), // Remove the ancillary output requirement operator since we are done with the planning // phase. Arc::new(OutputRequirements::new_remove_mode()), diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 06f12a90195d2..b0828ff232f94 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1803,7 +1803,6 @@ mod tests { use super::*; use crate::RecordBatchStream; - use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::common; use crate::common::collect; @@ -2601,17 +2600,9 @@ mod tests { #[tokio::test] async fn run_first_last_multi_partitions() -> Result<()> { - for use_coalesce_batches in [false, true] { - for is_first_acc in [false, true] { - for spill in [false, true] { - first_last_multi_partitions( - use_coalesce_batches, - is_first_acc, - spill, - 4200, - ) - .await? - } + for is_first_acc in [false, true] { + for spill in [false, true] { + first_last_multi_partitions(is_first_acc, spill, 4200).await? } } Ok(()) @@ -2654,15 +2645,7 @@ mod tests { .map(Arc::new) } - // This function either constructs the physical plan below, - // - // "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]", - // " CoalesceBatchesExec: target_batch_size=1024", - // " CoalescePartitionsExec", - // " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[FIRST_VALUE(b)], ordering_mode=None", - // " DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1]", - // - // or + // This function constructs the physical plan below, // // "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]", // " CoalescePartitionsExec", @@ -2672,7 +2655,6 @@ mod tests { // and checks whether the function `merge_batch` works correctly for // FIRST_VALUE and LAST_VALUE functions. async fn first_last_multi_partitions( - use_coalesce_batches: bool, is_first_acc: bool, spill: bool, max_memory: usize, @@ -2720,13 +2702,8 @@ mod tests { memory_exec, Arc::clone(&schema), )?); - let coalesce = if use_coalesce_batches { - let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)); - Arc::new(CoalesceBatchesExec::new(coalesce, 1024)) as Arc - } else { - Arc::new(CoalescePartitionsExec::new(aggregate_exec)) - as Arc - }; + let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec)) + as Arc; let aggregate_final = Arc::new(AggregateExec::try_new( AggregateMode::Final, groups, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 13bb862ab9371..dfcd3cb0bcae7 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -57,6 +57,10 @@ use futures::stream::{Stream, StreamExt}; /// reaches the `fetch` value. /// /// See [`LimitedBatchCoalescer`] for more information +#[deprecated( + since = "52.0.0", + note = "We now use BatchCoalescer from arrow-rs instead of a dedicated operator" +)] #[derive(Debug, Clone)] pub struct CoalesceBatchesExec { /// The input plan @@ -70,6 +74,7 @@ pub struct CoalesceBatchesExec { cache: PlanProperties, } +#[expect(deprecated)] impl CoalesceBatchesExec { /// Create a new CoalesceBatchesExec pub fn new(input: Arc, target_batch_size: usize) -> Self { @@ -112,6 +117,7 @@ impl CoalesceBatchesExec { } } +#[expect(deprecated)] impl DisplayAs for CoalesceBatchesExec { fn fmt_as( &self, @@ -142,6 +148,7 @@ impl DisplayAs for CoalesceBatchesExec { } } +#[expect(deprecated)] impl ExecutionPlan for CoalesceBatchesExec { fn name(&self) -> &'static str { "CoalesceBatchesExec" diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 4b26f84099505..0ddea90a98bf3 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -408,7 +408,6 @@ mod tests { use std::time::Duration; use super::*; - use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::execution_plan::{Boundedness, EmissionType}; use crate::expressions::col; @@ -444,11 +443,14 @@ mod tests { // The number in the function is highly related to the memory limit we are testing // any change of the constant should be aware of - fn generate_task_ctx_for_round_robin_tie_breaker() -> Result> { + fn generate_task_ctx_for_round_robin_tie_breaker( + target_batch_size: usize, + ) -> Result> { let runtime = RuntimeEnvBuilder::new() .with_memory_limit(20_000_000, 1.0) .build_arc()?; - let config = SessionConfig::new(); + let mut config = SessionConfig::new(); + config.options_mut().execution.batch_size = target_batch_size; let task_ctx = TaskContext::default() .with_runtime(runtime) .with_session_config(config); @@ -459,7 +461,6 @@ mod tests { fn generate_spm_for_round_robin_tie_breaker( enable_round_robin_repartition: bool, ) -> Result> { - let target_batch_size = 12500; let row_size = 12500; let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size])); let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size])); @@ -485,9 +486,7 @@ mod tests { TestMemoryExec::try_new_exec(&[rbs], schema, None)?, Partitioning::RoundRobinBatch(2), )?; - let coalesce_batches_exec = - CoalesceBatchesExec::new(Arc::new(repartition_exec), target_batch_size); - let spm = SortPreservingMergeExec::new(sort, Arc::new(coalesce_batches_exec)) + let spm = SortPreservingMergeExec::new(sort, Arc::new(repartition_exec)) .with_round_robin_repartition(enable_round_robin_repartition); Ok(Arc::new(spm)) } @@ -499,7 +498,8 @@ mod tests { /// based on whether the tie breaker is enabled or disabled. #[tokio::test(flavor = "multi_thread")] async fn test_round_robin_tie_breaker_success() -> Result<()> { - let task_ctx = generate_task_ctx_for_round_robin_tie_breaker()?; + let target_batch_size = 12500; + let task_ctx = generate_task_ctx_for_round_robin_tie_breaker(target_batch_size)?; let spm = generate_spm_for_round_robin_tie_breaker(true)?; let _collected = collect(spm, task_ctx).await?; Ok(()) @@ -512,7 +512,7 @@ mod tests { /// based on whether the tie breaker is enabled or disabled. #[tokio::test(flavor = "multi_thread")] async fn test_round_robin_tie_breaker_fail() -> Result<()> { - let task_ctx = generate_task_ctx_for_round_robin_tie_breaker()?; + let task_ctx = generate_task_ctx_for_round_robin_tie_breaker(8192)?; let spm = generate_spm_for_round_robin_tie_breaker(false)?; let _err = collect(spm, task_ctx).await.unwrap_err(); Ok(()) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4ff90b61eed9c..0666fc2979b38 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -74,6 +74,7 @@ use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion_physical_plan::aggregates::AggregateMode; use datafusion_physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; use datafusion_physical_plan::analyze::AnalyzeExec; +#[expect(deprecated)] use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::coop::CooperativeExec; @@ -358,6 +359,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { ); } + #[expect(deprecated)] if let Some(coalesce_batches) = plan.downcast_ref::() { return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec( coalesce_batches, @@ -821,6 +823,7 @@ impl protobuf::PhysicalPlanNode { let input: Arc = into_physical_plan(&coalesce_batches.input, ctx, extension_codec)?; Ok(Arc::new( + #[expect(deprecated)] CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize) .with_fetch(coalesce_batches.fetch.map(|f| f as usize)), )) @@ -2574,6 +2577,7 @@ impl protobuf::PhysicalPlanNode { }) } + #[expect(deprecated)] fn try_from_coalesce_batches_exec( coalesce_batches: &CoalesceBatchesExec, extension_codec: &dyn PhysicalExtensionCodec, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index aa5458849330f..ff073e45feb5c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -31,6 +31,7 @@ use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use arrow::datatypes::{Fields, TimeUnit}; use datafusion::physical_expr::aggregate::AggregateExprBuilder; +#[expect(deprecated)] use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::metrics::MetricType; use datafusion_datasource::TableSchema; @@ -845,11 +846,13 @@ fn roundtrip_coalesce_batches_with_fetch() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); + #[expect(deprecated)] roundtrip_test(Arc::new(CoalesceBatchesExec::new( Arc::new(EmptyExec::new(schema.clone())), 8096, )))?; + #[expect(deprecated)] roundtrip_test(Arc::new( CoalesceBatchesExec::new(Arc::new(EmptyExec::new(schema)), 8096) .with_fetch(Some(10)), diff --git a/datafusion/sqllogictest/test_files/async_udf.slt b/datafusion/sqllogictest/test_files/async_udf.slt index 31ca87c4354a4..0708b59e519a0 100644 --- a/datafusion/sqllogictest/test_files/async_udf.slt +++ b/datafusion/sqllogictest/test_files/async_udf.slt @@ -37,8 +37,7 @@ physical_plan 03)----AggregateExec: mode=Partial, gby=[], aggr=[min(async_abs(data.x))] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))] -06)----------CoalesceBatchesExec: target_batch_size=8192 -07)------------DataSourceExec: partitions=1, partition_sizes=[1] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] # Async udf can be used in aggregation with group by query I rowsort @@ -63,8 +62,7 @@ physical_plan 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------ProjectionExec: expr=[__async_fn_0@1 as __common_expr_1] 07)------------AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))] -08)--------------CoalesceBatchesExec: target_batch_size=8192 -09)----------------DataSourceExec: partitions=1, partition_sizes=[1] +08)--------------DataSourceExec: partitions=1, partition_sizes=[1] # Async udf can be used in filter query I @@ -82,8 +80,7 @@ physical_plan 01)FilterExec: __async_fn_0@1 < 5, projection=[x@0] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 03)----AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))] -04)------CoalesceBatchesExec: target_batch_size=8192 -05)--------DataSourceExec: partitions=1, partition_sizes=[1] +04)------DataSourceExec: partitions=1, partition_sizes=[1] # Async udf can be used in projection query I rowsort @@ -101,5 +98,4 @@ logical_plan physical_plan 01)ProjectionExec: expr=[__async_fn_0@1 as async_abs(data.x)] 02)--AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))] -03)----CoalesceBatchesExec: target_batch_size=8192 -04)------DataSourceExec: partitions=1, partition_sizes=[1] +03)----DataSourceExec: partitions=1, partition_sizes=[1] diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 9087aee56d978..3cedb648951cf 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -234,7 +234,6 @@ physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE -physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE @@ -313,7 +312,6 @@ physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE -physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] @@ -358,7 +356,6 @@ physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE -physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet @@ -596,7 +593,6 @@ physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE -physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE