diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_planner/distributed_physical_optimizer_rule.rs
similarity index 86%
rename from src/distributed_physical_optimizer_rule.rs
rename to src/distributed_planner/distributed_physical_optimizer_rule.rs
index a5ff057..d3ffc48 100644
--- a/src/distributed_physical_optimizer_rule.rs
+++ b/src/distributed_planner/distributed_physical_optimizer_rule.rs
@@ -1,6 +1,10 @@
-use super::{NetworkShuffleExec, PartitionIsolatorExec};
+use crate::distributed_planner::distributed_plan_error::get_distribute_plan_err;
+use crate::distributed_planner::{
+    DistributedPlanError, NetworkBoundaryExt, limit_tasks_err, non_distributable_err,
+};
 use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
 use crate::stage::Stage;
+use crate::{NetworkShuffleExec, PartitionIsolatorExec};
 use datafusion::common::plan_err;
 use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::datasource::source::DataSourceExec;
@@ -18,8 +22,6 @@ use datafusion::{
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{ExecutionPlan, repartition::RepartitionExec},
 };
-use std::error::Error;
-use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 use uuid::Uuid;
 
@@ -305,82 +307,6 @@ impl DistributedPhysicalOptimizerRule {
         Ok(stage)
     }
 }
-
-/// Necessary information for building a [Stage] during distributed planning.
-///
-/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
-/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
-///
-/// Some network boundaries might perform some modifications in their children, like scaling
-/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
-pub struct InputStageInfo {
-    /// The head plan of the [Stage] that is about to be built.
-    pub plan: Arc<dyn ExecutionPlan>,
-    /// The amount of tasks the [Stage] will have.
-    pub task_count: usize,
-}
-
-/// This trait represents a node that introduces the necessity of a network boundary in the plan.
-/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
-/// out of it.
-pub trait NetworkBoundary: ExecutionPlan {
-    /// Returns the information necessary for building the next stage from which this
-    /// [NetworkBoundary] is going to collect data.
-    fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;
-
-    /// re-assigns a different number of input tasks to the current [NetworkBoundary].
-    ///
-    /// This will be called if upon building a stage, a [DistributedPlanError::LimitTasks] error
-    /// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
-    fn with_input_task_count(&self, input_tasks: usize) -> Result<Arc<dyn ExecutionPlan>>;
-
-    /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
-    /// information to perform any internal transformations necessary for distributed execution.
-    ///
-    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
-    fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
-
-    /// Returns the assigned input [Stage], if any.
-    fn input_stage(&self) -> Option<&Stage>;
-
-    /// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
-    /// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
-    /// this function returns.
-    fn rollback(&self) -> Result<Arc<dyn ExecutionPlan>> {
-        let children = self.children();
-        if children.len() != 1 {
-            return plan_err!(
-                "Expected distributed node {} to have exactly 1 children, but got {}",
-                self.name(),
-                children.len()
-            );
-        }
-        Ok(Arc::clone(children.first().unwrap()))
-    }
-}
-
-/// Extension trait for downcasting dynamic types to [NetworkBoundary].
-pub trait NetworkBoundaryExt {
-    /// Downcasts self to a [NetworkBoundary] if possible.
-    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
-    /// Returns whether self is a [NetworkBoundary] or not.
-    fn is_network_boundary(&self) -> bool {
-        self.as_network_boundary().is_some()
-    }
-}
-
-impl NetworkBoundaryExt for dyn ExecutionPlan {
-    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
-        if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
-            Some(node)
-        } else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
-            Some(node)
-        } else {
-            None
-        }
-    }
-}
-
 /// Helper enum for storing either borrowed or owned trait object references
 enum Referenced<'a, T: ?Sized> {
     Borrowed(&'a T),
@@ -396,59 +322,15 @@ impl<T: ?Sized> Referenced<'_, T> {
     }
 }
 
-/// Error thrown during distributed planning that prompts the planner to change something and
-/// try again.
-#[derive(Debug)]
-enum DistributedPlanError {
-    /// Prompts the planner to limit the amount of tasks used in the stage that is currently
-    /// being planned.
-    LimitTasks(usize),
-    /// Signals the planner that this whole plan is non-distributable. This can happen if
-    /// certain nodes are present, like [StreamingTableExec], which are typically used in
-    /// queries that rather performing some execution, they perform some introspection.
-    NonDistributable(&'static str),
-}
-
-impl Display for DistributedPlanError {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match self {
-            DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
-            DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
-        }
-    }
-}
-
-impl Error for DistributedPlanError {}
-
-/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
-/// to try rebuilding the current stage with a limited amount of tasks.
-pub fn limit_tasks_err(limit: usize) -> DataFusionError {
-    DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
-}
-
-/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
-/// planner to not distribute the query at all.
-pub fn non_distributable_err(name: &'static str) -> DataFusionError {
-    DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
-}
-
-fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
-    let DataFusionError::External(err) = err else {
-        return None;
-    };
-    err.downcast_ref()
-}
-
 #[cfg(test)]
 mod tests {
-    use crate::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
+    use crate::distributed_planner::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
     use crate::test_utils::parquet::register_parquet_tables;
     use crate::{assert_snapshot, display_plan_ascii};
     use datafusion::error::DataFusionError;
     use datafusion::execution::SessionStateBuilder;
     use datafusion::prelude::{SessionConfig, SessionContext};
     use std::sync::Arc;
-
     /* shema for the "weather" table
     MinTemp [type=DOUBLE] [repetitiontype=OPTIONAL]
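For orientation, the renamed rule is still an ordinary DataFusion `PhysicalOptimizerRule`, so nothing about how it is registered changes with this move. Below is a minimal sketch of wiring it into a session, under two assumptions not shown in this diff: that the rule has a `Default` constructor, and that your DataFusion version's `SessionStateBuilder` exposes a `with_physical_optimizer_rule` method (the builder API varies between versions).

```rust
use std::sync::Arc;

use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

// Placeholder crate name; substitute the real package name of this crate.
use datafusion_distributed::DistributedPhysicalOptimizerRule;

fn distributed_session() -> SessionContext {
    let state = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_default_features()
        // Assumed builder method: appends the rule after the default physical optimizer rules.
        .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::default()))
        .build();
    SessionContext::new_with_state(state)
}
```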
diff --git a/src/distributed_planner/distributed_plan_error.rs b/src/distributed_planner/distributed_plan_error.rs
new file mode 100644
index 0000000..2b15dc4
--- /dev/null
+++ b/src/distributed_planner/distributed_plan_error.rs
@@ -0,0 +1,46 @@
+use datafusion::common::DataFusionError;
+use std::error::Error;
+use std::fmt::{Display, Formatter};
+
+/// Error thrown during distributed planning that prompts the planner to change something and
+/// try again.
+#[derive(Debug)]
+pub enum DistributedPlanError {
+    /// Prompts the planner to limit the number of tasks used in the stage that is currently
+    /// being planned.
+    LimitTasks(usize),
+    /// Signals the planner that this whole plan is non-distributable. This can happen if
+    /// certain nodes are present, like `StreamingTableExec`, which are typically used in
+    /// queries that, rather than performing some execution, perform some introspection.
+    NonDistributable(&'static str),
+}
+
+impl Display for DistributedPlanError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
+            DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
+        }
+    }
+}
+
+impl Error for DistributedPlanError {}
+
+/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
+/// to try rebuilding the current stage with a limited number of tasks.
+pub fn limit_tasks_err(limit: usize) -> DataFusionError {
+    DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
+}
+
+/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
+/// planner to not distribute the query at all.
+pub fn non_distributable_err(name: &'static str) -> DataFusionError {
+    DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
+}
+
+pub(crate) fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
+    let DataFusionError::External(err) = err else {
+        return None;
+    };
+    err.downcast_ref()
+}
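These two constructors are the signalling mechanism between stage building and the planner: `LimitTasks(n)` asks for the current stage to be rebuilt with at most `n` tasks, while `NonDistributable` tells the planner to keep the plan single-node. Below is a simplified sketch of that control flow as code inside the `distributed_planner` module might implement it; the `build_stage` closure is a hypothetical stand-in for the planner's real stage builder and is not part of this diff.

```rust
use datafusion::common::Result;

use crate::distributed_planner::distributed_plan_error::get_distribute_plan_err;
use crate::distributed_planner::DistributedPlanError;

/// Retries stage building when a `LimitTasks` hint is raised, and bubbles every other
/// error (including `NonDistributable`) up so the caller can fall back to a
/// non-distributed plan. Illustrative only.
fn build_stage_with_task_limit<T>(
    mut task_count: usize,
    build_stage: impl Fn(usize) -> Result<T>,
) -> Result<T> {
    loop {
        match build_stage(task_count) {
            Ok(stage) => return Ok(stage),
            Err(err) => match get_distribute_plan_err(&err) {
                // Only retry when the hint actually lowers the task count; otherwise
                // we would loop forever on the same error.
                Some(DistributedPlanError::LimitTasks(limit)) if *limit < task_count => {
                    task_count = *limit;
                }
                _ => return Err(err),
            },
        }
    }
}
```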
diff --git a/src/distributed_planner/mod.rs b/src/distributed_planner/mod.rs
new file mode 100644
index 0000000..23f24e8
--- /dev/null
+++ b/src/distributed_planner/mod.rs
@@ -0,0 +1,7 @@
+mod distributed_physical_optimizer_rule;
+mod distributed_plan_error;
+mod network_boundary;
+
+pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
+pub use distributed_plan_error::{DistributedPlanError, limit_tasks_err, non_distributable_err};
+pub use network_boundary::{InputStageInfo, NetworkBoundary, NetworkBoundaryExt};
diff --git a/src/distributed_planner/network_boundary.rs b/src/distributed_planner/network_boundary.rs
new file mode 100644
index 0000000..2fd25cf
--- /dev/null
+++ b/src/distributed_planner/network_boundary.rs
@@ -0,0 +1,86 @@
+use crate::{NetworkCoalesceExec, NetworkShuffleExec, Stage};
+use datafusion::common::plan_err;
+use datafusion::physical_plan::ExecutionPlan;
+use std::sync::Arc;
+
+/// Necessary information for building a [Stage] during distributed planning.
+///
+/// [NetworkBoundary]s return this piece of data so that the distributed planner knows how to
+/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
+///
+/// Some network boundaries might perform some modifications in their children, like scaling
+/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
+pub struct InputStageInfo {
+    /// The head plan of the [Stage] that is about to be built.
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// The number of tasks the [Stage] will have.
+    pub task_count: usize,
+}
+
+/// This trait represents a node that introduces the necessity of a network boundary in the plan.
+/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
+/// out of it.
+pub trait NetworkBoundary: ExecutionPlan {
+    /// Returns the information necessary for building the next stage from which this
+    /// [NetworkBoundary] is going to collect data.
+    fn get_input_stage_info(&self, task_count: usize)
+        -> datafusion::common::Result<InputStageInfo>;
+
+    /// Re-assigns a different number of input tasks to the current [NetworkBoundary].
+    ///
+    /// This will be called if, upon building a stage, a [crate::distributed_planner::DistributedPlanError::LimitTasks] error
+    /// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
+    fn with_input_task_count(
+        &self,
+        input_tasks: usize,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>>;
+
+    /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
+    /// information to perform any internal transformations necessary for distributed execution.
+    ///
+    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
+    fn with_input_stage(
+        &self,
+        input_stage: Stage,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>>;
+
+    /// Returns the assigned input [Stage], if any.
+    fn input_stage(&self) -> Option<&Stage>;
+
+    /// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
+    /// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
+    /// this function returns.
+    fn rollback(&self) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        let children = self.children();
+        if children.len() != 1 {
+            return plan_err!(
+                "Expected distributed node {} to have exactly 1 child, but got {}",
+                self.name(),
+                children.len()
+            );
+        }
+        Ok(Arc::clone(children.first().unwrap()))
+    }
+}
+
+/// Extension trait for downcasting dynamic types to [NetworkBoundary].
+pub trait NetworkBoundaryExt {
+    /// Downcasts self to a [NetworkBoundary] if possible.
+    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
+    /// Returns whether self is a [NetworkBoundary] or not.
+    fn is_network_boundary(&self) -> bool {
+        self.as_network_boundary().is_some()
+    }
+}
+
+impl NetworkBoundaryExt for dyn ExecutionPlan {
+    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
+        if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
+            Some(node)
+        } else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
+            Some(node)
+        } else {
+            None
+        }
+    }
+}
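The blanket `impl NetworkBoundaryExt for dyn ExecutionPlan` is what lets planner code ask any node whether it is a network boundary without caring which concrete type (`NetworkShuffleExec` or `NetworkCoalesceExec`) it is. A small in-crate sketch of that pattern; the helper itself is illustrative and not part of this diff.

```rust
use datafusion::physical_plan::ExecutionPlan;

use crate::distributed_planner::NetworkBoundaryExt;

/// Counts the network boundaries contained in a plan tree.
fn count_network_boundaries(plan: &dyn ExecutionPlan) -> usize {
    let here = usize::from(plan.is_network_boundary());
    here + plan
        .children()
        .iter()
        .map(|child| count_network_boundaries(child))
        .sum::<usize>()
}
```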
diff --git a/src/execution_plans/distributed.rs b/src/execution_plans/distributed.rs
index e1be91d..e7b2ac4 100644
--- a/src/execution_plans/distributed.rs
+++ b/src/execution_plans/distributed.rs
@@ -1,5 +1,5 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
-use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
+use crate::distributed_planner::NetworkBoundaryExt;
 use crate::execution_plans::common::require_one_child;
 use crate::protobuf::DistributedCodec;
 use crate::stage::{ExecutionTask, Stage};
diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs
index 3a873ed..9db7f59 100644
--- a/src/execution_plans/network_coalesce.rs
+++ b/src/execution_plans/network_coalesce.rs
@@ -1,8 +1,6 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{
-    InputStageInfo, NetworkBoundary, limit_tasks_err,
-};
+use crate::distributed_planner::{InputStageInfo, NetworkBoundary, limit_tasks_err};
 use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
 use crate::flight_service::DoGet;
 use crate::metrics::MetricsCollectingStream;
diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs
index 92413c8..5769bfc 100644
--- a/src/execution_plans/network_shuffle.rs
+++ b/src/execution_plans/network_shuffle.rs
@@ -1,6 +1,5 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{InputStageInfo, NetworkBoundary};
 use crate::execution_plans::common::{require_one_child, scale_partitioning};
 use crate::flight_service::DoGet;
 use crate::metrics::MetricsCollectingStream;
@@ -8,7 +7,7 @@ use crate::metrics::proto::MetricsSetProto;
 use crate::protobuf::StageKey;
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use crate::stage::{MaybeEncodedPlan, Stage};
-use crate::{ChannelResolver, DistributedTaskContext};
+use crate::{ChannelResolver, DistributedTaskContext, InputStageInfo, NetworkBoundary};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs
index afa4cbf..dfe64c9 100644
--- a/src/execution_plans/partition_isolator.rs
+++ b/src/execution_plans/partition_isolator.rs
@@ -1,5 +1,5 @@
 use crate::DistributedTaskContext;
-use crate::distributed_physical_optimizer_rule::limit_tasks_err;
+use crate::distributed_planner::limit_tasks_err;
 use datafusion::common::{exec_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::TaskContext;
diff --git a/src/lib.rs b/src/lib.rs
index 21bed14..6d49c6f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,19 +4,19 @@ mod channel_resolver_ext;
 mod common;
 mod config_extension_ext;
 mod distributed_ext;
-mod distributed_physical_optimizer_rule;
 mod execution_plans;
 mod flight_service;
 mod metrics;
 mod stage;
+mod distributed_planner;
 mod protobuf;
 
 #[cfg(any(feature = "integration", test))]
 pub mod test_utils;
 
 pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
 pub use distributed_ext::DistributedExt;
-pub use distributed_physical_optimizer_rule::{
+pub use distributed_planner::{
     DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary, NetworkBoundaryExt,
 };
 pub use execution_plans::{
diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs
index 6ddc334..4d1c06b 100644
--- a/src/metrics/task_metrics_rewriter.rs
+++ b/src/metrics/task_metrics_rewriter.rs
@@ -1,4 +1,4 @@
-use crate::NetworkBoundaryExt;
+use crate::distributed_planner::NetworkBoundaryExt;
 use crate::execution_plans::DistributedExec;
 use crate::execution_plans::MetricsWrapperExec;
 use crate::metrics::MetricsCollectorResult;
diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs
index ec51d8e..957cd95 100644
--- a/src/protobuf/distributed_codec.rs
+++ b/src/protobuf/distributed_codec.rs
@@ -1,5 +1,5 @@
 use super::get_distributed_user_codecs;
-use crate::distributed_physical_optimizer_rule::NetworkBoundary;
+use crate::NetworkBoundary;
 use crate::execution_plans::{NetworkCoalesceExec, NetworkCoalesceReady, NetworkShuffleReadyExec};
 use crate::stage::{ExecutionTask, MaybeEncodedPlan, Stage};
 use crate::{NetworkShuffleExec, PartitionIsolatorExec};
diff --git a/src/stage.rs b/src/stage.rs
index 0ed1043..1dd8a79 100644
--- a/src/stage.rs
+++ b/src/stage.rs
@@ -168,8 +168,8 @@ impl Stage {
     }
 }
 
-use crate::distributed_physical_optimizer_rule::{NetworkBoundary, NetworkBoundaryExt};
 use crate::rewrite_distributed_plan_with_metrics;
+use crate::{NetworkBoundary, NetworkBoundaryExt};
 use bytes::Bytes;
 use datafusion::common::DataFusionError;
 use datafusion::physical_expr::Partitioning;
diff --git a/src/test_utils/plans.rs b/src/test_utils/plans.rs
index c182c90..6a09a5e 100644
--- a/src/test_utils/plans.rs
+++ b/src/test_utils/plans.rs
@@ -5,7 +5,7 @@ use datafusion::{
 };
 use std::sync::Arc;
 
-use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
+use crate::NetworkBoundaryExt;
 use crate::execution_plans::DistributedExec;
 use crate::protobuf::StageKey;
 use crate::stage::Stage;
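Because lib.rs keeps re-exporting the moved items from the crate root, this refactor is invisible to downstream crates; only the internal module layout changes. A tiny downstream sketch: the package name `datafusion_distributed` is an assumption, and the helper function is illustrative rather than part of the library.

```rust
use datafusion::physical_plan::ExecutionPlan;

// Placeholder crate name; these crate-root paths are unchanged by the module move
// thanks to the `pub use distributed_planner::{...}` re-export in lib.rs.
use datafusion_distributed::{NetworkBoundary, NetworkBoundaryExt};

/// Returns true if the planner has already attached an input stage to this node.
fn has_assigned_input_stage(plan: &dyn ExecutionPlan) -> bool {
    plan.as_network_boundary()
        .and_then(|boundary| boundary.input_stage())
        .is_some()
}
```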