Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion-examples/examples/udf/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ pub async fn async_udf() -> Result<()> {
"| physical_plan | FilterExec: __async_fn_0@2, projection=[id@0, name@1] |",
"| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |",
"| | AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=ask_llm(CAST(name@1 AS Utf8View), Is this animal furry?))] |",
"| | CoalesceBatchesExec: target_batch_size=8192 |",
"| | DataSourceExec: partitions=1, partition_sizes=[1] |",
"| | DataSourceExec: partitions=1, partition_sizes=[1] |",
"| | |",
"+---------------+------------------------------------------------------------------------------------------------------------------------------+",
],
Expand Down
40 changes: 19 additions & 21 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
Expand Down Expand Up @@ -425,10 +424,7 @@ async fn filter_reject_all_batches_yields(
));
let filtered = Arc::new(FilterExec::try_new(false_predicate, Arc::new(infinite))?);

// Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192));

query_yields(coalesced, session_ctx.task_ctx()).await
query_yields(filtered, session_ctx.task_ctx()).await
}

#[rstest]
Expand Down Expand Up @@ -584,17 +580,18 @@ async fn join_yields(
let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];

// Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
let coalesced_left =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
let coalesced_right =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));

let part_left = Partitioning::Hash(left_keys, 1);
let part_right = Partitioning::Hash(right_keys, 1);

let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
// Wrap each side in Repartition so they are both hashed into 1 partition
let hashed_left = Arc::new(RepartitionExec::try_new(
Arc::new(infinite_left),
part_left,
)?);
let hashed_right = Arc::new(RepartitionExec::try_new(
Arc::new(infinite_right),
part_right,
)?);

// Build an Inner HashJoinExec → left.value = right.value
let join = Arc::new(HashJoinExec::try_new(
Expand Down Expand Up @@ -632,17 +629,18 @@ async fn join_agg_yields(
let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];

// Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
let coalesced_left =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
let coalesced_right =
Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));

let part_left = Partitioning::Hash(left_keys, 1);
let part_right = Partitioning::Hash(right_keys, 1);

let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
// Wrap each side in Repartition so they are both hashed into 1 partition
let hashed_left = Arc::new(RepartitionExec::try_new(
Arc::new(infinite_left),
part_left,
)?);
let hashed_right = Arc::new(RepartitionExec::try_new(
Arc::new(infinite_right),
part_right,
)?);

// Build an Inner HashJoinExec → left.value = right.value
let join = Arc::new(HashJoinExec::try_new(
Expand Down
11 changes: 3 additions & 8 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;

use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::ExecutionPlan;
use datafusion_physical_plan::expressions::col;
Expand Down Expand Up @@ -1741,9 +1741,6 @@ fn merge_does_not_need_sort() -> Result<()> {
// Scan some sorted parquet files
let exec = parquet_exec_multiple_sorted(vec![sort_key.clone()]);

// CoalesceBatchesExec to mimic behavior after a filter
let exec = Arc::new(CoalesceBatchesExec::new(exec, 4096));

// Merge from multiple parquet files and keep the data sorted
let exec: Arc<dyn ExecutionPlan> =
Arc::new(SortPreservingMergeExec::new(sort_key, exec));
Expand All @@ -1757,8 +1754,7 @@ fn merge_does_not_need_sort() -> Result<()> {
assert_plan!(plan_distrib,
@r"
SortPreservingMergeExec: [a@0 ASC]
CoalesceBatchesExec: target_batch_size=4096
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
");

// Test: result IS DIFFERENT, if EnforceSorting is run first:
Expand All @@ -1772,8 +1768,7 @@ fn merge_does_not_need_sort() -> Result<()> {
@r"
SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
CoalescePartitionsExec
CoalesceBatchesExec: target_batch_size=4096
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
");

Ok(())
Expand Down
29 changes: 12 additions & 17 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ use std::sync::Arc;
use crate::memory_limit::DummyStreamPartition;
use crate::physical_optimizer::test_utils::{
RequirementsTestExec, aggregate_exec, bounded_window_exec,
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec,
local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, projection_exec,
repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_options,
sort_merge_join_exec, sort_preserving_merge_exec,
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
union_exec,
bounded_window_exec_with_partition, check_integrity, coalesce_partitions_exec,
create_test_schema, create_test_schema2, create_test_schema3, filter_exec,
global_limit_exec, hash_join_exec, local_limit_exec, memory_exec, parquet_exec,
parquet_exec_with_sort, projection_exec, repartition_exec, sort_exec,
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
spr_repartition_exec, stream_exec_ordered, union_exec,
};

use arrow::compute::SortOptions;
Expand Down Expand Up @@ -1845,9 +1844,7 @@ async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
)]
.into();
let sort = sort_exec(ordering.clone(), source);
// Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before
let coalesce_batches = coalesce_batches_exec(sort, 128);
let window_agg = bounded_window_exec("non_nullable_col", ordering, coalesce_batches);
let window_agg = bounded_window_exec("non_nullable_col", ordering, sort);
let ordering2: LexOrdering = [sort_expr_options(
"non_nullable_col",
&window_agg.schema(),
Expand All @@ -1873,17 +1870,15 @@ async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
FilterExec: NOT non_nullable_col@1
SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
CoalesceBatchesExec: target_batch_size=128
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
DataSourceExec: partitions=1, partition_sizes=[0]
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
DataSourceExec: partitions=1, partition_sizes=[0]

Optimized Plan:
WindowAggExec: wdw=[count: Ok(Field { name: "count", data_type: Int64 }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt64(NULL)), is_causal: false }]
FilterExec: NOT non_nullable_col@1
BoundedWindowAggExec: wdw=[count: Field { "count": Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
CoalesceBatchesExec: target_batch_size=128
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
DataSourceExec: partitions=1, partition_sizes=[0]
SortExec: expr=[non_nullable_col@1 DESC], preserve_partitioning=[false]
DataSourceExec: partitions=1, partition_sizes=[0]
"#);

Ok(())
Expand Down
Loading