@@ -20,30 +20,73 @@ use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 use uuid::Uuid;

+/// Physical optimizer rule that inspects the plan, places the appropriate network
+/// boundaries and breaks it down into stages that can be executed in a distributed manner.
+///
+/// The rule has two steps:
+///
+/// 1. Inject the distributed execution nodes in the appropriate places.
+///
+///    This is done by looking at specific nodes in the original plan and enhancing them
+///    with new additional nodes:
+///    - a [DataSourceExec] is wrapped with a [PartitionIsolatorExec] for exposing just a subset
+///      of the [DataSourceExec] partitions to the rest of the plan.
+///    - a [CoalescePartitionsExec] is followed by a [NetworkCoalesceExec] so that all tasks in the
+///      previous stage collapse into just 1 in the next stage.
+///    - a [SortPreservingMergeExec] is followed by a [NetworkCoalesceExec] for the same reasons as
+///      above.
+///    - a [RepartitionExec] with hash partitioning is wrapped with a [NetworkShuffleExec] for
+///      shuffling data to different tasks.
+///
+/// 2. Break down the plan into stages.
+///
+///    Based on the network boundaries ([NetworkShuffleExec], [NetworkCoalesceExec], ...) placed in
+///    the plan by the first step, the plan is divided into stages and tasks are assigned to each
+///    stage.
+///
+///    This step might decide not to respect the number of tasks each network boundary requests,
+///    e.g. when a plan cannot be parallelized across tasks (such as a collect-left [HashJoinExec])
+///    or when a [DataSourceExec] does not have enough partitions to be spread across tasks.
 #[derive(Debug, Default)]
 pub struct DistributedPhysicalOptimizerRule {
-    /// maximum number of partitions per task. This is used to determine how many
-    /// tasks to create for each stage
-    network_shuffle_exec_tasks: Option<usize>,
-
-    coalesce_partitions_exec_tasks: Option<usize>,
+    /// Upon shuffling data, this defines how many tasks are employed in performing the shuffle.
+    /// ```text
+    /// ( task 1 )  ( task 2 )  ( task 3 )
+    ///     ▲           ▲           ▲
+    ///     └─────┬─────┴─────┬─────┘
+    ///      ( task 1 )  ( task 2 )          N tasks
+    /// ```
+    /// This parameter defines N.
+    network_shuffle_tasks: Option<usize>,
+    /// Upon merging multiple tasks into one, this defines how many tasks are merged.
+    /// ```text
+    ///             ( task 1 )
+    ///                  ▲
+    ///      ┌───────────┴───────────┐
+    /// ( task 1 )  ( task 2 )  ( task 3 )      N tasks
+    /// ```
+    /// This parameter defines N.
+    network_coalesce_tasks: Option<usize>,
 }

 impl DistributedPhysicalOptimizerRule {
     pub fn new() -> Self {
         DistributedPhysicalOptimizerRule {
-            network_shuffle_exec_tasks: None,
-            coalesce_partitions_exec_tasks: None,
+            network_shuffle_tasks: None,
+            network_coalesce_tasks: None,
         }
     }

+    /// Sets the number of tasks employed in performing shuffles.
     pub fn with_network_shuffle_tasks(mut self, tasks: usize) -> Self {
-        self.network_shuffle_exec_tasks = Some(tasks);
+        self.network_shuffle_tasks = Some(tasks);
         self
     }

+    /// Sets the number of input tasks for every task-coalescing operation.
     pub fn with_network_coalesce_tasks(mut self, tasks: usize) -> Self {
-        self.coalesce_partitions_exec_tasks = Some(tasks);
+        self.network_coalesce_tasks = Some(tasks);
         self
     }
 }
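
The two builder methods above are all that is needed to configure the rule. Below is a minimal sketch of wiring it into a DataFusion session; the `SessionStateBuilder` import path and the `with_physical_optimizer_rule` method name are assumptions and may vary slightly across DataFusion versions.

```rust
use std::sync::Arc;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;

fn distributed_context() -> SessionContext {
    // Ask for up to 4 tasks on every shuffle and up to 4 input tasks on every coalescing boundary.
    let rule = DistributedPhysicalOptimizerRule::new()
        .with_network_shuffle_tasks(4)
        .with_network_coalesce_tasks(4);

    // Append the rule to the default set of physical optimizer rules.
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(rule))
        .build();

    SessionContext::new_with_state(state)
}
```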
@@ -74,19 +117,22 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
 }

 impl DistributedPhysicalOptimizerRule {
-    pub fn apply_network_boundaries(
+    fn apply_network_boundaries(
         &self,
         plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         let result = plan.transform_up(|plan| {
+            // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
+            // that not all tasks have access to all partitions of the underlying DataSource.
             if plan.as_any().is::<DataSourceExec>() {
                 let node = PartitionIsolatorExec::new_pending(plan);

                 return Ok(Transformed::yes(Arc::new(node)));
             }

+            // If this is a hash RepartitionExec, introduce a shuffle.
             if let Some(node) = plan.as_any().downcast_ref::<RepartitionExec>() {
-                let Some(tasks) = self.network_shuffle_exec_tasks else {
+                let Some(tasks) = self.network_shuffle_tasks else {
                     return Ok(Transformed::no(plan));
                 };
                 if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
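
The traversal in `apply_network_boundaries` is DataFusion's generic `TreeNode::transform_up` combined with `as_any` downcasts. Here is a self-contained sketch of the same pattern using only upstream DataFusion nodes (wrapping every `DataSourceExec` leaf in a `CoalesceBatchesExec`); the import paths assume a recent DataFusion version.

```rust
use std::sync::Arc;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::ExecutionPlan;

// Walk the plan bottom-up and wrap every DataSourceExec, leaving other nodes untouched.
fn wrap_sources(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let result = plan.transform_up(|node| {
        if node.as_any().is::<DataSourceExec>() {
            // Transformed::yes tells the traversal that this subtree changed.
            return Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(node, 8192))));
        }
        // Transformed::no leaves the node as-is.
        Ok(Transformed::no(node))
    })?;
    Ok(result.data)
}
```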
@@ -97,10 +143,17 @@ impl DistributedPhysicalOptimizerRule {
                 return Ok(Transformed::yes(Arc::new(node)));
             }

+            // If this is a CoalescePartitionsExec, it means that the original plan is trying to
+            // merge all partitions into one. We need to go one step further and also merge all
+            // tasks into one.
             if let Some(node) = plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
-                let Some(tasks) = self.coalesce_partitions_exec_tasks else {
+                let Some(tasks) = self.network_coalesce_tasks else {
                     return Ok(Transformed::no(plan));
                 };
+
+                // If the immediate child is a PartitionIsolatorExec, it means that the rest of the
+                // plan is just a couple of non-computational nodes that are probably not worth
+                // distributing.
                 if node
                     .children()
                     .first()
@@ -115,8 +168,11 @@ impl DistributedPhysicalOptimizerRule {
                 return Ok(Transformed::yes(plan));
             }

+            // The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
+            // We need to account for it and help it by also coalescing all tasks into one, so a
+            // NetworkCoalesceExec is introduced.
             if let Some(node) = plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
-                let Some(tasks) = self.coalesce_partitions_exec_tasks else {
+                let Some(tasks) = self.network_coalesce_tasks else {
                     return Ok(Transformed::no(plan));
                 };
                 let node = NetworkCoalesceExec::from_sort_preserving_merge_exec(node, tasks)?;
@@ -131,6 +187,12 @@ impl DistributedPhysicalOptimizerRule {
         Ok(result.data)
     }

+    /// Takes a plan with certain network boundaries in it ([NetworkShuffleExec],
+    /// [NetworkCoalesceExec], ...) and breaks it down into stages.
+    ///
+    /// This can be used as a standalone function for distributing arbitrary plans in which users
+    /// have manually placed network boundaries, or as part of the [DistributedPhysicalOptimizerRule]
+    /// that places the network boundaries automatically as a standard [PhysicalOptimizerRule].
     pub fn distribute_plan(plan: Arc<dyn ExecutionPlan>) -> Result<StageExec, DataFusionError> {
         Self::_distribute_plan_inner(Uuid::new_v4(), plan, &mut 1, 0, 1)
     }
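
As the doc comment above notes, `distribute_plan` can also be called on its own. A minimal sketch of that standalone use, assuming `plan` already contains hand-placed network boundaries:

```rust
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;

// `plan` is assumed to already contain manually placed NetworkShuffleExec / NetworkCoalesceExec
// nodes; the function splits it into stages without running the optimizer rule first.
// DistributedPhysicalOptimizerRule and StageExec come from this crate.
fn stages_from_manual_plan(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<StageExec, DataFusionError> {
    DistributedPhysicalOptimizerRule::distribute_plan(plan)
}
```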
@@ -145,13 +207,18 @@ impl DistributedPhysicalOptimizerRule {
         let mut inputs = vec![];

         let distributed = plan.clone().transform_down(|plan| {
+            // We cannot break down CollectLeft hash joins into more than 1 task, as these need
+            // a fully materialized build side with all the data in it.
+            //
+            // Maybe in the future these can be broadcast joins?
             if let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() {
                 if n_tasks > 1 && node.mode == PartitionMode::CollectLeft {
                     return Err(limit_tasks_err(1))
                 }
             }

             if let Some(node) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
+                // If there's only 1 task, no need to perform any isolation.
                 if n_tasks == 1 {
                     return Ok(Transformed::yes(Arc::clone(plan.children().first().unwrap())));
                 }
@@ -160,15 +227,18 @@ impl DistributedPhysicalOptimizerRule {
             }

             let mut dnode = if let Some(node) = plan.as_any().downcast_ref::<NetworkShuffleExec>() {
-                Arc::new(node.clone()) as Arc<dyn DistributedExecutionPlan>
+                Arc::new(node.clone()) as Arc<dyn NetworkBoundary>
             } else if let Some(node) = plan.as_any().downcast_ref::<NetworkCoalesceExec>() {
-                Arc::new(node.clone()) as Arc<dyn DistributedExecutionPlan>
+                Arc::new(node.clone()) as Arc<dyn NetworkBoundary>
             } else {
                 return Ok(Transformed::no(plan));
             };

             let stage = loop {
                 let (inner_plan, in_tasks) = dnode.to_stage_info(n_tasks)?;
+                // If the current stage has just 1 task, and the next stage is only going to have
+                // 1 task, there's no point in having a network boundary in between: they can just
+                // communicate in memory.
                 if n_tasks == 1 && in_tasks == 1 {
                     return Ok(Transformed::no(dnode.rollback()?));
                 }
@@ -177,6 +247,10 @@ impl DistributedPhysicalOptimizerRule {
                     Err(e) => match get_distribute_plan_err(&e) {
                         None => return Err(e),
                         Some(DistributedPlanError::LimitTasks(limit)) => {
+                            // While attempting to build a new stage, a failure was raised stating
+                            // that no more than `limit` tasks can be used for it, so we are going
+                            // to limit the number of tasks to the requested amount and try building
+                            // the stage again.
                             if in_tasks == *limit {
                                 return plan_err!("A node requested {limit} tasks for the stage it's in, but that stage already has that many tasks");
                             }
@@ -200,20 +274,35 @@ impl DistributedPhysicalOptimizerRule {
     }
 }

-pub trait DistributedExecutionPlan: ExecutionPlan {
+/// This trait represents a node that introduces the need for a network boundary in the plan.
+/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
+/// out of it.
+pub trait NetworkBoundary: ExecutionPlan {
+    /// Returns the information necessary for building the next stage:
+    /// - the head node of the stage.
+    /// - the number of tasks that stage will have.
     fn to_stage_info(
         &self,
         n_tasks: usize,
     ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError>;

-    fn with_input_tasks(&self, input_tasks: usize) -> Arc<dyn DistributedExecutionPlan>;
+    /// Re-assigns a different number of input tasks to the current [NetworkBoundary].
+    ///
+    /// This will be called if, upon building a stage, a [DistributedPlanError::LimitTasks] error
+    /// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
+    fn with_input_tasks(&self, input_tasks: usize) -> Arc<dyn NetworkBoundary>;

+    /// Called when a [StageExec] is correctly formed. The [NetworkBoundary] can use this
+    /// information to perform any internal transformations necessary for distributed execution.
     fn to_distributed(
         &self,
         stage_num: usize,
         stage_head: &Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>;

+    /// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
+    /// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
+    /// this function returns.
     fn rollback(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         let children = self.children();
         if children.len() != 1 {
@@ -227,8 +316,12 @@ pub trait DistributedExecutionPlan: ExecutionPlan {
     }
 }

+/// Error thrown during distributed planning that prompts the planner to change something and
+/// try again.
 #[derive(Debug)]
 enum DistributedPlanError {
+    /// Prompts the planner to limit the number of tasks used in the stage that is currently
+    /// being planned.
     LimitTasks(usize),
 }

@@ -244,6 +337,8 @@ impl Display for DistributedPlanError {

 impl Error for DistributedPlanError {}

+/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
+/// to try rebuilding the current stage with a limited number of tasks.
 pub fn limit_tasks_err(limit: usize) -> DataFusionError {
     DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
 }