Merged
Changes from 6 commits
4 changes: 3 additions & 1 deletion src/channel_manager_ext.rs → src/channel_resolver_ext.rs
@@ -1,4 +1,5 @@
 use async_trait::async_trait;
+use datafusion::common::exec_datafusion_err;
 use datafusion::error::DataFusionError;
 use datafusion::prelude::SessionConfig;
 use std::sync::Arc;
@@ -16,9 +17,10 @@ pub(crate) fn set_distributed_channel_resolver(

 pub(crate) fn get_distributed_channel_resolver(
     cfg: &SessionConfig,
-) -> Option<Arc<dyn ChannelResolver + Send + Sync>> {
+) -> Result<Arc<dyn ChannelResolver + Send + Sync>, DataFusionError> {
     cfg.get_extension::<ChannelResolverExtension>()
         .map(|cm| cm.0.clone())
+        .ok_or_else(|| exec_datafusion_err!("ChannelResolver not present in the session config"))
 }

 #[derive(Clone)]
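
For context on the call-site impact: returning Result instead of Option lets callers propagate a missing resolver with `?` (as the ArrowFlightReadExec::execute hunk further down now does) rather than unwrapping an Option by hand. Below is a minimal, self-contained sketch of that pattern, using stand-in Config/Resolver types instead of the crate's real SessionConfig and ChannelResolver:

    use std::collections::HashMap;
    use std::sync::Arc;

    // Stand-in error type; the real code returns DataFusionError.
    #[derive(Debug)]
    struct ConfigError(String);

    trait Resolver {
        fn name(&self) -> &str;
    }

    struct StaticResolver;
    impl Resolver for StaticResolver {
        fn name(&self) -> &str {
            "static"
        }
    }

    // Stand-in for a session config holding extensions.
    struct Config {
        extensions: HashMap<&'static str, Arc<dyn Resolver + Send + Sync>>,
    }

    // Before: Option forces every caller to invent its own error handling.
    fn get_resolver_opt(cfg: &Config) -> Option<Arc<dyn Resolver + Send + Sync>> {
        cfg.extensions.get("resolver").cloned()
    }

    // After: converting to Result in the getter centralizes the error message
    // and lets call sites simply use `?`.
    fn get_resolver(cfg: &Config) -> Result<Arc<dyn Resolver + Send + Sync>, ConfigError> {
        cfg.extensions
            .get("resolver")
            .cloned()
            .ok_or_else(|| ConfigError("Resolver not present in the config".to_string()))
    }

    fn caller(cfg: &Config) -> Result<String, ConfigError> {
        let resolver = get_resolver(cfg)?;
        Ok(resolver.name().to_string())
    }

    fn main() {
        let mut extensions: HashMap<&'static str, Arc<dyn Resolver + Send + Sync>> = HashMap::new();
        extensions.insert("resolver", Arc::new(StaticResolver));
        let cfg = Config { extensions };
        assert_eq!(caller(&cfg).unwrap(), "static");
        assert!(get_resolver_opt(&cfg).is_some()); // the old-style getter, kept for comparison
    }
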
4 changes: 2 additions & 2 deletions src/distributed_ext.rs
@@ -1,8 +1,8 @@
-use crate::channel_manager_ext::set_distributed_channel_resolver;
+use crate::channel_resolver_ext::set_distributed_channel_resolver;
 use crate::config_extension_ext::{
     set_distributed_option_extension, set_distributed_option_extension_from_headers,
 };
-use crate::user_codec_ext::set_distributed_user_codec;
+use crate::protobuf::set_distributed_user_codec;
 use crate::ChannelResolver;
 use datafusion::common::DataFusionError;
 use datafusion::config::ConfigExtension;
@@ -1,7 +1,6 @@
 use std::sync::Arc;

-use super::stage::ExecutionStage;
-use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
+use super::{ArrowFlightReadExec, PartitionIsolatorExec, StageExec};
 use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::joins::PartitionMode;
@@ -55,7 +54,7 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // We can only optimize plans that are not already distributed
-        if plan.as_any().is::<ExecutionStage>() {
+        if plan.as_any().is::<StageExec>() {
             return Ok(plan);
         }

@@ -106,7 +105,7 @@ impl DistributedPhysicalOptimizerRule {
     pub fn distribute_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<ExecutionStage, DataFusionError> {
+    ) -> Result<StageExec, DataFusionError> {
         let query_id = Uuid::new_v4();
         self._distribute_plan_inner(query_id, plan, &mut 1, 0)
     }
@@ -117,7 +116,7 @@ impl DistributedPhysicalOptimizerRule {
         plan: Arc<dyn ExecutionPlan>,
         num: &mut usize,
         depth: usize,
-    ) -> Result<ExecutionStage, DataFusionError> {
+    ) -> Result<StageExec, DataFusionError> {
         let mut inputs = vec![];

         let distributed = plan.clone().transform_down(|plan| {
@@ -134,7 +133,7 @@
         })?;

         let inputs = inputs.into_iter().map(Arc::new).collect();
-        let mut stage = ExecutionStage::new(query_id, *num, distributed.data, inputs);
+        let mut stage = StageExec::new(query_id, *num, distributed.data, inputs);
         *num += 1;

         stage = match (self.partitions_per_task, can_be_divided(&plan)?) {
@@ -188,7 +187,7 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
 #[cfg(test)]
 mod tests {
     use crate::assert_snapshot;
-    use crate::physical_optimizer::DistributedPhysicalOptimizerRule;
+    use crate::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
     use crate::test_utils::parquet::register_parquet_tables;
     use datafusion::error::DataFusionError;
     use datafusion::execution::SessionStateBuilder;
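
The early-return guard above, plan.as_any().is::<StageExec>(), is plain std::any downcasting; a minimal, self-contained sketch of the idea with stand-in plan types (not the crate's real ExecutionPlan trait):

    use std::any::Any;
    use std::sync::Arc;

    // Stand-in for an execution-plan trait object that exposes `as_any`.
    trait Plan {
        fn as_any(&self) -> &dyn Any;
    }

    struct ScanPlan;
    impl Plan for ScanPlan {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    // Stand-in for the already-distributed wrapper (StageExec in this PR).
    struct StagePlan;
    impl Plan for StagePlan {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    // Same shape as the rule's early return: if the root is already the
    // distributed wrapper, hand the plan back untouched.
    fn optimize(plan: Arc<dyn Plan>) -> Arc<dyn Plan> {
        if plan.as_any().is::<StagePlan>() {
            return plan; // already distributed, nothing to do
        }
        // ...a real rule would rewrite `plan` here; the sketch returns it as-is.
        plan
    }

    fn main() {
        assert!(optimize(Arc::new(StagePlan)).as_any().is::<StagePlan>());
        assert!(optimize(Arc::new(ScanPlan)).as_any().is::<ScanPlan>());
    }
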
@@ -1,9 +1,9 @@
-use crate::channel_manager_ext::get_distributed_channel_resolver;
+use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
 use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
+use crate::execution_plans::StageExec;
 use crate::flight_service::{DoGet, StageKey};
-use crate::plan::DistributedCodec;
-use crate::stage::{proto_from_stage, ExecutionStage};
+use crate::protobuf::{proto_from_stage, DistributedCodec};
 use crate::ChannelResolver;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
@@ -147,21 +147,17 @@ impl ExecutionPlan for ArrowFlightReadExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream, DataFusionError> {
-        let ArrowFlightReadExec::Ready(this) = self else {
+        let ArrowFlightReadExec::Ready(self_ready) = self else {
             return exec_err!("ArrowFlightReadExec is not ready, was the distributed optimization step performed?");
         };

         // get the channel manager and current stage from our context
-        let Some(channel_resolver) = get_distributed_channel_resolver(context.session_config())
-        else {
-            return exec_err!(
-                "ArrowFlightReadExec requires a ChannelResolver in the session config"
-            );
-        };
+        let channel_resolver = get_distributed_channel_resolver(context.session_config())?;

+        // the `ArrowFlightReadExec` node can only be executed in the context of a `StageExec`
         let stage = context
             .session_config()
-            .get_extension::<ExecutionStage>()
+            .get_extension::<StageExec>()
             .ok_or(internal_datafusion_err!(
                 "ArrowFlightReadExec requires an ExecutionStage in the session config"
             ))?;
@@ -170,10 +166,10 @@ impl ExecutionPlan for ArrowFlightReadExec {
         // reading from
         let child_stage = stage
             .child_stages_iter()
-            .find(|s| s.num == this.stage_num)
+            .find(|s| s.num == self_ready.stage_num)
             .ok_or(internal_datafusion_err!(
                 "ArrowFlightReadExec: no child stage with num {}",
-                this.stage_num
+                self_ready.stage_num
             ))?;

         let flight_metadata = context
@@ -218,7 +214,7 @@ impl ExecutionPlan for ArrowFlightReadExec {
         );

         async move {
-            let url = task.url()?.ok_or(internal_datafusion_err!(
+            let url = task.url.ok_or(internal_datafusion_err!(
                 "ArrowFlightReadExec: task is unassigned, cannot proceed"
             ))?;

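
The guard at the top of execute (and the self_ready rename) is the standard let-else pattern for bailing out when an enum is not in the expected variant; here is a small, self-contained sketch with a hypothetical two-state node, not the crate's real type:

    // Hypothetical two-state node, loosely mirroring the Pending/Ready split of
    // ArrowFlightReadExec.
    enum ReadNode {
        Pending,
        Ready { stage_num: usize },
    }

    fn execute(node: &ReadNode) -> Result<String, String> {
        // let-else: bind the Ready payload, or return early while the node is Pending.
        let ReadNode::Ready { stage_num } = node else {
            return Err("node is not ready, was the distributed optimization step performed?".into());
        };
        Ok(format!("reading from child stage {stage_num}"))
    }

    fn main() {
        assert!(execute(&ReadNode::Pending).is_err());
        assert_eq!(
            execute(&ReadNode::Ready { stage_num: 2 }).unwrap(),
            "reading from child stage 2"
        );
    }
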
7 changes: 7 additions & 0 deletions src/execution_plans/mod.rs
@@ -0,0 +1,7 @@
+mod arrow_flight_read;
+mod partition_isolator;
+mod stage;
+
+pub use arrow_flight_read::ArrowFlightReadExec;
+pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
+pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
File renamed without changes.
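
The new src/execution_plans/mod.rs above is a small facade: private submodules plus pub use re-exports, which is why the ArrowFlightReadExec hunk can write `use crate::execution_plans::StageExec;` without naming the submodule that defines it. A tiny self-contained sketch of the same layout, with made-up module contents:

    // In the real crate these are separate files (arrow_flight_read.rs, stage.rs, ...);
    // inline modules with made-up contents keep the sketch self-contained.
    mod execution_plans {
        mod arrow_flight_read {
            pub struct ArrowFlightReadExec;
        }

        mod stage {
            pub struct StageExec {
                pub num: usize,
            }
        }

        // Re-export the public types so callers never name the private submodules.
        pub use self::arrow_flight_read::ArrowFlightReadExec;
        pub use self::stage::StageExec;
    }

    use crate::execution_plans::{ArrowFlightReadExec, StageExec};

    fn main() {
        let _reader = ArrowFlightReadExec;
        let stage = StageExec { num: 1 };
        println!("stage {}", stage.num);
    }
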