
Commit 3337ce8

Add partition coalescing at the head of the plan (#164)
* Add partition coalescing at the head of the plan
* Add comments
1 parent bfbdf51 · commit 3337ce8
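
This change makes the distributed optimizer wrap the root of the plan in a CoalescePartitionsExec whenever it exposes more than one output partition, so the executable head StageExec always runs on a single partition and assigns worker URLs to its tasks exactly once. A minimal sketch of how the rule is applied, mirroring the new test in tests/distributed_aggregation.rs (the crate import path and the fixed task counts are assumptions for illustration, not part of this commit):

    use std::sync::Arc;

    use datafusion::config::ConfigOptions;
    use datafusion::error::Result;
    use datafusion::physical_optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion_distributed::DistributedPhysicalOptimizerRule; // crate path assumed

    /// Distributes an already-built physical plan. After this commit, if the root of
    /// `physical` has more than one output partition, the head stage of the optimized
    /// plan gets a CoalescePartitionsExec on top and therefore reports a single partition.
    fn distribute(physical: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        DistributedPhysicalOptimizerRule::default()
            .with_network_shuffle_tasks(6)
            .with_network_coalesce_tasks(6)
            .optimize(physical, &ConfigOptions::default())
    }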

File tree

3 files changed: +123 additions, -39 deletions

  src/distributed_physical_optimizer_rule.rs
  src/execution_plans/stage.rs
  tests/distributed_aggregation.rs

src/distributed_physical_optimizer_rule.rs

Lines changed: 50 additions & 29 deletions
@@ -5,6 +5,7 @@ use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::error::DataFusionError;
 use datafusion::physical_expr::Partitioning;
+use datafusion::physical_plan::ExecutionPlanProperties;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -119,8 +120,14 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
 impl DistributedPhysicalOptimizerRule {
     fn apply_network_boundaries(
         &self,
-        plan: Arc<dyn ExecutionPlan>,
+        mut plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        if plan.output_partitioning().partition_count() > 1 {
+            // Coalescing partitions here will allow us to put a NetworkCoalesceExec on top
+            // of the plan, executing it in parallel.
+            plan = Arc::new(CoalescePartitionsExec::new(plan))
+        }
+
         let result = plan.transform_up(|plan| {
             // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
             // that not all tasks have access to all partitions of the underlying DataSource.
@@ -205,7 +212,7 @@ impl DistributedPhysicalOptimizerRule {
     ) -> Result<StageExec, DataFusionError> {
         let mut inputs = vec![];

-        let distributed = plan.clone().transform_down(|plan| {
+        let mut distributed = plan.clone().transform_down(|plan| {
             // We cannot break down CollectLeft hash joins into more than 1 task, as these need
             // a full materialized build size with all the data in it.
             //
@@ -268,6 +275,13 @@ impl DistributedPhysicalOptimizerRule {
             Ok(Transformed::new(node, true, TreeNodeRecursion::Jump))
         })?;

+        // The head stage is executable, and upon execution, it will lazily assign worker URLs to
+        // all tasks. This must only be done once, so the executable StageExec must only be called
+        // once on 1 partition.
+        if depth == 0 && distributed.data.output_partitioning().partition_count() > 1 {
+            distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data));
+        }
+
         let inputs = inputs.into_iter().map(Arc::new).collect();
         let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks);
         *num += 1;
@@ -396,8 +410,9 @@ mod tests {
         let query = r#"SELECT * FROM weather"#;
         let plan = sql_to_explain(query, 1).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 1 Tasks: t0:[p0,p1,p2]
-        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
+        ┌───── Stage 1 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
         └──────────────────────────────────────────────────
         ");
     }
@@ -463,12 +478,13 @@ mod tests {
         let query = r#"SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday" "#;
         let plan = sql_to_explain(query, 2).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 1 Tasks: t0:[p0,p1,p2]
-        │ CoalesceBatchesExec: target_batch_size=8192
-        │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
-        │ CoalescePartitionsExec
-        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
-        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
+        ┌───── Stage 1 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │ CoalesceBatchesExec: target_batch_size=8192
+        │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
+        │ CoalescePartitionsExec
+        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
+        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
         └──────────────────────────────────────────────────
         ");
     }
@@ -501,15 +517,16 @@ mod tests {
         "#;
         let plan = sql_to_explain(query, 2).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3]
-        │ CoalesceBatchesExec: target_batch_size=8192
-        │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
-        │ CoalescePartitionsExec
-        │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
-        │ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
-        │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
-        │ CoalesceBatchesExec: target_batch_size=8192
-        │ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
+        ┌───── Stage 4 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │ CoalesceBatchesExec: target_batch_size=8192
+        │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
+        │ CoalescePartitionsExec
+        │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
+        │ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
+        │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
+        │ CoalesceBatchesExec: target_batch_size=8192
+        │ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
         └──────────────────────────────────────────────────
         ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
         │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
@@ -560,18 +577,22 @@ mod tests {
         let query = r#"SELECT DISTINCT "RainToday", "WindGustDir" FROM weather"#;
         let plan = sql_to_explain(query, 2).await.unwrap();
         assert_snapshot!(plan, @r"
-        ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3]
-        │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
-        │ CoalesceBatchesExec: target_batch_size=8192
-        │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=1, input_tasks=2
+        ┌───── Stage 3 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
         └──────────────────────────────────────────────────
-        ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
-        │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4
-        │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-        │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
-        │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
-        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
+        ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
+        │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
+        │ CoalesceBatchesExec: target_batch_size=8192
+        │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2
         └──────────────────────────────────────────────────
+        ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
+        │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
+        │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+        │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
+        │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
+        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
+        └──────────────────────────────────────────────────
         ");
     }

src/execution_plans/stage.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::execution_plans::NetworkCoalesceExec;
33
use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
4-
use datafusion::common::{internal_datafusion_err, internal_err};
4+
use datafusion::common::{exec_err, internal_datafusion_err};
55
use datafusion::error::{DataFusionError, Result};
66
use datafusion::execution::TaskContext;
77
use datafusion::physical_plan::{
@@ -170,15 +170,6 @@ impl StageExec {
170170
format!("Stage {:<3}{}", self.num, child_str)
171171
}
172172

173-
pub fn try_assign(self, channel_resolver: &impl ChannelResolver) -> Result<Self> {
174-
let urls: Vec<Url> = channel_resolver.get_urls()?;
175-
if urls.is_empty() {
176-
return internal_err!("No URLs found in ChannelManager");
177-
}
178-
179-
Ok(self)
180-
}
181-
182173
fn try_assign_urls(&self, urls: &[Url]) -> Result<Self> {
183174
let assigned_children = self
184175
.child_stages_iter()
@@ -263,11 +254,27 @@ impl ExecutionPlan for StageExec {
263254
self.plan.properties()
264255
}
265256

257+
/// Executes a query in a distributed manner. This method will lazily perform URL assignation
258+
/// to all the tasks, therefore, it must only be called once.
259+
///
260+
/// [StageExec::execute] is only used for starting the distributed query in the same machine
261+
/// that planned it, but it's not used for task execution in `ArrowFlightEndpoint`, there,
262+
/// the inner `stage.plan` is executed directly.
266263
fn execute(
267264
&self,
268265
partition: usize,
269266
context: Arc<TaskContext>,
270267
) -> Result<datafusion::execution::SendableRecordBatchStream> {
268+
if partition > 0 {
269+
// The StageExec node calls try_assign_urls() lazily upon calling .execute(). This means
270+
// that .execute() must only be called once, as we cannot afford to perform several
271+
// random URL assignation while calling multiple partitions, as they will differ,
272+
// producing an invalid plan
273+
return exec_err!(
274+
"an executable StageExec must only have 1 partition, but it was called with partition index {partition}"
275+
);
276+
}
277+
271278
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
272279

273280
let assigned_stage = self

tests/distributed_aggregation.rs

Lines changed: 56 additions & 0 deletions
@@ -96,4 +96,60 @@ mod tests {

         Ok(())
     }
+
+    #[tokio::test]
+    async fn distributed_aggregation_head_node_partitioned() -> Result<(), Box<dyn Error>> {
+        let (ctx, _guard) = start_localhost_context(6, DefaultSessionBuilder).await;
+        register_parquet_tables(&ctx).await?;
+
+        let df = ctx
+            .sql(r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday""#)
+            .await?;
+        let physical = df.create_physical_plan().await?;
+
+        let physical_str = displayable(physical.as_ref()).indent(true).to_string();
+
+        let physical_distributed = DistributedPhysicalOptimizerRule::default()
+            .with_network_shuffle_tasks(6)
+            .with_network_coalesce_tasks(6)
+            .optimize(physical.clone(), &Default::default())?;
+
+        let physical_distributed_str = displayable(physical_distributed.as_ref())
+            .indent(true)
+            .to_string();
+
+        assert_snapshot!(physical_str,
+            @r"
+        ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
+        AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3
+        AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+        ",
+        );
+
+        assert_snapshot!(physical_distributed_str,
+            @r"
+        ┌───── Stage 3 Tasks: t0:[p0]
+        │ CoalescePartitionsExec
+        │ NetworkCoalesceExec read_from=Stage 2, output_partitions=18, input_tasks=6
+        └──────────────────────────────────────────────────
+        ┌───── Stage 2 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] t2:[p0,p1,p2] t3:[p0,p1,p2] t4:[p0,p1,p2] t5:[p0,p1,p2]
+        │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
+        │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        │ CoalesceBatchesExec: target_batch_size=8192
+        │ NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=6, input_tasks=3
+        └──────────────────────────────────────────────────
+        ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17]
+        │ RepartitionExec: partitioning=Hash([RainToday@0], 18), input_partitions=1
+        │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
+        │ PartitionIsolatorExec Tasks: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
+        │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
+        └──────────────────────────────────────────────────
+        ",
+        );
+
+        Ok(())
+    }
 }
