-use super::{NetworkShuffleExec, PartitionIsolatorExec, Stage};
+use super::{NetworkShuffleExec, PartitionIsolatorExec};
 use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
+use crate::stage::Stage;
 use datafusion::common::plan_err;
 use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::datasource::source::DataSourceExec;
@@ -123,68 +124,66 @@ impl DistributedPhysicalOptimizerRule {
             plan = Arc::new(CoalescePartitionsExec::new(plan))
         }
 
-        let result = plan.transform_up(|plan| {
-            // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
-            // that not all tasks have access to all partitions of the underlying DataSource.
-            if plan.as_any().is::<DataSourceExec>() {
-                let node = PartitionIsolatorExec::new_pending(plan);
+        let result =
+            plan.transform_up(|plan| {
+                // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
+                // that not all tasks have access to all partitions of the underlying DataSource.
+                if plan.as_any().is::<DataSourceExec>() {
+                    let node = PartitionIsolatorExec::new(plan);
 
-                return Ok(Transformed::yes(Arc::new(node)));
-            }
-
-            // If this is a hash RepartitionExec, introduce a shuffle.
-            if let (Some(node), Some(tasks)) = (
-                plan.as_any().downcast_ref::<RepartitionExec>(),
-                self.network_shuffle_tasks,
-            ) {
-                if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
-                    return Ok(Transformed::no(plan));
+                    return Ok(Transformed::yes(Arc::new(node)));
                 }
-                let node = NetworkShuffleExec::from_repartition_exec(&plan, tasks)?;
 
-                return Ok(Transformed::yes(Arc::new(node)));
-            }
+                // If this is a hash RepartitionExec, introduce a shuffle.
+                if let (Some(node), Some(tasks)) = (
+                    plan.as_any().downcast_ref::<RepartitionExec>(),
+                    self.network_shuffle_tasks,
+                ) {
+                    if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
+                        return Ok(Transformed::no(plan));
+                    }
+                    let node = NetworkShuffleExec::try_new(plan, tasks)?;
 
-            // If this is a CoalescePartitionsExec, it means that the original plan is trying to
-            // merge all partitions into one. We need to go one step ahead and also merge all tasks
-            // into one.
-            if let (Some(node), Some(tasks)) = (
-                plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
-                self.network_coalesce_tasks,
-            ) {
-                // If the immediate child is a PartitionIsolatorExec, it means that the rest of the
-                // plan is just a couple of non-computational nodes that are probably not worth
-                // distributing.
-                if node
-                    .children()
-                    .first()
-                    .is_some_and(|v| v.as_any().is::<PartitionIsolatorExec>())
-                {
-                    return Ok(Transformed::no(plan));
+                    return Ok(Transformed::yes(Arc::new(node)));
                 }
-                let node = NetworkCoalesceExec::from_input_exec(node, tasks)?;
 
-                let plan = plan.with_new_children(vec![Arc::new(node)])?;
-
-                return Ok(Transformed::yes(plan));
-            }
+                // If this is a CoalescePartitionsExec, it means that the original plan is trying to
+                // merge all partitions into one. We need to go one step ahead and also merge all tasks
+                // into one.
+                if let (Some(node), Some(tasks)) = (
+                    plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
+                    self.network_coalesce_tasks,
+                ) {
+                    // If the immediate child is a PartitionIsolatorExec, it means that the rest of the
+                    // plan is just a couple of non-computational nodes that are probably not worth
+                    // distributing.
+                    if node.input().as_any().is::<PartitionIsolatorExec>() {
+                        return Ok(Transformed::no(plan));
+                    }
 
-            // The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
-            // We need to account for it and help it by also coalescing all tasks into one, therefore
-            // a NetworkCoalesceExec is introduced.
-            if let (Some(node), Some(tasks)) = (
-                plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
-                self.network_coalesce_tasks,
-            ) {
-                let node = NetworkCoalesceExec::from_input_exec(node, tasks)?;
+                    let plan = plan.clone().with_new_children(vec![Arc::new(
+                        NetworkCoalesceExec::new(Arc::clone(node.input()), tasks),
+                    )])?;
 
-                let plan = plan.with_new_children(vec![Arc::new(node)])?;
+                    return Ok(Transformed::yes(plan));
+                }
 
-                return Ok(Transformed::yes(plan));
-            }
+                // The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
+                // We need to account for it and help it by also coalescing all tasks into one, therefore
+                // a NetworkCoalesceExec is introduced.
+                if let (Some(node), Some(tasks)) = (
+                    plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
+                    self.network_coalesce_tasks,
+                ) {
+                    let plan = plan.clone().with_new_children(vec![Arc::new(
+                        NetworkCoalesceExec::new(Arc::clone(node.input()), tasks),
+                    )])?;
+
+                    return Ok(Transformed::yes(plan));
+                }
 
-            Ok(Transformed::no(plan))
-        })?;
+                Ok(Transformed::no(plan))
+            })?;
         Ok(result.data)
     }
 
@@ -234,19 +233,19 @@ impl DistributedPhysicalOptimizerRule {
         };
 
         let stage = loop {
-            let (inner_plan, in_tasks) = dnode.as_ref().to_stage_info(n_tasks)?;
+            let input_stage_info = dnode.as_ref().get_input_stage_info(n_tasks)?;
             // If the current stage has just 1 task, and the next stage is only going to have
             // 1 task, there's no point in having a network boundary in between, they can just
             // communicate in memory.
-            if n_tasks == 1 && in_tasks == 1 {
+            if n_tasks == 1 && input_stage_info.task_count == 1 {
                 let mut n = dnode.as_ref().rollback()?;
                 if let Some(node) = n.as_any().downcast_ref::<PartitionIsolatorExec>() {
                     // Also trim PartitionIsolatorExec out of the plan.
                     n = Arc::clone(node.children().first().unwrap());
                 }
                 return Ok(Transformed::yes(n));
             }
-            match Self::_distribute_plan_inner(query_id, inner_plan.clone(), num, depth + 1, in_tasks) {
+            match Self::_distribute_plan_inner(query_id, input_stage_info.plan, num, depth + 1, input_stage_info.task_count) {
                 Ok(v) => break v,
                 Err(e) => match get_distribute_plan_err(&e) {
                     None => return Err(e),
@@ -255,7 +254,7 @@ impl DistributedPhysicalOptimizerRule {
                         // that no more than `limit` tasks can be used for it, so we are going
                         // to limit the amount of tasks to the requested number and try building
                         // the stage again.
-                        if in_tasks == *limit {
+                        if input_stage_info.task_count == *limit {
                             return plan_err!("A node requested {limit} tasks for the stage its in, but that stage already has that many tasks");
                         }
                         dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
@@ -280,14 +279,27 @@ impl DistributedPhysicalOptimizerRule {
     }
 }
 
+/// Necessary information for building a [Stage] during distributed planning.
+///
+/// [NetworkBoundary]s return this piece of data so that the distributed planner knows how to
+/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
+///
+/// Some network boundaries might perform modifications to their children, like scaling
+/// up the number of partitions or injecting a specific [ExecutionPlan] on top.
+pub struct InputStageInfo {
+    /// The head plan of the [Stage] that is about to be built.
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// The number of tasks the [Stage] will have.
+    pub task_count: usize,
+}
+
 /// This trait represents a node that introduces the necessity of a network boundary in the plan.
 /// The distributed planner, upon stepping into one of these, will break the plan and build a stage
 /// out of it.
 pub trait NetworkBoundary: ExecutionPlan {
-    /// Returns the information necessary for building the next stage.
-    /// - The head node of the stage.
-    /// - the amount of tasks that stage will have.
-    fn to_stage_info(&self, n_tasks: usize) -> Result<(Arc<dyn ExecutionPlan>, usize)>;
+    /// Returns the information necessary for building the next stage from which this
+    /// [NetworkBoundary] is going to collect data.
+    fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;
 
     /// re-assigns a different number of input tasks to the current [NetworkBoundary].
     ///
@@ -297,6 +309,8 @@ pub trait NetworkBoundary: ExecutionPlan {
 
     /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
     /// information to perform any internal transformations necessary for distributed execution.
+    ///
+    /// Typically, [NetworkBoundary]s will use this call to transition from "Pending" to "Ready".
     fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
 
     /// Returns the assigned input [Stage], if any.
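For orientation, and not part of the commit itself: a minimal sketch of how an implementor might satisfy the new `get_input_stage_info` signature. `HypotheticalShuffleExec` and its `input` field are invented for illustration; only `NetworkBoundary`, `InputStageInfo`, and `Stage` come from the diff above, and the `ExecutionPlan` impl plus the remaining `NetworkBoundary` methods are omitted.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical network boundary node; the real crate provides its own implementors
// (e.g. NetworkShuffleExec / NetworkCoalesceExec), whose internals are not shown here.
impl NetworkBoundary for HypotheticalShuffleExec {
    fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo> {
        // The child plan below this boundary becomes the head of the next Stage;
        // this sketch simply forwards the task count suggested by the planner.
        Ok(InputStageInfo {
            plan: Arc::clone(&self.input),
            task_count,
        })
    }

    // with_input_stage, with_input_task_count, rollback, etc. omitted for brevity.
}
```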