11use std:: sync:: Arc ;
22
3- use anyhow:: { anyhow, Context as AnyhowContext } ;
3+ use anyhow:: anyhow;
44use arrow:: { compute:: concat_batches, datatypes:: SchemaRef } ;
55use datafusion:: {
66 logical_expr:: LogicalPlan ,
@@ -17,7 +17,8 @@ use crate::{
1717 customizer:: Customizer ,
1818 explain:: build_explain_batch,
1919 planning:: {
20- distribute_stages, execution_planning, get_ctx, logical_planning, physical_planning,
20+ distribute_stages, distributed_physical_planning, get_ctx, logical_planning,
21+ physical_planning, DDStage ,
2122 } ,
2223 record_batch_exec:: RecordBatchExec ,
2324 result:: Result ,
@@ -36,6 +37,7 @@ pub struct QueryPlan {
3637 pub physical_plan : Arc < dyn ExecutionPlan > ,
3738 pub distributed_plan : Arc < dyn ExecutionPlan > ,
3839 pub distributed_tasks : Vec < DDTask > ,
40+ pub distributed_stages : Vec < DDStage > ,
3941}
4042
4143impl std:: fmt:: Debug for QueryPlan {
@@ -84,8 +86,10 @@ impl QueryPlanner {
8486
8587 /// Common planning steps shared by both query and its EXPLAIN
8688 ///
87- /// Prepare a query by parsing the SQL, planning it, and distributing the
88- /// physical plan into stages that can be executed by workers.
89+ /// Prepare a query by parsing the SQL, performing logical planning,
90+ /// and then physical planning to create a distributed plan.
91+ /// Builds an initial `QueryPlan` with the logical and physical plans,
92+ /// but without worker addresses and tasks, which will be set later in distribute().
8993 pub async fn prepare ( & self , sql : & str ) -> Result < QueryPlan > {
9094 let mut ctx = get_ctx ( ) . map_err ( |e| anyhow ! ( "Could not create context: {e}" ) ) ?;
9195 if let Some ( customizer) = & self . customizer {
@@ -118,14 +122,29 @@ impl QueryPlanner {
118122 }
119123 }
120124
125+ /// Prepare the query plan for distributed execution
121126 async fn prepare_query (
122127 & self ,
123128 logical_plan : LogicalPlan ,
124129 ctx : SessionContext ,
125130 ) -> Result < QueryPlan > {
131+ // construct the initial physical plan from the logical plan
126132 let physical_plan = physical_planning ( & logical_plan, & ctx) . await ?;
127133
128- self . send_it ( logical_plan, physical_plan, ctx) . await
134+ // construct the distributed physical plan and stages
135+ let ( distributed_plan, distributed_stages) =
136+ distributed_physical_planning ( physical_plan. clone ( ) , 8192 , Some ( 2 ) ) . await ?;
137+
138+ // build the initial plan, without the worker addresses and DDTasks
139+ return self
140+ . build_initial_plan (
141+ distributed_plan,
142+ distributed_stages,
143+ ctx,
144+ logical_plan,
145+ physical_plan,
146+ )
147+ . await ;
129148 }
130149
131150 async fn prepare_local (
@@ -151,7 +170,20 @@ impl QueryPlanner {
151170 let combined_batch = concat_batches ( & batches[ 0 ] . schema ( ) , & batches) ?;
152171 let physical_plan = Arc :: new ( RecordBatchExec :: new ( combined_batch) ) ;
153172
154- self . send_it ( logical_plan, physical_plan, ctx) . await
173+ // construct the distributed physical plan and stages
174+ let ( distributed_plan, distributed_stages) =
175+ distributed_physical_planning ( physical_plan. clone ( ) , 8192 , Some ( 2 ) ) . await ?;
176+
177+ // build the initial plan, without the worker addresses and DDTasks
178+ return self
179+ . build_initial_plan (
180+ distributed_plan,
181+ distributed_stages,
182+ ctx,
183+ logical_plan,
184+ physical_plan,
185+ )
186+ . await ;
155187 }
156188
157189 async fn prepare_explain (
@@ -166,7 +198,8 @@ impl QueryPlanner {
166198
167199 let logical_plan = child_plan[ 0 ] ;
168200
169- let query_plan = self . prepare_query ( logical_plan. clone ( ) , ctx) . await ?;
201+ // construct the initial distributed physical plan, without the worker addresses and DDTasks
202+ let mut query_plan = self . prepare_query ( logical_plan. clone ( ) , ctx) . await ?;
170203
171204 let batch = build_explain_batch (
172205 & query_plan. logical_plan ,
@@ -176,56 +209,62 @@ impl QueryPlanner {
176209 self . codec . as_ref ( ) ,
177210 ) ?;
178211 let physical_plan = Arc :: new ( RecordBatchExec :: new ( batch) ) ;
212+ query_plan. physical_plan = physical_plan. clone ( ) ;
179213
180- self . send_it (
181- query_plan. logical_plan ,
182- physical_plan,
183- query_plan. session_context ,
184- )
185- . await
214+ Ok ( query_plan)
186215 }
187- async fn send_it (
216+
217+ async fn build_initial_plan (
188218 & self ,
219+ distributed_plan : Arc < dyn ExecutionPlan > ,
220+ distributed_stages : Vec < DDStage > ,
221+ ctx : SessionContext ,
189222 logical_plan : LogicalPlan ,
190223 physical_plan : Arc < dyn ExecutionPlan > ,
191- ctx : SessionContext ,
192224 ) -> Result < QueryPlan > {
193225 let query_id = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
194226
195- // divide the physical plan into chunks (tasks) that we can distribute to workers
196- let ( distributed_plan, distributed_stages) =
197- execution_planning ( physical_plan. clone ( ) , 8192 , Some ( 2 ) ) . await ?;
198-
199- let worker_addrs = get_worker_addresses ( ) ?;
200-
201227 // gather some information we need to send back such that
202228 // we can send a ticket to the client
203229 let final_stage = & distributed_stages[ distributed_stages. len ( ) - 1 ] ;
204230 let schema = Arc :: clone ( & final_stage. plan . schema ( ) ) ;
205231 let final_stage_id = final_stage. stage_id ;
206232
207- // distribute the stages to workers, further dividing them up
208- // into chunks of partitions (partition_groups)
209- let ( final_workers, tasks) = distribute_stages (
210- & query_id,
211- distributed_stages,
212- worker_addrs,
213- self . codec . as_ref ( ) ,
214- )
215- . await ?;
216-
217- let qp = QueryPlan {
233+ Ok ( QueryPlan {
218234 query_id,
219235 session_context : ctx,
220- worker_addresses : final_workers,
221236 final_stage_id,
222237 schema,
223238 logical_plan,
224239 physical_plan,
225240 distributed_plan,
226- distributed_tasks : tasks,
227- } ;
241+ distributed_stages,
242+ // will be populated on distribute_plan
243+ worker_addresses : Addrs :: default ( ) ,
244+ distributed_tasks : Vec :: new ( ) ,
245+ } )
246+ }
247+
248+ /// Performs worker discovery, and distributes the query plan to workers.
249+ /// also sets the final worker addresses and distributed tasks in the query plan.
250+ pub async fn distribute_plan ( & self , initial_plan : & mut QueryPlan ) -> Result < ( ) > {
251+ // Perform worker discovery
252+ let worker_addrs = get_worker_addresses ( ) ?;
253+
254+ // Distribute the stages to workers, further dividing them up
255+ // into chunks of partitions (partition_groups)
256+ let ( final_workers, tasks) = distribute_stages (
257+ & initial_plan. query_id ,
258+ initial_plan. distributed_stages . clone ( ) , // TODO: address this clone
259+ worker_addrs,
260+ self . codec . as_ref ( ) ,
261+ )
262+ . await ?;
263+
264+ // set the distributed tasks and final worker addresses
265+ initial_plan. worker_addresses = final_workers;
266+ initial_plan. distributed_tasks = tasks;
228267
229- Ok ( qp )
268+ Ok ( ( ) )
230269 }
231270}
0 commit comments