Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub fn format_distributed_tasks(
Ok(result)
}

/// Builds a single RecordBatch with two columns, the first column with the type of plan and
/// the second column containing the formatted logical plan, physical plan, distributed plan,
/// and distributed tasks.
pub fn build_explain_batch(
logical_plan: &LogicalPlan,
physical_plan: &Arc<dyn ExecutionPlan>,
Expand Down
5 changes: 3 additions & 2 deletions src/planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
},
};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DDStage {
/// our stage id
pub stage_id: u64,
Expand Down Expand Up @@ -221,6 +221,7 @@ pub async fn logical_planning(sql: &str, ctx: &SessionContext) -> Result<Logical
Ok(plan)
}

/// Builds the physical plan from the logical plan, using the default Physical Planner from DataFusion
pub async fn physical_planning(
logical_plan: &LogicalPlan,
ctx: &SessionContext,
Expand All @@ -239,7 +240,7 @@ pub async fn physical_planning(
}

/// Returns distributed plan and execution stages for both query execution and EXPLAIN display
pub async fn execution_planning(
pub async fn distributed_physical_planning(
physical_plan: Arc<dyn ExecutionPlan>,
batch_size: usize,
partitions_per_worker: Option<usize>,
Expand Down
14 changes: 12 additions & 2 deletions src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,17 @@ impl FlightSqlHandler for DDProxyHandler {
query: arrow_flight::sql::CommandStatementQuery,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
let query_plan = self
let mut query_plan = self
.planner
.prepare(&query.query)
.await
.map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;

self.planner
.distribute_plan(&mut query_plan)
.await
.map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;

self.create_flight_info_response(query_plan)
.map(Response::new)
.context("Could not create flight info response")
Expand All @@ -200,12 +205,17 @@ impl FlightSqlHandler for DDProxyHandler {
None => return Err(Status::invalid_argument("Missing Substrait plan")),
};

let query_plan = self
let mut query_plan = self
.planner
.prepare_substrait(plan)
.await
.map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;

self.planner
.distribute_plan(&mut query_plan)
.await
.map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;

self.create_flight_info_response(query_plan)
.map(Response::new)
.context("Could not create flight info response")
Expand Down
111 changes: 75 additions & 36 deletions src/query_planner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use anyhow::{anyhow, Context as AnyhowContext};
use anyhow::anyhow;
use arrow::{compute::concat_batches, datatypes::SchemaRef};
use datafusion::{
logical_expr::LogicalPlan,
Expand All @@ -17,7 +17,8 @@ use crate::{
customizer::Customizer,
explain::build_explain_batch,
planning::{
distribute_stages, execution_planning, get_ctx, logical_planning, physical_planning,
distribute_stages, distributed_physical_planning, get_ctx, logical_planning,
physical_planning, DDStage,
},
record_batch_exec::RecordBatchExec,
result::Result,
Expand All @@ -36,6 +37,7 @@ pub struct QueryPlan {
pub physical_plan: Arc<dyn ExecutionPlan>,
pub distributed_plan: Arc<dyn ExecutionPlan>,
pub distributed_tasks: Vec<DDTask>,
pub distributed_stages: Vec<DDStage>,
}

impl std::fmt::Debug for QueryPlan {
Expand Down Expand Up @@ -84,8 +86,10 @@ impl QueryPlanner {

/// Common planning steps shared by both query and its EXPLAIN
///
/// Prepare a query by parsing the SQL, planning it, and distributing the
/// physical plan into stages that can be executed by workers.
/// Prepare a query by parsing the SQL, performing logical planning,
/// and then physical planning to create a distributed plan.
/// Builds an initial `QueryPlan` with the logical and physical plans,
/// but without worker addresses and tasks, which will be set later in distribute_plan().
pub async fn prepare(&self, sql: &str) -> Result<QueryPlan> {
let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
if let Some(customizer) = &self.customizer {
Expand Down Expand Up @@ -118,14 +122,29 @@ impl QueryPlanner {
}
}

/// Prepare the query plan for distributed execution
async fn prepare_query(
&self,
logical_plan: LogicalPlan,
ctx: SessionContext,
) -> Result<QueryPlan> {
// construct the initial physical plan from the logical plan
let physical_plan = physical_planning(&logical_plan, &ctx).await?;

self.send_it(logical_plan, physical_plan, ctx).await
// construct the distributed physical plan and stages
let (distributed_plan, distributed_stages) =
distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;

// build the initial plan, without the worker addresses and DDTasks
return self
.build_initial_plan(
distributed_plan,
distributed_stages,
ctx,
logical_plan,
physical_plan,
)
.await;
}

async fn prepare_local(
Expand All @@ -151,7 +170,20 @@ impl QueryPlanner {
let combined_batch = concat_batches(&batches[0].schema(), &batches)?;
let physical_plan = Arc::new(RecordBatchExec::new(combined_batch));

self.send_it(logical_plan, physical_plan, ctx).await
// construct the distributed physical plan and stages
let (distributed_plan, distributed_stages) =
distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;

// build the initial plan, without the worker addresses and DDTasks
return self
.build_initial_plan(
distributed_plan,
distributed_stages,
ctx,
logical_plan,
physical_plan,
)
.await;
}

async fn prepare_explain(
Expand All @@ -166,7 +198,8 @@ impl QueryPlanner {

let logical_plan = child_plan[0];

let query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;
// construct the initial distributed physical plan, without the worker addresses and DDTasks
let mut query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;

let batch = build_explain_batch(
&query_plan.logical_plan,
Expand All @@ -176,56 +209,62 @@ impl QueryPlanner {
self.codec.as_ref(),
)?;
let physical_plan = Arc::new(RecordBatchExec::new(batch));
query_plan.physical_plan = physical_plan.clone();

self.send_it(
query_plan.logical_plan,
physical_plan,
query_plan.session_context,
)
.await
Ok(query_plan)
}
async fn send_it(

async fn build_initial_plan(
&self,
distributed_plan: Arc<dyn ExecutionPlan>,
distributed_stages: Vec<DDStage>,
ctx: SessionContext,
logical_plan: LogicalPlan,
physical_plan: Arc<dyn ExecutionPlan>,
ctx: SessionContext,
) -> Result<QueryPlan> {
let query_id = uuid::Uuid::new_v4().to_string();

// divide the physical plan into chunks (tasks) that we can distribute to workers
let (distributed_plan, distributed_stages) =
execution_planning(physical_plan.clone(), 8192, Some(2)).await?;

let worker_addrs = get_worker_addresses()?;

// gather some information we need to send back such that
// we can send a ticket to the client
let final_stage = &distributed_stages[distributed_stages.len() - 1];
let schema = Arc::clone(&final_stage.plan.schema());
let final_stage_id = final_stage.stage_id;

// distribute the stages to workers, further dividing them up
// into chunks of partitions (partition_groups)
let (final_workers, tasks) = distribute_stages(
&query_id,
distributed_stages,
worker_addrs,
self.codec.as_ref(),
)
.await?;

let qp = QueryPlan {
Ok(QueryPlan {
query_id,
session_context: ctx,
worker_addresses: final_workers,
final_stage_id,
schema,
logical_plan,
physical_plan,
distributed_plan,
distributed_tasks: tasks,
};
distributed_stages,
// will be populated on distribute_plan
worker_addresses: Addrs::default(),
distributed_tasks: Vec::new(),
})
}

/// Performs worker discovery, and distributes the query plan to workers.
/// also sets the final worker addresses and distributed tasks in the query plan.
pub async fn distribute_plan(&self, initial_plan: &mut QueryPlan) -> Result<()> {
// Perform worker discovery
let worker_addrs = get_worker_addresses()?;

// Distribute the stages to workers, further dividing them up
// into chunks of partitions (partition_groups)
let (final_workers, tasks) = distribute_stages(
&initial_plan.query_id,
initial_plan.distributed_stages.clone(),
worker_addrs,
self.codec.as_ref(),
)
.await?;

// set the distributed tasks and final worker addresses
initial_plan.worker_addresses = final_workers;
initial_plan.distributed_tasks = tasks;

Ok(qp)
Ok(())
}
}
1 change: 0 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use tonic::transport::Channel;
use url::Url;

use crate::{
codec::DDCodec,
logging::{debug, error, trace},
protobuf::StageAddrs,
result::Result,
Expand Down
Loading