Skip to content

Commit 5c12df4

Browse files
committed
Refactor QueryPlanner
1 parent 445ccef commit 5c12df4

File tree

5 files changed

+93
-41
lines changed

5 files changed

+93
-41
lines changed

src/explain.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ pub fn format_distributed_tasks(
4444
Ok(result)
4545
}
4646

47+
/// Builds a single RecordBatch with two columns, the first column with the type of plan and
48+
/// the second column containing the formatted logical plan, physical plan, distributed plan,
49+
/// and distributed tasks.
4750
pub fn build_explain_batch(
4851
logical_plan: &LogicalPlan,
4952
physical_plan: &Arc<dyn ExecutionPlan>,

src/planning.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::{
4545
},
4646
};
4747

48-
#[derive(Debug)]
48+
#[derive(Debug, Clone)]
4949
pub struct DDStage {
5050
/// our stage id
5151
pub stage_id: u64,
@@ -221,6 +221,7 @@ pub async fn logical_planning(sql: &str, ctx: &SessionContext) -> Result<Logical
221221
Ok(plan)
222222
}
223223

224+
/// Builds the physical plan from the logical plan, using the default Physical Planner from DataFusion
224225
pub async fn physical_planning(
225226
logical_plan: &LogicalPlan,
226227
ctx: &SessionContext,
@@ -239,7 +240,7 @@ pub async fn physical_planning(
239240
}
240241

241242
/// Returns distributed plan and execution stages for both query execution and EXPLAIN display
242-
pub async fn execution_planning(
243+
pub async fn distributed_physical_planning(
243244
physical_plan: Arc<dyn ExecutionPlan>,
244245
batch_size: usize,
245246
partitions_per_worker: Option<usize>,

src/proxy_service.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,17 @@ impl FlightSqlHandler for DDProxyHandler {
177177
query: arrow_flight::sql::CommandStatementQuery,
178178
_request: Request<FlightDescriptor>,
179179
) -> Result<Response<FlightInfo>, Status> {
180-
let query_plan = self
180+
let mut query_plan = self
181181
.planner
182182
.prepare(&query.query)
183183
.await
184184
.map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;
185185

186+
self.planner
187+
.distribute_plan(&mut query_plan)
188+
.await
189+
.map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;
190+
186191
self.create_flight_info_response(query_plan)
187192
.map(Response::new)
188193
.context("Could not create flight info response")
@@ -200,12 +205,17 @@ impl FlightSqlHandler for DDProxyHandler {
200205
None => return Err(Status::invalid_argument("Missing Substrait plan")),
201206
};
202207

203-
let query_plan = self
208+
let mut query_plan = self
204209
.planner
205210
.prepare_substrait(plan)
206211
.await
207212
.map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;
208213

214+
self.planner
215+
.distribute_plan(&mut query_plan)
216+
.await
217+
.map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;
218+
209219
self.create_flight_info_response(query_plan)
210220
.map(Response::new)
211221
.context("Could not create flight info response")

src/query_planner.rs

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22

3-
use anyhow::{anyhow, Context as AnyhowContext};
3+
use anyhow::anyhow;
44
use arrow::{compute::concat_batches, datatypes::SchemaRef};
55
use datafusion::{
66
logical_expr::LogicalPlan,
@@ -17,7 +17,8 @@ use crate::{
1717
customizer::Customizer,
1818
explain::build_explain_batch,
1919
planning::{
20-
distribute_stages, execution_planning, get_ctx, logical_planning, physical_planning,
20+
distribute_stages, distributed_physical_planning, get_ctx, logical_planning,
21+
physical_planning, DDStage,
2122
},
2223
record_batch_exec::RecordBatchExec,
2324
result::Result,
@@ -36,6 +37,7 @@ pub struct QueryPlan {
3637
pub physical_plan: Arc<dyn ExecutionPlan>,
3738
pub distributed_plan: Arc<dyn ExecutionPlan>,
3839
pub distributed_tasks: Vec<DDTask>,
40+
pub distributed_stages: Vec<DDStage>,
3941
}
4042

4143
impl std::fmt::Debug for QueryPlan {
@@ -84,8 +86,10 @@ impl QueryPlanner {
8486

8587
/// Common planning steps shared by both query and its EXPLAIN
8688
///
87-
/// Prepare a query by parsing the SQL, planning it, and distributing the
88-
/// physical plan into stages that can be executed by workers.
89+
/// Prepare a query by parsing the SQL, performing logical planning,
90+
/// and then physical planning to create a distributed plan.
91+
/// Builds an initial `QueryPlan` with the logical and physical plans,
92+
/// but without worker addresses and tasks, which will be set later in distribute_plan().
8993
pub async fn prepare(&self, sql: &str) -> Result<QueryPlan> {
9094
let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
9195
if let Some(customizer) = &self.customizer {
@@ -118,14 +122,29 @@ impl QueryPlanner {
118122
}
119123
}
120124

125+
/// Prepare the query plan for distributed execution
121126
async fn prepare_query(
122127
&self,
123128
logical_plan: LogicalPlan,
124129
ctx: SessionContext,
125130
) -> Result<QueryPlan> {
131+
// construct the initial physical plan from the logical plan
126132
let physical_plan = physical_planning(&logical_plan, &ctx).await?;
127133

128-
self.send_it(logical_plan, physical_plan, ctx).await
134+
// construct the distributed physical plan and stages
135+
let (distributed_plan, distributed_stages) =
136+
distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;
137+
138+
// build the initial plan, without the worker addresses and DDTasks
139+
return self
140+
.build_initial_plan(
141+
distributed_plan,
142+
distributed_stages,
143+
ctx,
144+
logical_plan,
145+
physical_plan,
146+
)
147+
.await;
129148
}
130149

131150
async fn prepare_local(
@@ -151,7 +170,20 @@ impl QueryPlanner {
151170
let combined_batch = concat_batches(&batches[0].schema(), &batches)?;
152171
let physical_plan = Arc::new(RecordBatchExec::new(combined_batch));
153172

154-
self.send_it(logical_plan, physical_plan, ctx).await
173+
// construct the distributed physical plan and stages
174+
let (distributed_plan, distributed_stages) =
175+
distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;
176+
177+
// build the initial plan, without the worker addresses and DDTasks
178+
return self
179+
.build_initial_plan(
180+
distributed_plan,
181+
distributed_stages,
182+
ctx,
183+
logical_plan,
184+
physical_plan,
185+
)
186+
.await;
155187
}
156188

157189
async fn prepare_explain(
@@ -166,7 +198,8 @@ impl QueryPlanner {
166198

167199
let logical_plan = child_plan[0];
168200

169-
let query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;
201+
// construct the initial distributed physical plan, without the worker addresses and DDTasks
202+
let mut query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;
170203

171204
let batch = build_explain_batch(
172205
&query_plan.logical_plan,
@@ -176,56 +209,62 @@ impl QueryPlanner {
176209
self.codec.as_ref(),
177210
)?;
178211
let physical_plan = Arc::new(RecordBatchExec::new(batch));
212+
query_plan.physical_plan = physical_plan.clone();
179213

180-
self.send_it(
181-
query_plan.logical_plan,
182-
physical_plan,
183-
query_plan.session_context,
184-
)
185-
.await
214+
Ok(query_plan)
186215
}
187-
async fn send_it(
216+
217+
async fn build_initial_plan(
188218
&self,
219+
distributed_plan: Arc<dyn ExecutionPlan>,
220+
distributed_stages: Vec<DDStage>,
221+
ctx: SessionContext,
189222
logical_plan: LogicalPlan,
190223
physical_plan: Arc<dyn ExecutionPlan>,
191-
ctx: SessionContext,
192224
) -> Result<QueryPlan> {
193225
let query_id = uuid::Uuid::new_v4().to_string();
194226

195-
// divide the physical plan into chunks (tasks) that we can distribute to workers
196-
let (distributed_plan, distributed_stages) =
197-
execution_planning(physical_plan.clone(), 8192, Some(2)).await?;
198-
199-
let worker_addrs = get_worker_addresses()?;
200-
201227
// gather some information we need to send back such that
202228
// we can send a ticket to the client
203229
let final_stage = &distributed_stages[distributed_stages.len() - 1];
204230
let schema = Arc::clone(&final_stage.plan.schema());
205231
let final_stage_id = final_stage.stage_id;
206232

207-
// distribute the stages to workers, further dividing them up
208-
// into chunks of partitions (partition_groups)
209-
let (final_workers, tasks) = distribute_stages(
210-
&query_id,
211-
distributed_stages,
212-
worker_addrs,
213-
self.codec.as_ref(),
214-
)
215-
.await?;
216-
217-
let qp = QueryPlan {
233+
Ok(QueryPlan {
218234
query_id,
219235
session_context: ctx,
220-
worker_addresses: final_workers,
221236
final_stage_id,
222237
schema,
223238
logical_plan,
224239
physical_plan,
225240
distributed_plan,
226-
distributed_tasks: tasks,
227-
};
241+
distributed_stages,
242+
// will be populated on distribute_plan
243+
worker_addresses: Addrs::default(),
244+
distributed_tasks: Vec::new(),
245+
})
246+
}
247+
248+
/// Performs worker discovery, and distributes the query plan to workers.
249+
/// also sets the final worker addresses and distributed tasks in the query plan.
250+
pub async fn distribute_plan(&self, initial_plan: &mut QueryPlan) -> Result<()> {
251+
// Perform worker discovery
252+
let worker_addrs = get_worker_addresses()?;
253+
254+
// Distribute the stages to workers, further dividing them up
255+
// into chunks of partitions (partition_groups)
256+
let (final_workers, tasks) = distribute_stages(
257+
&initial_plan.query_id,
258+
initial_plan.distributed_stages.clone(),
259+
worker_addrs,
260+
self.codec.as_ref(),
261+
)
262+
.await?;
263+
264+
// set the distributed tasks and final worker addresses
265+
initial_plan.worker_addresses = final_workers;
266+
initial_plan.distributed_tasks = tasks;
228267

229-
Ok(qp)
268+
Ok(())
230269
}
231270
}

src/util.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ use tonic::transport::Channel;
5454
use url::Url;
5555

5656
use crate::{
57-
codec::DDCodec,
5857
logging::{debug, error, trace},
5958
protobuf::StageAddrs,
6059
result::Result,

0 commit comments

Comments
 (0)