Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub fn format_distributed_tasks(
Ok(result)
}

/// Builds a single RecordBatch with two columns, the first column with the type of plan and
/// the second column containing the formatted logical plan, physical plan, distributed plan,
/// and distributed tasks.
pub fn build_explain_batch(
logical_plan: &LogicalPlan,
physical_plan: &Arc<dyn ExecutionPlan>,
Expand Down
7 changes: 4 additions & 3 deletions src/planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
},
};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DDStage {
/// our stage id
pub stage_id: u64,
Expand Down Expand Up @@ -221,6 +221,7 @@ pub async fn logical_planning(sql: &str, ctx: &SessionContext) -> Result<Logical
Ok(plan)
}

/// Builds the physical plan from the logical plan, using the default Physical Planner from DataFusion
pub async fn physical_planning(
logical_plan: &LogicalPlan,
ctx: &SessionContext,
Expand All @@ -239,7 +240,7 @@ pub async fn physical_planning(
}

/// Returns distributed plan and execution stages for both query execution and EXPLAIN display
pub async fn execution_planning(
pub async fn distributed_physical_planning(
physical_plan: Arc<dyn ExecutionPlan>,
batch_size: usize,
partitions_per_worker: Option<usize>,
Expand Down Expand Up @@ -438,7 +439,7 @@ pub fn add_distributed_analyze(
/// final stage only as that's all we care about from the call site
pub async fn distribute_stages(
query_id: &str,
stages: Vec<DDStage>,
stages: &[DDStage],
worker_addrs: Vec<Host>,
codec: &dyn PhysicalExtensionCodec,
) -> Result<(Addrs, Vec<DDTask>)> {
Expand Down
14 changes: 12 additions & 2 deletions src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,17 @@ impl FlightSqlHandler for DDProxyHandler {
query: arrow_flight::sql::CommandStatementQuery,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
let query_plan = self
let mut query_plan = self
.planner
.prepare(&query.query)
.await
.map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;

self.planner
.distribute_plan(&mut query_plan)
.await
.map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;

self.create_flight_info_response(query_plan)
.map(Response::new)
.context("Could not create flight info response")
Expand All @@ -200,12 +205,17 @@ impl FlightSqlHandler for DDProxyHandler {
None => return Err(Status::invalid_argument("Missing Substrait plan")),
};

let query_plan = self
let mut query_plan = self
.planner
.prepare_substrait(plan)
.await
.map_err(|e| Status::internal(format!("Could not prepare query {e:?}")))?;

self.planner
.distribute_plan(&mut query_plan)
.await
.map_err(|e| Status::internal(format!("Could not distribute plan {e:?}")))?;

self.create_flight_info_response(query_plan)
.map(Response::new)
.context("Could not create flight info response")
Expand Down
137 changes: 97 additions & 40 deletions src/query_planner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use anyhow::{anyhow, Context as AnyhowContext};
use anyhow::anyhow;
use arrow::{compute::concat_batches, datatypes::SchemaRef};
use datafusion::{
logical_expr::LogicalPlan,
Expand All @@ -17,7 +17,8 @@ use crate::{
customizer::Customizer,
explain::build_explain_batch,
planning::{
distribute_stages, execution_planning, get_ctx, logical_planning, physical_planning,
distribute_stages, distributed_physical_planning, get_ctx, logical_planning,
physical_planning, DDStage,
},
record_batch_exec::RecordBatchExec,
result::Result,
Expand All @@ -36,6 +37,7 @@ pub struct QueryPlan {
pub physical_plan: Arc<dyn ExecutionPlan>,
pub distributed_plan: Arc<dyn ExecutionPlan>,
pub distributed_tasks: Vec<DDTask>,
pub distributed_stages: Vec<DDStage>,
}

impl std::fmt::Debug for QueryPlan {
Expand Down Expand Up @@ -75,17 +77,19 @@ impl QueryPlanner {
customizer
.clone()
.map(|c| c as Arc<dyn PhysicalExtensionCodec>)
.or(Some(Arc::new(DefaultPhysicalExtensionCodec {})))
.unwrap(),
.unwrap_or(Arc::new(DefaultPhysicalExtensionCodec {})),
));

Self { customizer, codec }
}

/// Common planning steps shared by both query and its EXPLAIN
/// Prepare a Distributed DataFusion plan from a sql query.
///
/// Prepare a query by parsing the SQL, planning it, and distributing the
/// physical plan into stages that can be executed by workers.
/// This function parses the SQL, produces a logical plan, then derives the
/// physical plan and its distributed counterpart.
/// The resulting `QueryPlan` includes the logical plan, physical plan,
/// distributed plan, and distributed stages, but it does not yet contain
/// worker addresses or tasks, as they are filled in later by `distribute_plan()`.
pub async fn prepare(&self, sql: &str) -> Result<QueryPlan> {
let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
if let Some(customizer) = &self.customizer {
Expand All @@ -105,8 +109,23 @@ impl QueryPlanner {
}
}

/// Prepare a Distributed DataFusion plan from a Substrait plan.
///
/// 1. Convert the incoming Substrait plan into a `LogicalPlan` with DataFusion’s
/// default Substrait consumer.
/// 2. Derive the corresponding physical plan and distributed variant.
///
/// The resulting `QueryPlan` contains the logical plan, physical plan,
/// distributed plan, and distributed stages, but it does not yet contain
/// worker addresses or tasks, as they are filled in later by `distribute_plan()`.
pub async fn prepare_substrait(&self, substrait_plan: Plan) -> Result<QueryPlan> {
let ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
let mut ctx = get_ctx().map_err(|e| anyhow!("Could not create context: {e}"))?;
if let Some(customizer) = &self.customizer {
customizer
.customize(&mut ctx)
.await
.map_err(|e| anyhow!("Customization failed: {e:#?}"))?;
}

let logical_plan = from_substrait_plan(&ctx.state(), &substrait_plan).await?;

Expand All @@ -118,16 +137,33 @@ impl QueryPlanner {
}
}

/// Prepare a `QueryPlan` for a regular SELECT query
async fn prepare_query(
&self,
logical_plan: LogicalPlan,
ctx: SessionContext,
) -> Result<QueryPlan> {
// construct the initial physical plan from the logical plan
let physical_plan = physical_planning(&logical_plan, &ctx).await?;

self.send_it(logical_plan, physical_plan, ctx).await
// construct the distributed physical plan and stages
let (distributed_plan, distributed_stages) =
distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;

// build the initial plan, without the worker addresses and DDTasks
return self
.build_initial_plan(
distributed_plan,
distributed_stages,
ctx,
logical_plan,
physical_plan,
)
.await;
}

/// Prepare a `QueryPlan` for statements that should run locally on the proxy
/// node (e.g. `DESCRIBE TABLE`).
async fn prepare_local(
&self,
logical_plan: LogicalPlan,
Expand All @@ -151,9 +187,23 @@ impl QueryPlanner {
let combined_batch = concat_batches(&batches[0].schema(), &batches)?;
let physical_plan = Arc::new(RecordBatchExec::new(combined_batch));

self.send_it(logical_plan, physical_plan, ctx).await
// construct the distributed physical plan and stages
let (distributed_plan, distributed_stages) =
distributed_physical_planning(physical_plan.clone(), 8192, Some(2)).await?;

// build the initial plan, without the worker addresses and DDTasks
return self
.build_initial_plan(
distributed_plan,
distributed_stages,
ctx,
logical_plan,
physical_plan,
)
.await;
}

/// Prepare a `QueryPlan` for an EXPLAIN statement.
async fn prepare_explain(
&self,
explain_plan: LogicalPlan,
Expand All @@ -166,7 +216,8 @@ impl QueryPlanner {

let logical_plan = child_plan[0];

let query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;
// construct the initial distributed physical plan, without the worker addresses and DDTasks
let mut query_plan = self.prepare_query(logical_plan.clone(), ctx).await?;

let batch = build_explain_batch(
&query_plan.logical_plan,
Expand All @@ -176,56 +227,62 @@ impl QueryPlanner {
self.codec.as_ref(),
)?;
let physical_plan = Arc::new(RecordBatchExec::new(batch));
query_plan.physical_plan = physical_plan.clone();

self.send_it(
query_plan.logical_plan,
physical_plan,
query_plan.session_context,
)
.await
Ok(query_plan)
}
async fn send_it(

async fn build_initial_plan(
&self,
distributed_plan: Arc<dyn ExecutionPlan>,
distributed_stages: Vec<DDStage>,
ctx: SessionContext,
logical_plan: LogicalPlan,
physical_plan: Arc<dyn ExecutionPlan>,
ctx: SessionContext,
) -> Result<QueryPlan> {
let query_id = uuid::Uuid::new_v4().to_string();

// divide the physical plan into chunks (tasks) that we can distribute to workers
let (distributed_plan, distributed_stages) =
execution_planning(physical_plan.clone(), 8192, Some(2)).await?;

let worker_addrs = get_worker_addresses()?;

// gather some information we need to send back such that
// we can send a ticket to the client
let final_stage = &distributed_stages[distributed_stages.len() - 1];
let schema = Arc::clone(&final_stage.plan.schema());
let final_stage_id = final_stage.stage_id;

// distribute the stages to workers, further dividing them up
// into chunks of partitions (partition_groups)
let (final_workers, tasks) = distribute_stages(
&query_id,
distributed_stages,
worker_addrs,
self.codec.as_ref(),
)
.await?;

let qp = QueryPlan {
Ok(QueryPlan {
query_id,
session_context: ctx,
worker_addresses: final_workers,
final_stage_id,
schema,
logical_plan,
physical_plan,
distributed_plan,
distributed_tasks: tasks,
};
distributed_stages,
// will be populated on distribute_plan
worker_addresses: Addrs::default(),
distributed_tasks: Vec::new(),
})
}

/// Performs worker discovery, and distributes the query plan to workers,
/// also sets the final worker addresses and distributed tasks in the query plan.
pub async fn distribute_plan(&self, initial_plan: &mut QueryPlan) -> Result<()> {
// Perform worker discovery
let worker_addrs = get_worker_addresses()?;

// Distribute the stages to workers, further dividing them up
// into chunks of partitions (partition_groups)
let (final_workers, tasks) = distribute_stages(
&initial_plan.query_id,
&initial_plan.distributed_stages,
worker_addrs,
self.codec.as_ref(),
)
.await?;

// set the distributed tasks and final worker addresses
initial_plan.worker_addresses = final_workers;
initial_plan.distributed_tasks = tasks;

Ok(qp)
Ok(())
}
}
1 change: 0 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use tonic::transport::Channel;
use url::Url;

use crate::{
codec::DDCodec,
logging::{debug, error, trace},
protobuf::StageAddrs,
result::Result,
Expand Down
Loading