Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use super::{NetworkShuffleExec, PartitionIsolatorExec};
use crate::distributed_planner::distributed_plan_error::get_distribute_plan_err;
use crate::distributed_planner::{
DistributedPlanError, NetworkBoundaryExt, limit_tasks_err, non_distributable_err,
};
use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
use crate::stage::Stage;
use crate::{NetworkShuffleExec, PartitionIsolatorExec};
use datafusion::common::plan_err;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::datasource::source::DataSourceExec;
Expand All @@ -18,8 +22,6 @@ use datafusion::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{ExecutionPlan, repartition::RepartitionExec},
};
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use uuid::Uuid;

Expand Down Expand Up @@ -305,82 +307,6 @@ impl DistributedPhysicalOptimizerRule {
Ok(stage)
}
}

/// Necessary information for building a [Stage] during distributed planning.
///
/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
///
/// Some network boundaries might perform some modifications in their children, like scaling
/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
pub struct InputStageInfo {
/// The head plan of the [Stage] that is about to be built.
pub plan: Arc<dyn ExecutionPlan>,
/// The amount of tasks the [Stage] will have.
pub task_count: usize,
}

/// This trait represents a node that introduces the necessity of a network boundary in the plan.
/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
/// out of it.
pub trait NetworkBoundary: ExecutionPlan {
/// Returns the information necessary for building the next stage from which this
/// [NetworkBoundary] is going to collect data.
fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;

/// re-assigns a different number of input tasks to the current [NetworkBoundary].
///
/// This will be called if upon building a stage, a [DistributedPlanError::LimitTasks] error
/// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
fn with_input_task_count(&self, input_tasks: usize) -> Result<Arc<dyn NetworkBoundary>>;

/// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
/// information to perform any internal transformations necessary for distributed execution.
///
/// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;

/// Returns the assigned input [Stage], if any.
fn input_stage(&self) -> Option<&Stage>;

/// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
/// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
/// this function returns.
fn rollback(&self) -> Result<Arc<dyn ExecutionPlan>> {
let children = self.children();
if children.len() != 1 {
return plan_err!(
"Expected distributed node {} to have exactly 1 children, but got {}",
self.name(),
children.len()
);
}
Ok(Arc::clone(children.first().unwrap()))
}
}

/// Extension trait for downcasting dynamic types to [NetworkBoundary].
pub trait NetworkBoundaryExt {
/// Downcasts self to a [NetworkBoundary] if possible.
fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
/// Returns whether self is a [NetworkBoundary] or not.
fn is_network_boundary(&self) -> bool {
self.as_network_boundary().is_some()
}
}

impl NetworkBoundaryExt for dyn ExecutionPlan {
fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
Some(node)
} else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
Some(node)
} else {
None
}
}
}

/// Helper enum for storing either borrowed or owned trait object references
enum Referenced<'a, T: ?Sized> {
Borrowed(&'a T),
Expand All @@ -396,59 +322,15 @@ impl<T: ?Sized> Referenced<'_, T> {
}
}

/// Error thrown during distributed planning that prompts the planner to change something and
/// try again.
#[derive(Debug)]
enum DistributedPlanError {
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
/// being planned.
LimitTasks(usize),
/// Signals the planner that this whole plan is non-distributable. This can happen if
/// certain nodes are present, like [StreamingTableExec], which are typically used in
/// queries that rather performing some execution, they perform some introspection.
NonDistributable(&'static str),
}

impl Display for DistributedPlanError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
}
}
}

impl Error for DistributedPlanError {}

/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
/// to try rebuilding the current stage with a limited amount of tasks.
pub fn limit_tasks_err(limit: usize) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
}

/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
/// planner to not distribute the query at all.
pub fn non_distributable_err(name: &'static str) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
}

fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
let DataFusionError::External(err) = err else {
return None;
};
err.downcast_ref()
}

#[cfg(test)]
mod tests {
use crate::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
use crate::distributed_planner::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
use crate::test_utils::parquet::register_parquet_tables;
use crate::{assert_snapshot, display_plan_ascii};
use datafusion::error::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use std::sync::Arc;

/* shema for the "weather" table

MinTemp [type=DOUBLE] [repetitiontype=OPTIONAL]
Expand Down
46 changes: 46 additions & 0 deletions src/distributed_planner/distributed_plan_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use datafusion::common::DataFusionError;
use std::error::Error;
use std::fmt::{Display, Formatter};

/// Error thrown during distributed planning that prompts the planner to change something and
/// try again.
#[derive(Debug)]
pub enum DistributedPlanError {
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
/// being planned.
LimitTasks(usize),
/// Signals the planner that this whole plan is non-distributable. This can happen if
/// certain nodes are present, like `StreamingTableExec`, which are typically used in
/// queries that rather performing some execution, they perform some introspection.
NonDistributable(&'static str),
}

impl Display for DistributedPlanError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
}
}
}

impl Error for DistributedPlanError {}

/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
/// to try rebuilding the current stage with a limited amount of tasks.
pub fn limit_tasks_err(limit: usize) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
}

/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
/// planner to not distribute the query at all.
pub fn non_distributable_err(name: &'static str) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
}

pub(crate) fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
let DataFusionError::External(err) = err else {
return None;
};
err.downcast_ref()
}
7 changes: 7 additions & 0 deletions src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod distributed_physical_optimizer_rule;
mod distributed_plan_error;
mod network_boundary;

pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
pub use distributed_plan_error::{DistributedPlanError, limit_tasks_err, non_distributable_err};
pub use network_boundary::{InputStageInfo, NetworkBoundary, NetworkBoundaryExt};
86 changes: 86 additions & 0 deletions src/distributed_planner/network_boundary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use crate::{NetworkCoalesceExec, NetworkShuffleExec, Stage};
use datafusion::common::plan_err;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

/// Necessary information for building a [Stage] during distributed planning.
///
/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
///
/// Some network boundaries might perform some modifications in their children, like scaling
/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
pub struct InputStageInfo {
/// The head plan of the [Stage] that is about to be built.
pub plan: Arc<dyn ExecutionPlan>,
/// The amount of tasks the [Stage] will have.
pub task_count: usize,
}

/// This trait represents a node that introduces the necessity of a network boundary in the plan.
/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
/// out of it.
pub trait NetworkBoundary: ExecutionPlan {
/// Returns the information necessary for building the next stage from which this
/// [NetworkBoundary] is going to collect data.
fn get_input_stage_info(&self, task_count: usize)
-> datafusion::common::Result<InputStageInfo>;

/// re-assigns a different number of input tasks to the current [NetworkBoundary].
///
/// This will be called if upon building a stage, a [crate::distributed_planner::distributed_physical_optimizer_rule::DistributedPlanError::LimitTasks] error
/// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
fn with_input_task_count(
&self,
input_tasks: usize,
) -> datafusion::common::Result<Arc<dyn NetworkBoundary>>;

/// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
/// information to perform any internal transformations necessary for distributed execution.
///
/// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
fn with_input_stage(
&self,
input_stage: Stage,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>>;

/// Returns the assigned input [Stage], if any.
fn input_stage(&self) -> Option<&Stage>;

/// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
/// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
/// this function returns.
fn rollback(&self) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let children = self.children();
if children.len() != 1 {
return plan_err!(
"Expected distributed node {} to have exactly 1 children, but got {}",
self.name(),
children.len()
);
}
Ok(Arc::clone(children.first().unwrap()))
}
}

/// Extension trait for downcasting dynamic types to [NetworkBoundary].
pub trait NetworkBoundaryExt {
/// Downcasts self to a [NetworkBoundary] if possible.
fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
/// Returns whether self is a [NetworkBoundary] or not.
fn is_network_boundary(&self) -> bool {
self.as_network_boundary().is_some()
}
}

impl NetworkBoundaryExt for dyn ExecutionPlan {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if this can be made generic

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 what do you mean by generic here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right. Please ignore 🙇🏽

fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
Some(node)
} else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
Some(node)
} else {
None
}
}
}
2 changes: 1 addition & 1 deletion src/execution_plans/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
use crate::distributed_planner::NetworkBoundaryExt;
use crate::execution_plans::common::require_one_child;
use crate::protobuf::DistributedCodec;
use crate::stage::{ExecutionTask, Stage};
Expand Down
4 changes: 1 addition & 3 deletions src/execution_plans/network_coalesce.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::distributed_physical_optimizer_rule::{
InputStageInfo, NetworkBoundary, limit_tasks_err,
};
use crate::distributed_planner::{InputStageInfo, NetworkBoundary, limit_tasks_err};
use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
use crate::flight_service::DoGet;
use crate::metrics::MetricsCollectingStream;
Expand Down
3 changes: 1 addition & 2 deletions src/execution_plans/network_shuffle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::distributed_physical_optimizer_rule::{InputStageInfo, NetworkBoundary};
use crate::execution_plans::common::{require_one_child, scale_partitioning};
use crate::flight_service::DoGet;
use crate::metrics::MetricsCollectingStream;
use crate::metrics::proto::MetricsSetProto;
use crate::protobuf::StageKey;
use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
use crate::stage::{MaybeEncodedPlan, Stage};
use crate::{ChannelResolver, DistributedTaskContext};
use crate::{ChannelResolver, DistributedTaskContext, InputStageInfo, NetworkBoundary};
use arrow_flight::Ticket;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/partition_isolator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::DistributedTaskContext;
use crate::distributed_physical_optimizer_rule::limit_tasks_err;
use crate::distributed_planner::limit_tasks_err;
use datafusion::common::{exec_err, plan_err};
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ mod channel_resolver_ext;
mod common;
mod config_extension_ext;
mod distributed_ext;
mod distributed_physical_optimizer_rule;
mod execution_plans;
mod flight_service;
mod metrics;
mod stage;

mod distributed_planner;
mod protobuf;
#[cfg(any(feature = "integration", test))]
pub mod test_utils;

pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
pub use distributed_ext::DistributedExt;
pub use distributed_physical_optimizer_rule::{
pub use distributed_planner::{
DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary, NetworkBoundaryExt,
};
pub use execution_plans::{
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/task_metrics_rewriter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::NetworkBoundaryExt;
use crate::distributed_planner::NetworkBoundaryExt;
use crate::execution_plans::DistributedExec;
use crate::execution_plans::MetricsWrapperExec;
use crate::metrics::MetricsCollectorResult;
Expand Down
2 changes: 1 addition & 1 deletion src/protobuf/distributed_codec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::get_distributed_user_codecs;
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
use crate::NetworkBoundary;
use crate::execution_plans::{NetworkCoalesceExec, NetworkCoalesceReady, NetworkShuffleReadyExec};
use crate::stage::{ExecutionTask, MaybeEncodedPlan, Stage};
use crate::{NetworkShuffleExec, PartitionIsolatorExec};
Expand Down
2 changes: 1 addition & 1 deletion src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ impl Stage {
}
}

use crate::distributed_physical_optimizer_rule::{NetworkBoundary, NetworkBoundaryExt};
use crate::rewrite_distributed_plan_with_metrics;
use crate::{NetworkBoundary, NetworkBoundaryExt};
use bytes::Bytes;
use datafusion::common::DataFusionError;
use datafusion::physical_expr::Partitioning;
Expand Down
Loading