3 changes: 1 addition & 2 deletions datafusion-examples/examples/udf/async_udf.rs
@@ -102,8 +102,7 @@ pub async fn async_udf() -> Result<()> {
"| physical_plan | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |",
"| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |",
"| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |",
"| | CoalesceBatchesExec: target_batch_size=8192 |",
"| | DataSourceExec: partitions=1, partition_sizes=[1] |",
"| | DataSourceExec: partitions=1, partition_sizes=[1] |",
"| | |",
"+---------------+------------------------------------------------------------------------------------------------------------------------------+",
],
6 changes: 6 additions & 0 deletions datafusion/core/tests/execution/coop.rs
@@ -41,6 +41,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
#[expect(deprecated)]
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
@@ -425,6 +426,7 @@ async fn filter_reject_all_batches_yields(
));
let filtered = Arc::new(FilterExec::try_new(false_predicate, Arc::new(infinite))?);

#[expect(deprecated)]
// Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192));

@@ -584,9 +586,11 @@ async fn join_yields(
let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];

#[expect(deprecated)]
// Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
let coalesced_left =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
#[expect(deprecated)]
let coalesced_right =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));

@@ -632,9 +636,11 @@ async fn join_agg_yields(
let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];

#[expect(deprecated)]
// Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
let coalesced_left =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
#[expect(deprecated)]
let coalesced_right =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));

@@ -56,6 +56,8 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};

#[expect(deprecated)]
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::ExecutionPlan;
@@ -1741,6 +1743,7 @@ fn merge_does_not_need_sort() -> Result<()> {
// Scan some sorted parquet files
let exec = parquet_exec_multiple_sorted(vec![sort_key.clone()]);

#[expect(deprecated)]
// CoalesceBatchesExec to mimic behavior after a filter
let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));

20 changes: 20 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -53,6 +53,7 @@ use datafusion_physical_expr::{
use datafusion_physical_optimizer::{
PhysicalOptimizerRule, filter_pushdown::FilterPushdown,
};
#[expect(deprecated)]
use datafusion_physical_plan::{
ExecutionPlan,
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -527,6 +528,7 @@ fn test_filter_with_projection() {
fn test_push_down_through_transparent_nodes() {
// expect the predicate to be pushed down into the DataSource
let scan = TestScanBuilder::new(schema()).with_support(true).build();
#[expect(deprecated)]
let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1));
let predicate = col_lit_predicate("a", "foo", &schema());
let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
@@ -564,6 +566,7 @@ fn test_pushdown_through_aggregates_on_grouping_columns() {
// 2. An outer filter (b@1 = bar) above the aggregate - also gets pushed through because 'b' is a grouping column
let scan = TestScanBuilder::new(schema()).with_support(true).build();

#[expect(deprecated)]
let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10));

let filter = Arc::new(
Expand Down Expand Up @@ -594,6 +597,7 @@ fn test_pushdown_through_aggregates_on_grouping_columns() {
.unwrap(),
);

#[expect(deprecated)]
let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100));

let predicate = col_lit_predicate("b", "bar", &schema());
@@ -943,6 +947,7 @@ async fn test_topk_filter_passes_through_coalesce_batches() {
.with_batches(batches)
.build();

#[expect(deprecated)]
let coalesce_batches =
Arc::new(CoalesceBatchesExec::new(scan, 1024)) as Arc<dyn ExecutionPlan>;

@@ -1206,6 +1211,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
)
.unwrap(),
);
#[expect(deprecated)]
let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));

// Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
@@ -1220,6 +1226,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
)
.unwrap(),
);
#[expect(deprecated)]
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

// Create HashJoinExec with partitioned inputs
@@ -1248,6 +1255,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
);

// Top-level CoalesceBatchesExec
#[expect(deprecated)]
let cb =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
// Top-level CoalescePartitionsExec
@@ -1430,6 +1438,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
)
.unwrap(),
);
#[expect(deprecated)]
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

let on = vec![
@@ -1456,6 +1465,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
.unwrap(),
);

#[expect(deprecated)]
// Top-level CoalesceBatchesExec
let cb =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
@@ -2835,6 +2845,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
)
.unwrap(),
);
#[expect(deprecated)]
let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));

let probe_hash_exprs = vec![
@@ -2848,6 +2859,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
)
.unwrap(),
);
#[expect(deprecated)]
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

// Create HashJoinExec
@@ -2875,6 +2887,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
.unwrap(),
);

#[expect(deprecated)]
let plan =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;

@@ -3012,6 +3025,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
.unwrap(),
);

#[expect(deprecated)]
let plan =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;

@@ -3128,6 +3142,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {
)
.unwrap(),
);
#[expect(deprecated)]
let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));

// Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
@@ -3142,6 +3157,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {
)
.unwrap(),
);
#[expect(deprecated)]
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

// Create HashJoinExec with partitioned inputs
@@ -3169,6 +3185,7 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {
.unwrap(),
);

#[expect(deprecated)]
// Top-level CoalesceBatchesExec
let cb =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
@@ -3297,6 +3314,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {
)
.unwrap(),
);
#[expect(deprecated)]
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

let on = vec![
@@ -3323,6 +3341,7 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {
.unwrap(),
);

#[expect(deprecated)]
// Top-level CoalesceBatchesExec
let cb =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
@@ -3460,6 +3479,7 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
.unwrap(),
);

#[expect(deprecated)]
let plan =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;

@@ -41,6 +41,7 @@ mod test {
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
#[expect(deprecated)]
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::common::compute_record_batch_statistics;
@@ -716,6 +717,7 @@ mod test {
#[tokio::test]
async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
#[expect(deprecated)]
let coalesce_batches: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(scan, 2));
// Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
@@ -45,6 +45,7 @@ use datafusion_physical_expr_common::sort_expr::{
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
#[expect(deprecated)]
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::coop::CooperativeExec;
@@ -1691,6 +1692,7 @@ fn test_coalesce_batches_after_projection() -> Result<()> {
)),
csv,
)?);
#[expect(deprecated)]
let coalesce_batches: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(filter, 8192));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
@@ -41,6 +41,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{
plan_with_order_breaking_variants, plan_with_order_preserving_variants, replace_with_order_preserving_variants, OrderPreservationContext
};
#[expect(deprecated)]
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
@@ -1041,6 +1042,7 @@ async fn test_with_multiple_child_trees(
};
let left_repartition_rr = repartition_exec_round_robin(left_source);
let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
#[expect(deprecated)]
let left_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));

@@ -1053,6 +1055,7 @@ async fn test_with_multiple_child_trees(
};
let right_repartition_rr = repartition_exec_round_robin(right_source);
let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
#[expect(deprecated)]
let right_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(right_repartition_hash, 4096));

2 changes: 2 additions & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -53,6 +53,7 @@ use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinct
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
#[expect(deprecated)]
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -364,6 +365,7 @@ pub fn coalesce_batches_exec(
input: Arc<dyn ExecutionPlan>,
batch_size: usize,
) -> Arc<dyn ExecutionPlan> {
#[expect(deprecated)]
Arc::new(CoalesceBatchesExec::new(input, batch_size))
}
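
Side note on the `#[expect(deprecated)]` attribute added throughout this PR: below is a minimal standalone sketch (hypothetical names, not DataFusion code) of how it behaves. Like `#[allow(deprecated)]` it suppresses the deprecation warning, but it additionally raises `unfulfilled_lint_expectations` if the annotated code stops triggering the lint, so suppressions that outlive the deprecated usage surface automatically (requires Rust 1.81+).

// Hypothetical example, not part of this PR.
#[deprecated(note = "use `new_helper` instead")]
fn old_helper(batch_size: usize) -> usize {
    batch_size
}

#[expect(deprecated)]
fn caller() -> usize {
    // The call below fulfils the expectation, so no warning is emitted;
    // removing it would instead trigger `unfulfilled_lint_expectations`.
    old_helper(8_192)
}

fn main() {
    println!("{}", caller());
}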

1 change: 1 addition & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -135,6 +135,7 @@ async fn explain_analyze_baseline_metrics() {
"output_batches=1"
);

#[expect(deprecated)]
fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
use datafusion::physical_plan;
use datafusion::physical_plan::sorts;
87 changes: 0 additions & 87 deletions datafusion/physical-optimizer/src/coalesce_batches.rs

This file was deleted.
