79 changes: 50 additions & 29 deletions src/distributed_physical_optimizer_rule.rs
@@ -5,6 +5,7 @@ use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::DataFusionError;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -119,8 +120,14 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
impl DistributedPhysicalOptimizerRule {
fn apply_network_boundaries(
&self,
plan: Arc<dyn ExecutionPlan>,
mut plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
if plan.output_partitioning().partition_count() > 1 {
// Coalescing partitions here will allow us to put a NetworkCoalesceExec on top
// of the plan, executing it in parallel.
plan = Arc::new(CoalescePartitionsExec::new(plan))
}

let result = plan.transform_up(|plan| {
// If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
// that not all tasks have access to all partitions of the underlying DataSource.
@@ -205,7 +212,7 @@ impl DistributedPhysicalOptimizerRule {
) -> Result<StageExec, DataFusionError> {
let mut inputs = vec![];

let distributed = plan.clone().transform_down(|plan| {
let mut distributed = plan.clone().transform_down(|plan| {
// We cannot break down CollectLeft hash joins into more than 1 task, as these need
// a full materialized build size with all the data in it.
//
@@ -268,6 +275,13 @@ impl DistributedPhysicalOptimizerRule {
Ok(Transformed::new(node, true, TreeNodeRecursion::Jump))
})?;

// The head stage is executable, and upon execution, it will lazily assign worker URLs to
// all tasks. This must only be done once, so the executable StageExec must only be called
// once on 1 partition.
if depth == 0 && distributed.data.output_partitioning().partition_count() > 1 {
distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data));
}
Collaborator:
Question: What did we do before this PR? I am asking because I do not see any removed or modified code in this PR

Collaborator (Author):
Previously we did pretty much nothing, and therefore we were potentially executing multiple partitions with conflicting URL assignments.
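
A minimal standalone sketch of the pattern discussed in this thread, using the same DataFusion items already imported elsewhere in this diff (CoalescePartitionsExec, ExecutionPlanProperties, exec_err!). The helper names are illustrative, not part of the PR:

use std::sync::Arc;

use datafusion::common::exec_err;
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Coalesce the head of the plan down to a single partition so the executable
// head stage is only ever driven once.
fn coalesce_head(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    if plan.output_partitioning().partition_count() > 1 {
        Arc::new(CoalescePartitionsExec::new(plan))
    } else {
        plan
    }
}

// Execution-time guard: only partition 0 of the head stage may be executed,
// so lazy worker URL assignment can only happen once.
fn check_head_partition(partition: usize) -> Result<()> {
    if partition > 0 {
        return exec_err!("the head stage has a single partition, got partition index {partition}");
    }
    Ok(())
}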


let inputs = inputs.into_iter().map(Arc::new).collect();
let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks);
*num += 1;
@@ -396,8 +410,9 @@ mod tests {
let query = r#"SELECT * FROM weather"#;
let plan = sql_to_explain(query, 1).await.unwrap();
assert_snapshot!(plan, @r"
┌───── Stage 1 Tasks: t0:[p0,p1,p2]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
┌───── Stage 1 Tasks: t0:[p0]
│ CoalescePartitionsExec
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
└──────────────────────────────────────────────────
");
}
@@ -463,12 +478,13 @@ mod tests {
let query = r#"SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday" "#;
let plan = sql_to_explain(query, 2).await.unwrap();
assert_snapshot!(plan, @r"
┌───── Stage 1 Tasks: t0:[p0,p1,p2]
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
│ CoalescePartitionsExec
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
┌───── Stage 1 Tasks: t0:[p0]
│ CoalescePartitionsExec
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
│ CoalescePartitionsExec
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
└──────────────────────────────────────────────────
");
}
@@ -501,15 +517,16 @@ mod tests {
"#;
let plan = sql_to_explain(query, 2).await.unwrap();
assert_snapshot!(plan, @r"
┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3]
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
│ CoalescePartitionsExec
│ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
│ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
│ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
┌───── Stage 4 Tasks: t0:[p0]
│ CoalescePartitionsExec
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
│ CoalescePartitionsExec
│ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
│ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
│ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
│ CoalesceBatchesExec: target_batch_size=8192
│ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
│ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
@@ -560,18 +577,22 @@ mod tests {
let query = r#"SELECT DISTINCT "RainToday", "WindGustDir" FROM weather"#;
let plan = sql_to_explain(query, 2).await.unwrap();
assert_snapshot!(plan, @r"
┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ CoalesceBatchesExec: target_batch_size=8192
│ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=1, input_tasks=2
┌───── Stage 3 Tasks: t0:[p0]
│ CoalescePartitionsExec
│ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ CoalesceBatchesExec: target_batch_size=8192
│ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7]
│ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
│ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
└──────────────────────────────────────────────────
Collaborator:
This turns out to be a very good optimization for large data sets, since the final aggregation is done across many tasks/workers 👍 🎉.

This isn’t directly related to the current PR, but worth noting as a future optimization:
If the input files are partitioned on the group-by columns RainToday, WindGustDir (i.e., the three files don’t overlap on those values), we can eliminate RoundRobin partitioning. This would allow stage 1 and stage 2 to merge into a single stage, executing both partial and final aggregation together. I’m currently exploring use cases for this and will share query plans soon to illustrate the idea more clearly.

");
}

17 changes: 7 additions & 10 deletions src/execution_plans/stage.rs
@@ -1,7 +1,7 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::execution_plans::NetworkCoalesceExec;
use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec};
use datafusion::common::{internal_datafusion_err, internal_err};
use datafusion::common::{exec_err, internal_datafusion_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{
@@ -170,15 +170,6 @@ impl StageExec {
format!("Stage {:<3}{}", self.num, child_str)
}

pub fn try_assign(self, channel_resolver: &impl ChannelResolver) -> Result<Self> {
Collaborator (Author):
Remove as it's unused

let urls: Vec<Url> = channel_resolver.get_urls()?;
if urls.is_empty() {
return internal_err!("No URLs found in ChannelManager");
}

Ok(self)
}

fn try_assign_urls(&self, urls: &[Url]) -> Result<Self> {
let assigned_children = self
.child_stages_iter()
Expand Down Expand Up @@ -268,6 +259,12 @@ impl ExecutionPlan for StageExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<datafusion::execution::SendableRecordBatchStream> {
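// Worker URL assignment for the stage tree happens lazily on first execution,
// so the executable head StageExec must only ever be driven through partition 0.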
if partition > 0 {
return exec_err!(
"an executable StageExec must only have 1 partition, but it was called with partition index {partition}"
);
}

let channel_resolver = get_distributed_channel_resolver(context.session_config())?;

let assigned_stage = self
56 changes: 56 additions & 0 deletions tests/distributed_aggregation.rs
@@ -96,4 +96,60 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn distributed_aggregation_head_node_partitioned() -> Result<(), Box<dyn Error>> {
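// The head stage of the distributed plan must expose a single partition
// (coalesced via CoalescePartitionsExec) so that worker URL assignment happens exactly once.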
let (ctx, _guard) = start_localhost_context(6, DefaultSessionBuilder).await;
register_parquet_tables(&ctx).await?;

let df = ctx
.sql(r#"SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday""#)
.await?;
let physical = df.create_physical_plan().await?;

let physical_str = displayable(physical.as_ref()).indent(true).to_string();

let physical_distributed = DistributedPhysicalOptimizerRule::default()
.with_network_shuffle_tasks(6)
.with_network_coalesce_tasks(6)
.optimize(physical.clone(), &Default::default())?;

let physical_distributed_str = displayable(physical_distributed.as_ref())
.indent(true)
.to_string();

assert_snapshot!(physical_str,
@r"
ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=3
AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
",
);

assert_snapshot!(physical_distributed_str,
@r"
┌───── Stage 3 Tasks: t0:[p0]
│ CoalescePartitionsExec
│ NetworkCoalesceExec read_from=Stage 2, output_partitions=18, input_tasks=6
└──────────────────────────────────────────────────
┌───── Stage 2 Tasks: t0:[p0,p1,p2] t1:[p0,p1,p2] t2:[p0,p1,p2] t3:[p0,p1,p2] t4:[p0,p1,p2] t5:[p0,p1,p2]
│ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday]
│ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
│ NetworkShuffleExec read_from=Stage 1, output_partitions=3, n_tasks=6, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17]
│ RepartitionExec: partitioning=Hash([RainToday@0], 18), input_partitions=1
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec Tasks: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
└──────────────────────────────────────────────────
",
);

Ok(())
}
}