From 2de5bec8efbae8753af76d1d3a3769695c12eb75 Mon Sep 17 00:00:00 2001
From: Gabriel Musat Mestre
Date: Sun, 28 Sep 2025 18:12:44 +0200
Subject: [PATCH 1/2] Add partition coalescing at the head of the plan

---
 src/distributed_physical_optimizer_rule.rs | 79 ++++++++++++++--------
 src/execution_plans/stage.rs               | 17 ++---
 tests/distributed_aggregation.rs           | 56 +++++++++++++++
 3 files changed, 113 insertions(+), 39 deletions(-)

diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs
index 17935bf..bd50f07 100644
--- a/src/distributed_physical_optimizer_rule.rs
+++ b/src/distributed_physical_optimizer_rule.rs
@@ -5,6 +5,7 @@ use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::error::DataFusionError;
 use datafusion::physical_expr::Partitioning;
+use datafusion::physical_plan::ExecutionPlanProperties;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -119,8 +120,14 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
 impl DistributedPhysicalOptimizerRule {
     fn apply_network_boundaries(
         &self,
-        plan: Arc<dyn ExecutionPlan>,
+        mut plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        if plan.output_partitioning().partition_count() > 1 {
+            // Coalescing partitions here will allow us to put a NetworkCoalesceExec on top
+            // of the plan, executing it in parallel.
+            plan = Arc::new(CoalescePartitionsExec::new(plan))
+        }
+
         let result = plan.transform_up(|plan| {
             // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
             // that not all tasks have access to all partitions of the underlying DataSource.
@@ -205,7 +212,7 @@ impl DistributedPhysicalOptimizerRule {
     ) -> Result<StageExec, DataFusionError> {
         let mut inputs = vec![];

-        let distributed = plan.clone().transform_down(|plan| {
+        let mut distributed = plan.clone().transform_down(|plan| {
             // We cannot break down CollectLeft hash joins into more than 1 task, as these need
             // a full materialized build size with all the data in it.
             //
@@ -268,6 +275,13 @@ impl DistributedPhysicalOptimizerRule {
             Ok(Transformed::new(node, true, TreeNodeRecursion::Jump))
         })?;

+        // The head stage is executable, and upon execution, it will lazily assign worker URLs to
+        // all tasks. This must only be done once, so the executable StageExec must only be called
+        // once, on a single partition.
+        if depth == 0 && distributed.data.output_partitioning().partition_count() > 1 {
+            distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data));
+        }
+
         let inputs = inputs.into_iter().map(Arc::new).collect();
         let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks);
         *num += 1;
@@ -396,8 +410,9 @@ mod tests {
         let query = r#"SELECT * FROM weather"#;
         let plan = sql_to_explain(query, 1).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 1 Tasks: t0:[p0,p1,p2]
-        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
+        ┌───── Stage 1 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │   DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
         └──────────────────────────────────────────────────
         ");
     }
@@ -463,12 +478,13 @@ mod tests {
         let query = r#"SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday" "#;
         let plan = sql_to_explain(query, 2).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 1 Tasks: t0:[p0,p1,p2]
-        │ CoalesceBatchesExec: target_batch_size=8192
-        │   HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
-        │     CoalescePartitionsExec
-        │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
-        │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
+        ┌───── Stage 1 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │   CoalesceBatchesExec: target_batch_size=8192
+        │     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
+        │       CoalescePartitionsExec
+        │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
+        │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
         └──────────────────────────────────────────────────
         ");
     }
@@ -501,15 +517,16 @@ mod tests {
         "#;
         let plan = sql_to_explain(query, 2).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3]
-        │ CoalesceBatchesExec: target_batch_size=8192
-        │   HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
-        │     CoalescePartitionsExec
-        │       NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
-        │     ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
-        │       AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
-        │         CoalesceBatchesExec: target_batch_size=8192
-        │           NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
+        ┌───── Stage 4 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │   CoalesceBatchesExec: target_batch_size=8192
+        │     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
+        │       CoalescePartitionsExec
+        │         NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
+        │       ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
+        │         AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
+        │           CoalesceBatchesExec: target_batch_size=8192
+        │             NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
         └──────────────────────────────────────────────────
         ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
         │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
@@ -560,18 +577,22 @@ mod tests {
         let query = r#"SELECT DISTINCT "RainToday", "WindGustDir" FROM weather"#;
         let plan = sql_to_explain(query, 2).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3]
-        │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
-        │   CoalesceBatchesExec: target_batch_size=8192
-        │     NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=1, input_tasks=2
+        ┌───── Stage 3 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │   NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
         └──────────────────────────────────────────────────
-        ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
-        │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4
-        │   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-        │     AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
-        │       PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
-        │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
+        ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
+        │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
+        │   CoalesceBatchesExec: target_batch_size=8192
+        │     NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2
         └──────────────────────────────────────────────────
+        ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
+        │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
+        │   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+        │     AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
+        │       PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
+        │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
+        └──────────────────────────────────────────────────
         ");
     }

diff --git a/src/execution_plans/stage.rs b/src/execution_plans/stage.rs
index a57d637..de0446a 100644
--- a/src/execution_plans/stage.rs
+++ b/src/execution_plans/stage.rs
@@ -1,7 +1,7 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::execution_plans::NetworkCoalesceExec;
 use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
-use datafusion::common::{internal_datafusion_err, internal_err};
+use datafusion::common::{exec_err, internal_datafusion_err};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::{
@@ -170,15 +170,6 @@ impl StageExec {
         format!("Stage {:<3}{}", self.num, child_str)
     }

-    pub fn try_assign(self, channel_resolver: &impl ChannelResolver) -> Result<Self> {
-        let urls: Vec<Url> = channel_resolver.get_urls()?;
-        if urls.is_empty() {
-            return internal_err!("No URLs found in ChannelManager");
-        }
-
-        Ok(self)
-    }
-
     fn try_assign_urls(&self, urls: &[Url]) -> Result<Self> {
         let assigned_children = self
             .child_stages_iter()
@@ -268,6 +259,12 @@ impl ExecutionPlan for StageExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        if partition > 0 {
+            return exec_err!(
+                "an executable StageExec must only have 1 partition, but it was called with partition index {partition}"
+            );
+        }
+
         let channel_resolver = get_distributed_channel_resolver(context.session_config())?;

         let assigned_stage = self
diff --git a/tests/distributed_aggregation.rs b/tests/distributed_aggregation.rs
index 6243225..fa0f3b0 100644
--- a/tests/distributed_aggregation.rs
+++ b/tests/distributed_aggregation.rs
@@ -96,4 +96,60 @@ mod tests {

         Ok(())
     }
+
+    #[tokio::test]
+    async fn distributed_aggregation_head_node_partitioned() -> Result<(), Box<dyn std::error::Error>> {
+        let (ctx, _guard) = start_localhost_context(6, DefaultSessionBuilder).await;
+        register_parquet_tables(&ctx).await?;
+
+        let df = ctx
+            .sql(r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday""#)
+            .await?;
+        let physical = df.create_physical_plan().await?;
+
+        let physical_str = displayable(physical.as_ref()).indent(true).to_string();
+
+        let physical_distributed = DistributedPhysicalOptimizerRule::default()
+            .with_network_shuffle_tasks(6)
+            .with_network_coalesce_tasks(6)
+            .optimize(physical.clone(), &Default::default())?;
+
+        let physical_distributed_str = displayable(physical_distributed.as_ref())
+            .indent(true)
+            .to_string();
+
+        assert_snapshot!(physical_str,
+            @r"
+        ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
+          AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+            CoalesceBatchesExec: target_batch_size=8192
+              RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3
+                AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+                  DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+        ",
+        );
+
+        assert_snapshot!(physical_distributed_str,
+            @r"
+        ┌───── Stage 3 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │   NetworkCoalesceExec read_from=Stage 2, output_partitions=18, input_tasks=6
+        └──────────────────────────────────────────────────
+        ┌───── Stage 2 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] t2:[p0,p1,p2] t3:[p0,p1,p2] t4:[p0,p1,p2] t5:[p0,p1,p2]
+        │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
+        │   AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        │     CoalesceBatchesExec: target_batch_size=8192
+        │       NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=6, input_tasks=3
+        └──────────────────────────────────────────────────
+        ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17]
+        │ RepartitionExec: partitioning=Hash([RainToday@0], 18), input_partitions=1
+        │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        │     PartitionIsolatorExec Tasks: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+        │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+        └──────────────────────────────────────────────────
+        ",
+        );
+
+        Ok(())
+    }
 }

From eb719a446bf4425209272292c4e9b5daea8261da Mon Sep 17 00:00:00 2001
From: Gabriel Musat Mestre
Date: Mon, 29 Sep 2025 16:35:48 +0200
Subject: [PATCH 2/2] Add comments

---
 src/execution_plans/stage.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/execution_plans/stage.rs b/src/execution_plans/stage.rs
index de0446a..d1b7782 100644
--- a/src/execution_plans/stage.rs
+++ b/src/execution_plans/stage.rs
@@ -254,12 +254,22 @@ impl ExecutionPlan for StageExec {
         self.plan.properties()
     }

+    /// Executes a query in a distributed manner. This method lazily performs the URL
+    /// assignment for all the tasks, so it must only be called once.
+    ///
+    /// [StageExec::execute] is only used for starting the distributed query on the same machine
+    /// that planned it; it is not used for task execution in `ArrowFlightEndpoint`, where
+    /// the inner `stage.plan` is executed directly.
     fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         if partition > 0 {
+            // The StageExec node calls try_assign_urls() lazily upon calling .execute(). This
+            // means that .execute() must only be called once: assigning URLs again for every
+            // partition call would produce a different random assignment each time, resulting
+            // in an invalid plan.
            return exec_err!(
                 "an executable StageExec must only have 1 partition, but it was called with partition index {partition}"
             );
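A minimal sketch (not part of the patches above) of how the new invariant could be exercised from user code, assuming the crate re-exports DistributedPhysicalOptimizerRule at its root and that a `weather` table is registered in the SessionContext, as in the repository's test fixtures. After the rule runs, the head stage should report exactly one output partition, so the lazy worker URL assignment in StageExec::execute happens only for partition 0, matching the exec_err guard added in stage.rs.

// Sketch only: the `datafusion_distributed` crate path and the registered `weather` table
// are assumptions; the rule, builder methods, and trait imports are the ones touched by
// the patches above.
use std::sync::Arc;

use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
use datafusion::prelude::SessionContext;
use datafusion_distributed::DistributedPhysicalOptimizerRule; // assumed crate path

async fn head_stage_has_one_partition(ctx: &SessionContext) -> Result<(), Box<dyn std::error::Error>> {
    let df = ctx
        .sql(r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday""#)
        .await?;
    let physical: Arc<dyn ExecutionPlan> = df.create_physical_plan().await?;

    // Distribute the plan. apply_network_boundaries() now coalesces the head of the plan
    // whenever it reports more than one output partition.
    let distributed = DistributedPhysicalOptimizerRule::default()
        .with_network_shuffle_tasks(6)
        .with_network_coalesce_tasks(6)
        .optimize(physical, &Default::default())?;

    // The executable head StageExec must expose a single partition, so that calling
    // .execute() performs the lazy URL assignment exactly once (for partition 0).
    assert_eq!(distributed.output_partitioning().partition_count(), 1);
    println!("{}", displayable(distributed.as_ref()).indent(true));
    Ok(())
}

The design choice mirrored here is that coalescing at the head keeps the planning-side stage single-partitioned while the coalesced work still runs in parallel inside the child stages behind a NetworkCoalesceExec.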