Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ tokio = { version = "1.46.1", features = ["full"] }
tonic = { version = "0.12.3", features = ["transport"] }
tower = "0.5.2"
http = "1.3.1"
itertools = "0.14.0"
futures = "0.3.31"
url = "2.5.4"
uuid = "1.17.0"
delegate = "0.13.4"
dashmap = "6.1.0"
prost = "0.13.5"
rand = "0.8.5"
object_store = "0.12.3"

[dev-dependencies]
insta = { version = "1.43.1", features = ["filters"] }
insta = { version = "1.43.1", features = ["filters"] }
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod result;
pub mod util;
1 change: 1 addition & 0 deletions src/common/result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

36 changes: 36 additions & 0 deletions src/common/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use datafusion::error::Result;
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};

use std::fmt::Write;

pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
let mut f = String::new();

fn visit(plan: &dyn ExecutionPlan, indent: usize, f: &mut String) -> Result<()> {
let output_partitions = plan.output_partitioning().partition_count();
let input_partitions = plan
.children()
.first()
.map(|child| child.output_partitioning().partition_count());

write!(
f,
"partitions [out:{:<3}{}]{} {}",
output_partitions,
input_partitions
.map(|p| format!("<-- in:{:<3}", p))
.unwrap_or(" ".to_string()),
" ".repeat(indent),
displayable(plan).one_line()
)?;

plan.children()
.iter()
.try_for_each(|input| visit(input.as_ref(), indent + 2, f))?;

Ok(())
}

visit(plan, 0, &mut f)?;
Ok(f)
}
20 changes: 0 additions & 20 deletions src/context.rs

This file was deleted.

137 changes: 28 additions & 109 deletions src/flight_service/do_get.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,28 @@
use crate::composed_extension_codec::ComposedPhysicalExtensionCodec;
use crate::context::StageTaskContext;
use crate::errors::datafusion_error_to_tonic_status;
use crate::flight_service::service::ArrowFlightEndpoint;
use crate::plan::ArrowFlightReadExecProtoCodec;
use crate::plan::DistributedCodec;
use crate::stage::{stage_from_proto, ExecutionStageProto};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::Ticket;
use datafusion::error::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::optimizer::OptimizerConfig;
use datafusion::physical_expr::{Partitioning, PhysicalExpr};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::from_proto::parse_physical_exprs;
use datafusion_proto::physical_plan::to_proto::serialize_physical_exprs;
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use futures::TryStreamExt;
use prost::Message;
use std::sync::Arc;
use tonic::{Request, Response, Status};
use uuid::Uuid;

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DoGet {
#[prost(oneof = "DoGetInner", tags = "1")]
pub inner: Option<DoGetInner>,
}

#[derive(Clone, PartialEq, prost::Oneof)]
pub enum DoGetInner {
#[prost(message, tag = "1")]
RemotePlanExec(RemotePlanExec),
}

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RemotePlanExec {
#[prost(message, optional, boxed, tag = "1")]
pub plan: Option<Box<PhysicalPlanNode>>,
#[prost(string, tag = "2")]
pub stage_id: String,
#[prost(uint64, tag = "3")]
pub task_idx: u64,
#[prost(uint64, tag = "4")]
pub output_task_idx: u64,
#[prost(uint64, tag = "5")]
pub output_tasks: u64,
#[prost(message, repeated, tag = "6")]
pub hash_expr: Vec<PhysicalExprNode>,
}

impl DoGet {
pub fn new_remote_plan_exec_ticket(
plan: Arc<dyn ExecutionPlan>,
stage_id: Uuid,
task_idx: usize,
output_task_idx: usize,
output_tasks: usize,
hash_expr: &[Arc<dyn PhysicalExpr>],
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Ticket, DataFusionError> {
let node = PhysicalPlanNode::try_from_physical_plan(plan, extension_codec)?;
let do_get = Self {
inner: Some(DoGetInner::RemotePlanExec(RemotePlanExec {
plan: Some(Box::new(node)),
stage_id: stage_id.to_string(),
task_idx: task_idx as u64,
output_task_idx: output_task_idx as u64,
output_tasks: output_tasks as u64,
hash_expr: serialize_physical_exprs(hash_expr, extension_codec)?,
})),
};
Ok(Ticket::new(do_get.encode_to_vec()))
}
/// The ExecutionStage that we are going to execute
#[prost(message, optional, tag = "1")]
pub stage_proto: Option<ExecutionStageProto>,
/// the partition of the stage to execute
#[prost(uint64, tag = "2")]
pub partition: u64,
}

impl ArrowFlightEndpoint {
Expand All @@ -81,74 +31,43 @@ impl ArrowFlightEndpoint {
request: Request<Ticket>,
) -> Result<Response<<ArrowFlightEndpoint as FlightService>::DoGetStream>, Status> {
let Ticket { ticket } = request.into_inner();
let action = DoGet::decode(ticket).map_err(|err| {
let doget = DoGet::decode(ticket).map_err(|err| {
Status::invalid_argument(format!("Cannot decode DoGet message: {err}"))
})?;

let Some(action) = action.inner else {
return invalid_argument("DoGet message is empty");
};

let DoGetInner::RemotePlanExec(action) = action;
let stage_msg = doget
.stage_proto
.ok_or(Status::invalid_argument("DoGet is missing the stage proto"))?;

let state_builder = SessionStateBuilder::new()
.with_runtime_env(Arc::clone(&self.runtime))
.with_default_features();

let mut state = self.session_builder.on_new_session(state_builder).build();

let Some(function_registry) = state.function_registry() else {
return invalid_argument("FunctionRegistry not present in newly built SessionState");
};
let function_registry = state.function_registry().ok_or(Status::invalid_argument(
"FunctionRegistry not present in newly built SessionState",
))?;

let Some(plan_proto) = action.plan else {
return invalid_argument("RemotePlanExec is missing the plan");
};
let codec = DistributedCodec {};
let codec = Arc::new(codec) as Arc<dyn PhysicalExtensionCodec>;

let mut codec = ComposedPhysicalExtensionCodec::default();
codec.push(ArrowFlightReadExecProtoCodec);
codec.push_from_config(state.config());

let plan = plan_proto
.try_into_physical_plan(function_registry, &self.runtime, &codec)
.map_err(|err| Status::internal(format!("Cannot deserialize plan: {err}")))?;

let stage_id = Uuid::parse_str(&action.stage_id).map_err(|err| {
Status::invalid_argument(format!(
"Cannot parse stage id '{}': {err}",
action.stage_id
))
})?;

let task_idx = action.task_idx as usize;
let caller_actor_idx = action.output_task_idx as usize;
let prev_n = action.output_tasks as usize;
let partitioning = match parse_physical_exprs(
&action.hash_expr,
function_registry,
&plan.schema(),
&codec,
) {
Ok(expr) if expr.is_empty() => Partitioning::Hash(expr, prev_n),
Ok(_) => Partitioning::RoundRobinBatch(prev_n),
Err(err) => return invalid_argument(format!("Cannot parse hash expressions {err}")),
};
let stage = stage_from_proto(stage_msg, function_registry, &self.runtime.as_ref(), codec)
.map(Arc::new)
.map_err(|err| Status::invalid_argument(format!("Cannot decode stage proto: {err}")))?;

// Add the extensions that might be required for ExecutionPlan nodes in the plan
let config = state.config_mut();
config.set_extension(Arc::clone(&self.channel_manager));
config.set_extension(Arc::new(StageTaskContext { task_idx }));

let stream_partitioner = self
.partitioner_registry
.get_or_create_stream_partitioner(stage_id, task_idx, plan, partitioning)
.map_err(|err| datafusion_error_to_tonic_status(&err))?;
config.set_extension(stage.clone());

let stream = stream_partitioner
.execute(caller_actor_idx, state.task_ctx())
.map_err(|err| datafusion_error_to_tonic_status(&err))?;
let stream = stage
.plan
.execute(doget.partition as usize, state.task_ctx())
.map_err(|err| Status::internal(format!("Error executing stage plan: {err:#?}")))?;

let flight_data_stream = FlightDataEncoderBuilder::new()
.with_schema(stream_partitioner.schema())
.with_schema(stage.plan.schema().clone())
.build(stream.map_err(|err| {
FlightError::Tonic(Box::new(datafusion_error_to_tonic_status(&err)))
}));
Expand Down
9 changes: 6 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
mod channel_manager;
mod common;
mod composed_extension_codec;
pub(crate) mod context;
mod errors;
mod flight_service;
mod plan;
#[cfg(test)]
pub mod test_utils;
mod test_utils;

pub mod physical_optimizer;
pub mod stage;
pub mod task;
pub use channel_manager::{BoxCloneSyncChannel, ChannelManager, ChannelResolver};
pub use flight_service::{ArrowFlightEndpoint, SessionBuilder};
pub use plan::{assign_stages, ArrowFlightReadExec};
pub use plan::ArrowFlightReadExec;
Loading
Loading