Skip to content

Commit e8196f4

Browse files
authored
Remove coalesce batches rule and deprecate CoalesceBatchesExec (#19622)
## Which issue does this PR close?

- Closes #19591

## Rationale for this change

Explained in the issue itself.

## What changes are included in this PR?

- Removed the coalesce batches rule
- Deprecated `CoalesceBatchesExec`

## Are these changes tested?

Yes

## Are there any user-facing changes?

Yes, added a deprecation tag on `CoalesceBatchesExec`
1 parent ed01b67 commit e8196f4

File tree

22 files changed

+271
-778
lines changed

22 files changed

+271
-778
lines changed

datafusion-examples/examples/udf/async_udf.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ pub async fn async_udf() -> Result<()> {
102102
"| physical_plan | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |",
103103
"| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |",
104104
"| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |",
105-
"| | CoalesceBatchesExec: target_batch_size=8192 |",
106-
"| | DataSourceExec: partitions=1, partition_sizes=[1] |",
105+
"| | DataSourceExec: partitions=1, partition_sizes=[1] |",
107106
"| | |",
108107
"+---------------+------------------------------------------------------------------------------------------------------------------------------+",
109108
],

datafusion/core/tests/execution/coop.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4141
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
4242
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4343
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
44-
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
4544
use datafusion_physical_plan::coop::make_cooperative;
4645
use datafusion_physical_plan::filter::FilterExec;
4746
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
@@ -425,10 +424,7 @@ async fn filter_reject_all_batches_yields(
425424
));
426425
let filtered = Arc::new(FilterExec::try_new(false_predicate, Arc::new(infinite))?);
427426

428-
// Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
429-
let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192));
430-
431-
query_yields(coalesced, session_ctx.task_ctx()).await
427+
query_yields(filtered, session_ctx.task_ctx()).await
432428
}
433429

434430
#[rstest]
@@ -584,17 +580,18 @@ async fn join_yields(
584580
let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
585581
let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
586582

587-
// Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
588-
let coalesced_left =
589-
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
590-
let coalesced_right =
591-
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));
592-
593583
let part_left = Partitioning::Hash(left_keys, 1);
594584
let part_right = Partitioning::Hash(right_keys, 1);
595585

596-
let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
597-
let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
586+
// Wrap each side in Repartition so they are both hashed into 1 partition
587+
let hashed_left = Arc::new(RepartitionExec::try_new(
588+
Arc::new(infinite_left),
589+
part_left,
590+
)?);
591+
let hashed_right = Arc::new(RepartitionExec::try_new(
592+
Arc::new(infinite_right),
593+
part_right,
594+
)?);
598595

599596
// Build an Inner HashJoinExec → left.value = right.value
600597
let join = Arc::new(HashJoinExec::try_new(
@@ -632,17 +629,18 @@ async fn join_agg_yields(
632629
let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
633630
let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
634631

635-
// Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
636-
let coalesced_left =
637-
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
638-
let coalesced_right =
639-
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));
640-
641632
let part_left = Partitioning::Hash(left_keys, 1);
642633
let part_right = Partitioning::Hash(right_keys, 1);
643634

644-
let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
645-
let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
635+
// Wrap each side in Repartition so they are both hashed into 1 partition
636+
let hashed_left = Arc::new(RepartitionExec::try_new(
637+
Arc::new(infinite_left),
638+
part_left,
639+
)?);
640+
let hashed_right = Arc::new(RepartitionExec::try_new(
641+
Arc::new(infinite_right),
642+
part_right,
643+
)?);
646644

647645
// Build an Inner HashJoinExec → left.value = right.value
648646
let join = Arc::new(HashJoinExec::try_new(

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirements;
5656
use datafusion_physical_plan::aggregates::{
5757
AggregateExec, AggregateMode, PhysicalGroupBy,
5858
};
59-
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
59+
6060
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
6161
use datafusion_physical_plan::execution_plan::ExecutionPlan;
6262
use datafusion_physical_plan::expressions::col;
@@ -1741,9 +1741,6 @@ fn merge_does_not_need_sort() -> Result<()> {
17411741
// Scan some sorted parquet files
17421742
let exec = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
17431743

1744-
// CoalesceBatchesExec to mimic behavior after a filter
1745-
let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));
1746-
17471744
// Merge from multiple parquet files and keep the data sorted
17481745
let exec: Arc<dyn ExecutionPlan> =
17491746
Arc::new(SortPreservingMergeExec::new(sort_key, exec));
@@ -1757,8 +1754,7 @@ fn merge_does_not_need_sort() -> Result<()> {
17571754
assert_plan!(plan_distrib,
17581755
@r"
17591756
SortPreservingMergeExec: [a@0 ASC]
1760-
CoalesceBatchesExec: target_batch_size=4096
1761-
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1757+
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
17621758
");
17631759

17641760
// Test: result IS DIFFERENT, if EnforceSorting is run first:
@@ -1772,8 +1768,7 @@ fn merge_does_not_need_sort() -> Result<()> {
17721768
@r"
17731769
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
17741770
CoalescePartitionsExec
1775-
CoalesceBatchesExec: target_batch_size=4096
1776-
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1771+
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
17771772
");
17781773

17791774
Ok(())

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,13 @@ use std::sync::Arc;
2020
use crate::memory_limit::DummyStreamPartition;
2121
use crate::physical_optimizer::test_utils::{
2222
RequirementsTestExec, aggregate_exec, bounded_window_exec,
23-
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
24-
coalesce_partitions_exec, create_test_schema, create_test_schema2,
25-
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec,
26-
local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec,
27-
repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options,
28-
sort_merge_join_exec, sort_preserving_merge_exec,
29-
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
30-
union_exec,
23+
bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec,
24+
create_test_schema, create_test_schema2, create_test_schema3, filter_exec,
25+
global_limit_exec, hash_join_exec, local_limit_exec, memory_exec, parquet_exec,
26+
parquet_exec_with_sort, projection_exec, repartition_exec, sort_exec,
27+
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
28+
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
29+
spr_repartition_exec, stream_exec_ordered, union_exec,
3130
};
3231

3332
use arrow::compute::SortOptions;
@@ -1845,9 +1844,7 @@ async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
18451844
)]
18461845
.into();
18471846
let sort = sort_exec(ordering.clone(), source);
1848-
// Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before
1849-
let coalesce_batches = coalesce_batches_exec(sort, 128);
1850-
let window_agg = bounded_window_exec("non_nullable_col", ordering, coalesce_batches);
1847+
let window_agg = bounded_window_exec("non_nullable_col", ordering, sort);
18511848
let ordering2: LexOrdering = [sort_expr_options(
18521849
"non_nullable_col",
18531850
&window_agg.schema(),
@@ -1873,17 +1870,15 @@ async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
18731870
FilterExec: NOT non_nullable_col@1
18741871
SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]
18751872
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
1876-
CoalesceBatchesExec: target_batch_size=128
1877-
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
1878-
DataSourceExec: partitions=1, partition_sizes=[0]
1873+
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
1874+
DataSourceExec: partitions=1, partition_sizes=[0]
18791875
18801876
Optimized Plan:
18811877
WindowAggExec: wdw=[count: Ok(Field { name: "count", data_type: Int64 }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }]
18821878
FilterExec: NOT non_nullable_col@1
18831879
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
1884-
CoalesceBatchesExec: target_batch_size=128
1885-
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
1886-
DataSourceExec: partitions=1, partition_sizes=[0]
1880+
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
1881+
DataSourceExec: partitions=1, partition_sizes=[0]
18871882
"#);
18881883

18891884
Ok(())

0 commit comments

Comments
 (0)