@@ -105,12 +105,12 @@ struct StagePlanner {
105105 plan_head : Option < Arc < dyn ExecutionPlan > > ,
106106 /// Current depth in the plan tree, as we walk the tree
107107 depth : usize ,
108- /// Input stages collected so far. Each entry is a tuple of (tree depth, stage).
108+ /// Input stages collected so far. Each entry is a tuple of (plan tree depth, stage).
109109 /// This allows us to keep track of the depth in the plan tree
110110 /// where we created the stage. That way when we create a new
111111 /// stage, we can tell if it is a peer to the current input stages or
112112 /// should be a parent (if its depth is a smaller number)
113- input_stages : Vec < ( usize , Arc < ExecutionStage > ) > ,
113+ input_stages : Vec < ( usize , ExecutionStage ) > ,
114114 /// current stage number
115115 stage_counter : usize ,
116116 /// Optional codec to assist in serializing and deserializing any custom
@@ -135,32 +135,45 @@ impl StagePlanner {
135135 }
136136
137137 fn finish ( mut self ) -> Result < Arc < ExecutionStage > > {
138- if self . input_stages . is_empty ( ) {
139- Ok ( Arc :: new ( ExecutionStage :: new (
138+ let stage = if self . input_stages . is_empty ( ) {
139+ ExecutionStage :: new (
140140 self . stage_counter ,
141141 self . plan_head
142142 . take ( )
143143 . ok_or_else ( || internal_datafusion_err ! ( "No plan head set" ) ) ?,
144144 vec ! [ ] ,
145- ) ) )
145+ )
146146 } else if self . depth < self . input_stages [ 0 ] . 0 {
147147 // There is more plan above the last stage we created, so we need to
148148 // create a new stage that includes the last plan head
149- Ok ( Arc :: new ( ExecutionStage :: new (
149+ ExecutionStage :: new (
150150 self . stage_counter ,
151151 self . plan_head
152152 . take ( )
153153 . ok_or_else ( || internal_datafusion_err ! ( "No plan head set" ) ) ?,
154154 self . input_stages
155- . iter ( )
156- . map ( |( _, stage) | stage . clone ( ) )
155+ . into_iter ( )
156+ . map ( |( _, stage) | Arc :: new ( stage ) )
157157 . collect ( ) ,
158- ) ) )
158+ )
159159 } else {
160160 // We have a plan head, and we are at the same depth as the last stage we created,
161161 // so we can just return the last stage
162- Ok ( self . input_stages . last ( ) . unwrap ( ) . 1 . clone ( ) )
162+ self . input_stages . last ( ) . unwrap ( ) . 1 . clone ( )
163+ } ;
164+
165+ // assign the proper tree depth to each stage in the tree
166+ fn assign_tree_depth ( stage : & ExecutionStage , depth : usize ) {
167+ stage
168+ . depth
169+ . store ( depth as u64 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
170+ for input in stage. child_stages_iter ( ) {
171+ assign_tree_depth ( input, depth + 1 ) ;
172+ }
163173 }
174+ assign_tree_depth ( & stage, 0 ) ;
175+
176+ Ok ( Arc :: new ( stage) )
164177 }
165178}
166179
@@ -191,26 +204,13 @@ impl TreeNodeRewriter for StagePlanner {
191204 . map ( |( _, stage) | stage. clone ( ) )
192205 . collect :: < Vec < _ > > ( ) ;
193206
194- println ! (
195- "\n \n \n ---------------------- \n creating a stage, depth = {}, input_stages ={}, child_stages = {}, plan=\n {}" ,
196- self . depth,
197- self . input_stages
198- . iter( )
199- . map( |( depth, stage) | format!( "({},{})" , depth, stage. num) )
200- . collect:: <Vec <_>>( )
201- . join( ", " ) ,
202- child_stages
203- . iter( )
204- . map( |stage| format!( "{}" , stage. num) )
205- . collect:: <Vec <_>>( )
206- . join( ", " ) ,
207-
208- displayable( plan. as_ref( ) ) . indent( false )
209- ) ;
210-
211207 self . input_stages . retain ( |( depth, _) | * depth <= self . depth ) ;
212208
213- let mut stage = ExecutionStage :: new ( self . stage_counter , plan. clone ( ) , child_stages) ;
209+ let mut stage = ExecutionStage :: new (
210+ self . stage_counter ,
211+ plan. clone ( ) ,
212+ child_stages. into_iter ( ) . map ( Arc :: new) . collect ( ) ,
213+ ) ;
214214
215215 if let Some ( partitions_per_task) = self . partitions_per_task {
216216 stage = stage. with_maximum_partitions_per_task ( partitions_per_task) ;
@@ -219,7 +219,7 @@ impl TreeNodeRewriter for StagePlanner {
219219 stage = stage. with_codec ( codec. clone ( ) ) ;
220220 }
221221
222- self . input_stages . push ( ( self . depth , Arc :: new ( stage) ) ) ;
222+ self . input_stages . push ( ( self . depth , stage) ) ;
223223
224224 // As we are walking up the plan tree, we've now put what we've encountered so far
225225 // into a stage. We want to replace this plan now with an ArrowFlightReadExec
0 commit comments