Add partition coalescing at the head of the plan #164
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ use datafusion::common::tree_node::TreeNodeRecursion; | |
| use datafusion::datasource::source::DataSourceExec; | ||
| use datafusion::error::DataFusionError; | ||
| use datafusion::physical_expr::Partitioning; | ||
| use datafusion::physical_plan::ExecutionPlanProperties; | ||
| use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; | ||
| use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; | ||
| use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; | ||
|
|
@@ -119,8 +120,14 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule { | |
| impl DistributedPhysicalOptimizerRule { | ||
| fn apply_network_boundaries( | ||
| &self, | ||
| plan: Arc<dyn ExecutionPlan>, | ||
| mut plan: Arc<dyn ExecutionPlan>, | ||
| ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> { | ||
| if plan.output_partitioning().partition_count() > 1 { | ||
| // Coalescing partitions here will allow us to put a NetworkCoalesceExec on top | ||
| // of the plan, executing it in parallel. | ||
| plan = Arc::new(CoalescePartitionsExec::new(plan)) | ||
| } | ||
|
|
||
| let result = plan.transform_up(|plan| { | ||
| // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so | ||
| // that not all tasks have access to all partitions of the underlying DataSource. | ||
|
|
@@ -205,7 +212,7 @@ impl DistributedPhysicalOptimizerRule { | |
| ) -> Result<StageExec, DataFusionError> { | ||
| let mut inputs = vec![]; | ||
|
|
||
| let distributed = plan.clone().transform_down(|plan| { | ||
| let mut distributed = plan.clone().transform_down(|plan| { | ||
| // We cannot break down CollectLeft hash joins into more than 1 task, as these need | ||
| // a full materialized build side with all the data in it. | ||
| // | ||
|
|
@@ -268,6 +275,13 @@ impl DistributedPhysicalOptimizerRule { | |
| Ok(Transformed::new(node, true, TreeNodeRecursion::Jump)) | ||
| })?; | ||
|
|
||
| // The head stage is executable, and upon execution, it will lazily assign worker URLs to | ||
| // all tasks. This must only be done once, so the executable StageExec must only be called | ||
| // once on 1 partition. | ||
| if depth == 0 && distributed.data.output_partitioning().partition_count() > 1 { | ||
| distributed.data = Arc::new(CoalescePartitionsExec::new(distributed.data)); | ||
| } | ||
|
Collaborator
Question: What did we do before this PR? I am asking because I do not see any removed or modified code in this PR.
Collaborator
Author
Previously we did pretty much nothing, and therefore we were potentially executing multiple partitions with conflicting URL assignments.
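To make the discussion above concrete, here is a minimal sketch of the head-coalescing step this PR introduces, written against the same DataFusion APIs that appear in the diff (`ExecutionPlanProperties`, `CoalescePartitionsExec`). The helper name `coalesce_head` is illustrative and not part of the codebase.

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Wraps the head of the plan in a `CoalescePartitionsExec` when it produces
/// more than one partition, so the resulting stage exposes a single output
/// partition and can later be topped with a `NetworkCoalesceExec`.
fn coalesce_head(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    if plan.output_partitioning().partition_count() > 1 {
        Arc::new(CoalescePartitionsExec::new(plan))
    } else {
        plan
    }
}
```

The diff applies this check both at the start of `apply_network_boundaries` and on the head stage's `distributed.data`, which is why the head-stage snapshots below now show `Tasks: t0:[p0]` with a `CoalescePartitionsExec` at the top.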
||
|
|
||
| let inputs = inputs.into_iter().map(Arc::new).collect(); | ||
| let mut stage = StageExec::new(query_id, *num, distributed.data, inputs, n_tasks); | ||
| *num += 1; | ||
|
|
@@ -396,8 +410,9 @@ mod tests { | |
| let query = r#"SELECT * FROM weather"#; | ||
| let plan = sql_to_explain(query, 1).await.unwrap(); | ||
| assert_snapshot!(plan, @r" | ||
| ┌───── Stage 1 Tasks: t0:[p0,p1,p2] | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet | ||
| ┌───── Stage 1 Tasks: t0:[p0] | ||
| │ CoalescePartitionsExec | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet | ||
| └────────────────────────────────────────────────── | ||
| "); | ||
| } | ||
|
|
@@ -463,12 +478,13 @@ mod tests { | |
| let query = r#"SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday" "#; | ||
| let plan = sql_to_explain(query, 2).await.unwrap(); | ||
| assert_snapshot!(plan, @r" | ||
| ┌───── Stage 1 Tasks: t0:[p0,p1,p2] | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2] | ||
| │ CoalescePartitionsExec | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet | ||
| ┌───── Stage 1 Tasks: t0:[p0] | ||
| │ CoalescePartitionsExec | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2] | ||
| │ CoalescePartitionsExec | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet | ||
| └────────────────────────────────────────────────── | ||
| "); | ||
| } | ||
|
|
@@ -501,15 +517,16 @@ mod tests { | |
| "#; | ||
| let plan = sql_to_explain(query, 2).await.unwrap(); | ||
| assert_snapshot!(plan, @r" | ||
| ┌───── Stage 4 Tasks: t0:[p0,p1,p2,p3] | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2] | ||
| │ CoalescePartitionsExec | ||
| │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2 | ||
| │ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow] | ||
| │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)] | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2 | ||
| ┌───── Stage 4 Tasks: t0:[p0] | ||
| │ CoalescePartitionsExec | ||
|
||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2] | ||
| │ CoalescePartitionsExec | ||
| │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2 | ||
| │ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow] | ||
| │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)] | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ NetworkShuffleExec read_from=Stage 3, output_partitions=4, n_tasks=1, input_tasks=2 | ||
| └────────────────────────────────────────────────── | ||
| ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3] | ||
| │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow] | ||
|
|
@@ -560,18 +577,22 @@ mod tests { | |
| let query = r#"SELECT DISTINCT "RainToday", "WindGustDir" FROM weather"#; | ||
| let plan = sql_to_explain(query, 2).await.unwrap(); | ||
| assert_snapshot!(plan, @r" | ||
| ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] | ||
| │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[] | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=1, input_tasks=2 | ||
| ┌───── Stage 3 Tasks: t0:[p0] | ||
| │ CoalescePartitionsExec | ||
| │ NetworkCoalesceExec read_from=Stage 2, output_partitions=8, input_tasks=2 | ||
| └────────────────────────────────────────────────── | ||
| ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3] | ||
| │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 4), input_partitions=4 | ||
| │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 | ||
| │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[] | ||
| │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0] | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet | ||
| ┌───── Stage 2 Tasks: t0:[p0,p1,p2,p3] t1:[p0,p1,p2,p3] | ||
| │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[] | ||
| │ CoalesceBatchesExec: target_batch_size=8192 | ||
| │ NetworkShuffleExec read_from=Stage 1, output_partitions=4, n_tasks=2, input_tasks=2 | ||
| └────────────────────────────────────────────────── | ||
| ┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7] t1:[p0,p1,p2,p3,p4,p5,p6,p7] | ||
| │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=4 | ||
| │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 | ||
| │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[] | ||
| │ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0] | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet | ||
| └────────────────────────────────────────────────── | ||
|
Collaborator
This turns out to be a very good optimization for large data sets, where the final aggregation is done in many tasks/workers 👍 🎉. This isn't directly related to the current PR, but worth noting as a future optimization:
||
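The arithmetic behind that observation isn't spelled out in the PR, but it can be read off the new snapshot. A rough sketch, assuming the producing stage hash-partitions into one bucket per (consumer task, partition) pair; the function name is illustrative only:

```rust
/// Illustrative only: number of hash buckets the producing stage needs so that
/// each consumer task receives its own slice of every hash partition.
fn hash_partitions(consumer_tasks: usize, partitions_per_task: usize) -> usize {
    consumer_tasks * partitions_per_task
}

fn main() {
    // Matches Stage 1 in the new snapshot: the final aggregation now runs in
    // 2 tasks of 4 partitions each, so the repartitioning becomes
    // `Hash([RainToday@0, WindGustDir@1], 8)` instead of `..., 4)`.
    assert_eq!(hash_partitions(2, 4), 8);
}
```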
| "); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| use crate::channel_resolver_ext::get_distributed_channel_resolver; | ||
| use crate::execution_plans::NetworkCoalesceExec; | ||
| use crate::{ChannelResolver, NetworkShuffleExec, PartitionIsolatorExec}; | ||
| use datafusion::common::{internal_datafusion_err, internal_err}; | ||
| use datafusion::common::{exec_err, internal_datafusion_err}; | ||
| use datafusion::error::{DataFusionError, Result}; | ||
| use datafusion::execution::TaskContext; | ||
| use datafusion::physical_plan::{ | ||
|
|
@@ -170,15 +170,6 @@ impl StageExec { | |
| format!("Stage {:<3}{}", self.num, child_str) | ||
| } | ||
|
|
||
| pub fn try_assign(self, channel_resolver: &impl ChannelResolver) -> Result<Self> { | ||
|
Collaborator
Author
Remove as it's unused.
||
| let urls: Vec<Url> = channel_resolver.get_urls()?; | ||
| if urls.is_empty() { | ||
| return internal_err!("No URLs found in ChannelManager"); | ||
| } | ||
|
|
||
| Ok(self) | ||
| } | ||
|
|
||
| fn try_assign_urls(&self, urls: &[Url]) -> Result<Self> { | ||
| let assigned_children = self | ||
| .child_stages_iter() | ||
|
|
@@ -263,11 +254,27 @@ impl ExecutionPlan for StageExec { | |
| self.plan.properties() | ||
| } | ||
|
|
||
| /// Executes a query in a distributed manner. This method lazily performs URL assignment | ||
| /// to all the tasks; therefore, it must only be called once. | ||
| /// | ||
| /// [StageExec::execute] is only used for starting the distributed query on the same machine | ||
| /// that planned it; it is not used for task execution in `ArrowFlightEndpoint`, where | ||
| /// the inner `stage.plan` is executed directly. | ||
| fn execute( | ||
| &self, | ||
| partition: usize, | ||
| context: Arc<TaskContext>, | ||
| ) -> Result<datafusion::execution::SendableRecordBatchStream> { | ||
| if partition > 0 { | ||
| // The StageExec node calls try_assign_urls() lazily upon calling .execute(). This means | ||
| // that .execute() must only be called once, as we cannot afford to perform several | ||
| // random URL assignments while executing multiple partitions, as they would differ, | ||
| // producing an invalid plan. | ||
| return exec_err!( | ||
| "an executable StageExec must only have 1 partition, but it was called with partition index {partition}" | ||
|
||
| ); | ||
| } | ||
|
|
||
| let channel_resolver = get_distributed_channel_resolver(context.session_config())?; | ||
|
|
||
| let assigned_stage = self | ||
|
|
||
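Putting the two changes together, a caller driving the head stage now executes exactly one partition. The sketch below is hedged: it assumes an already-built head stage (held as an `Arc<dyn ExecutionPlan>`) and a `TaskContext`, and the function name `run_head_stage` is hypothetical; `common::collect` is DataFusion's stream-collection helper.

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{common, ExecutionPlan, ExecutionPlanProperties};

/// Drives the head stage of a distributed plan. URL assignment happens lazily
/// inside `execute`, so only partition 0 is ever executed; with this PR, any
/// other partition index returns an execution error.
async fn run_head_stage(
    stage: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    // The optimizer coalesced the head of the plan, so the stage exposes a
    // single output partition.
    debug_assert_eq!(stage.output_partitioning().partition_count(), 1);
    let stream = stage.execute(0, ctx)?;
    common::collect(stream).await
}
```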