diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index c8a5875f..015b6603 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{ }; use datafusion_distributed::{ DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder, - DistributedSessionBuilderContext, NetworkBoundaryExt, Stage, + DistributedSessionBuilderContext, NetworkBoundaryExt, }; use log::info; use std::fs; diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index 951aed28..b3878ada 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -289,7 +289,7 @@ where mod tests { use super::*; use std::sync::atomic::Ordering; - use tokio::time::{Duration, sleep}; + use tokio::time::Duration; #[tokio::test] async fn test_basic_insert_and_get() { diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs index a602f6a1..90cfd0fb 100644 --- a/src/distributed_physical_optimizer_rule.rs +++ b/src/distributed_physical_optimizer_rule.rs @@ -1,5 +1,6 @@ -use super::{NetworkShuffleExec, PartitionIsolatorExec, Stage}; +use super::{NetworkShuffleExec, PartitionIsolatorExec}; use crate::execution_plans::{DistributedExec, NetworkCoalesceExec}; +use crate::stage::Stage; use datafusion::common::plan_err; use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::datasource::source::DataSourceExec; @@ -123,68 +124,66 @@ impl DistributedPhysicalOptimizerRule { plan = Arc::new(CoalescePartitionsExec::new(plan)) } - let result = plan.transform_up(|plan| { - // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so - // that not all tasks have access to all partitions of the underlying DataSource. - if plan.as_any().is::() { - let node = PartitionIsolatorExec::new_pending(plan); + let result = + plan.transform_up(|plan| { + // If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so + // that not all tasks have access to all partitions of the underlying DataSource. + if plan.as_any().is::() { + let node = PartitionIsolatorExec::new(plan); - return Ok(Transformed::yes(Arc::new(node))); - } - - // If this is a hash RepartitionExec, introduce a shuffle. - if let (Some(node), Some(tasks)) = ( - plan.as_any().downcast_ref::(), - self.network_shuffle_tasks, - ) { - if !matches!(node.partitioning(), Partitioning::Hash(_, _)) { - return Ok(Transformed::no(plan)); + return Ok(Transformed::yes(Arc::new(node))); } - let node = NetworkShuffleExec::from_repartition_exec(&plan, tasks)?; - return Ok(Transformed::yes(Arc::new(node))); - } + // If this is a hash RepartitionExec, introduce a shuffle. + if let (Some(node), Some(tasks)) = ( + plan.as_any().downcast_ref::(), + self.network_shuffle_tasks, + ) { + if !matches!(node.partitioning(), Partitioning::Hash(_, _)) { + return Ok(Transformed::no(plan)); + } + let node = NetworkShuffleExec::try_new(plan, tasks)?; - // If this is a CoalescePartitionsExec, it means that the original plan is trying to - // merge all partitions into one. We need to go one step ahead and also merge all tasks - // into one. - if let (Some(node), Some(tasks)) = ( - plan.as_any().downcast_ref::(), - self.network_coalesce_tasks, - ) { - // If the immediate child is a PartitionIsolatorExec, it means that the rest of the - // plan is just a couple of non-computational nodes that are probably not worth - // distributing. 
- if node - .children() - .first() - .is_some_and(|v| v.as_any().is::()) - { - return Ok(Transformed::no(plan)); + return Ok(Transformed::yes(Arc::new(node))); } - let node = NetworkCoalesceExec::from_input_exec(node, tasks)?; - let plan = plan.with_new_children(vec![Arc::new(node)])?; - - return Ok(Transformed::yes(plan)); - } + // If this is a CoalescePartitionsExec, it means that the original plan is trying to + // merge all partitions into one. We need to go one step ahead and also merge all tasks + // into one. + if let (Some(node), Some(tasks)) = ( + plan.as_any().downcast_ref::(), + self.network_coalesce_tasks, + ) { + // If the immediate child is a PartitionIsolatorExec, it means that the rest of the + // plan is just a couple of non-computational nodes that are probably not worth + // distributing. + if node.input().as_any().is::() { + return Ok(Transformed::no(plan)); + } - // The SortPreservingMergeExec node will try to coalesce all partitions into just 1. - // We need to account for it and help it by also coalescing all tasks into one, therefore - // a NetworkCoalesceExec is introduced. - if let (Some(node), Some(tasks)) = ( - plan.as_any().downcast_ref::(), - self.network_coalesce_tasks, - ) { - let node = NetworkCoalesceExec::from_input_exec(node, tasks)?; + let plan = plan.clone().with_new_children(vec![Arc::new( + NetworkCoalesceExec::new(Arc::clone(node.input()), tasks), + )])?; - let plan = plan.with_new_children(vec![Arc::new(node)])?; + return Ok(Transformed::yes(plan)); + } - return Ok(Transformed::yes(plan)); - } + // The SortPreservingMergeExec node will try to coalesce all partitions into just 1. + // We need to account for it and help it by also coalescing all tasks into one, therefore + // a NetworkCoalesceExec is introduced. + if let (Some(node), Some(tasks)) = ( + plan.as_any().downcast_ref::(), + self.network_coalesce_tasks, + ) { + let plan = plan.clone().with_new_children(vec![Arc::new( + NetworkCoalesceExec::new(Arc::clone(node.input()), tasks), + )])?; + + return Ok(Transformed::yes(plan)); + } - Ok(Transformed::no(plan)) - })?; + Ok(Transformed::no(plan)) + })?; Ok(result.data) } @@ -234,11 +233,11 @@ impl DistributedPhysicalOptimizerRule { }; let stage = loop { - let (inner_plan, in_tasks) = dnode.as_ref().to_stage_info(n_tasks)?; + let input_stage_info = dnode.as_ref().get_input_stage_info(n_tasks)?; // If the current stage has just 1 task, and the next stage is only going to have // 1 task, there's no point in having a network boundary in between, they can just // communicate in memory. - if n_tasks == 1 && in_tasks == 1 { + if n_tasks == 1 && input_stage_info.task_count == 1 { let mut n = dnode.as_ref().rollback()?; if let Some(node) = n.as_any().downcast_ref::() { // Also trim PartitionIsolatorExec out of the plan. @@ -246,7 +245,7 @@ impl DistributedPhysicalOptimizerRule { } return Ok(Transformed::yes(n)); } - match Self::_distribute_plan_inner(query_id, inner_plan.clone(), num, depth + 1, in_tasks) { + match Self::_distribute_plan_inner(query_id, input_stage_info.plan, num, depth + 1, input_stage_info.task_count) { Ok(v) => break v, Err(e) => match get_distribute_plan_err(&e) { None => return Err(e), @@ -255,7 +254,7 @@ impl DistributedPhysicalOptimizerRule { // that no more than `limit` tasks can be used for it, so we are going // to limit the amount of tasks to the requested number and try building // the stage again. 
-                        if in_tasks == *limit {
+                        if input_stage_info.task_count == *limit {
                             return plan_err!("A node requested {limit} tasks for the stage its in, but that stage already has that many tasks");
                         }
                         dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
@@ -280,14 +279,27 @@
     }
 }

+/// Necessary information for building a [Stage] during distributed planning.
+///
+/// [NetworkBoundary]s return this piece of data so that the distributed planner knows how to
+/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
+///
+/// Some network boundaries might perform some modifications on their children, like scaling
+/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
+pub struct InputStageInfo {
+    /// The head plan of the [Stage] that is about to be built.
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// The number of tasks the [Stage] will have.
+    pub task_count: usize,
+}
+
 /// This trait represents a node that introduces the necessity of a network boundary in the plan.
 /// The distributed planner, upon stepping into one of these, will break the plan and build a stage
 /// out of it.
 pub trait NetworkBoundary: ExecutionPlan {
-    /// Returns the information necessary for building the next stage.
-    /// - The head node of the stage.
-    /// - the amount of tasks that stage will have.
-    fn to_stage_info(&self, n_tasks: usize) -> Result<(Arc<dyn ExecutionPlan>, usize)>;
+    /// Returns the information necessary for building the next stage from which this
+    /// [NetworkBoundary] is going to collect data.
+    fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;

     /// re-assigns a different number of input tasks to the current [NetworkBoundary].
     ///
@@ -297,6 +309,8 @@ pub trait NetworkBoundary: ExecutionPlan {

     /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
     /// information to perform any internal transformations necessary for distributed execution.
+    ///
+    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "Ready".
     fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;

     /// Returns the assigned input [Stage], if any.
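Note for downstream implementers: the move from `to_stage_info` to `get_input_stage_info` is mostly mechanical, swapping the `(plan, task_count)` tuple for the named `InputStageInfo` struct. A minimal sketch (a hypothetical helper, not part of this PR) of how a caller consumes the new struct, mirroring the planner's "both sides run a single task" check above:

```rust
use datafusion::common::Result;
use datafusion_distributed::{InputStageInfo, NetworkBoundary};

/// Hypothetical helper: returns true when a network boundary separates two stages
/// that would both run as a single task, in which case the planner above rolls the
/// boundary back and lets the stages communicate in memory.
fn boundary_is_redundant(node: &dyn NetworkBoundary, n_tasks: usize) -> Result<bool> {
    let InputStageInfo { task_count, .. } = node.get_input_stage_info(n_tasks)?;
    Ok(n_tasks == 1 && task_count == 1)
}
```
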
diff --git a/src/execution_plans/common.rs b/src/execution_plans/common.rs
index 7f55eee1..f085b3e5 100644
--- a/src/execution_plans/common.rs
+++ b/src/execution_plans/common.rs
@@ -1,15 +1,21 @@
 use datafusion::common::{DataFusionError, plan_err};
 use datafusion::physical_expr::Partitioning;
 use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
+use std::borrow::Borrow;
 use std::sync::Arc;

-pub(super) fn require_one_child(
-    children: &[Arc<dyn ExecutionPlan>],
-) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+pub(super) fn require_one_child<L, T>(
+    children: L,
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
+where
+    L: AsRef<[T]>,
+    T: Borrow<Arc<dyn ExecutionPlan>>,
+{
+    let children = children.as_ref();
     if children.len() != 1 {
         return plan_err!("Expected exactly 1 children, got {}", children.len());
     }
-    Ok(children[0].clone())
+    Ok(children[0].borrow().clone())
 }

 pub(super) fn scale_partitioning_props(
diff --git a/src/execution_plans/distributed.rs b/src/execution_plans/distributed.rs
index 87d7e62c..9fa4aab8 100644
--- a/src/execution_plans/distributed.rs
+++ b/src/execution_plans/distributed.rs
@@ -2,7 +2,7 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
 use crate::execution_plans::common::require_one_child;
 use crate::protobuf::DistributedCodec;
-use crate::{ExecutionTask, Stage};
+use crate::stage::{ExecutionTask, Stage};
 use datafusion::common::exec_err;
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -98,7 +98,7 @@ impl ExecutionPlan for DistributedExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(DistributedExec {
-            plan: require_one_child(&children)?,
+            plan: require_one_child(children)?,
         }))
     }

diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs
index a450c394..3a873ed1 100644
--- a/src/execution_plans/network_coalesce.rs
+++ b/src/execution_plans/network_coalesce.rs
@@ -1,13 +1,15 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
+use crate::distributed_physical_optimizer_rule::{
+    InputStageInfo, NetworkBoundary, limit_tasks_err,
+};
 use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
 use crate::flight_service::DoGet;
 use crate::metrics::MetricsCollectingStream;
 use crate::metrics::proto::MetricsSetProto;
 use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
-use crate::stage::MaybeEncodedPlan;
-use crate::{ChannelResolver, DistributedTaskContext, Stage};
+use crate::stage::{MaybeEncodedPlan, Stage};
+use crate::{ChannelResolver, DistributedTaskContext};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
@@ -75,7 +77,7 @@ pub enum NetworkCoalesceExec {
 pub struct NetworkCoalescePending {
     properties: PlanProperties,
     input_tasks: usize,
-    child: Arc<dyn ExecutionPlan>,
+    input: Arc<dyn ExecutionPlan>,
 }

 /// Ready version of the [NetworkCoalesceExec] node. This node can be created in
@@ -99,30 +101,23 @@ pub struct NetworkCoalesceReady {
 }

 impl NetworkCoalesceExec {
-    /// Creates a new [NetworkCoalesceExec] node from a [CoalescePartitionsExec] and
-    /// [SortPreservingMergeExec].
-    pub fn from_input_exec(
-        input: &dyn ExecutionPlan,
-        input_tasks: usize,
-    ) -> Result<Self, DataFusionError> {
-        let children = input.children();
-        let Some(child) = children.first() else {
-            return internal_err!("Expected a single child");
-        };
-
-        Ok(Self::Pending(NetworkCoalescePending {
-            properties: child.properties().clone(),
+    /// Builds a new [NetworkCoalesceExec] in "Pending" state.
+    ///
+    /// Typically, this node should be placed right after nodes that coalesce all the input
+    /// partitions into one, for example:
+    /// - [CoalescePartitionsExec]
+    /// - [SortPreservingMergeExec]
+    pub fn new(input: Arc<dyn ExecutionPlan>, input_tasks: usize) -> Self {
+        Self::Pending(NetworkCoalescePending {
+            properties: input.properties().clone(),
             input_tasks,
-            child: Arc::clone(child),
-        }))
+            input,
+        })
     }
 }

 impl NetworkBoundary for NetworkCoalesceExec {
-    fn to_stage_info(
-        &self,
-        n_tasks: usize,
-    ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
+    fn get_input_stage_info(&self, n_tasks: usize) -> Result<InputStageInfo> {
         let Self::Pending(pending) = self else {
             return plan_err!("can only return wrapped child if on Pending state");
         };
@@ -132,7 +127,10 @@ impl NetworkBoundary for NetworkCoalesceExec {
             return Err(limit_tasks_err(1));
         }

-        Ok((Arc::clone(&pending.child), pending.input_tasks))
+        Ok(InputStageInfo {
+            plan: Arc::clone(&pending.input),
+            task_count: pending.input_tasks,
+        })
     }

     fn with_input_stage(
@@ -173,7 +171,7 @@ impl NetworkBoundary for NetworkCoalesceExec {
             Self::Pending(pending) => Self::Pending(NetworkCoalescePending {
                 properties: pending.properties.clone(),
                 input_tasks,
-                child: pending.child.clone(),
+                input: pending.input.clone(),
             }),
             Self::Ready(_) => {
                 plan_err!("Self can only re-assign input tasks if in 'Pending' state")?
@@ -216,7 +214,7 @@ impl ExecutionPlan for NetworkCoalesceExec {

     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         match self {
-            NetworkCoalesceExec::Pending(v) => vec![&v.child],
+            NetworkCoalesceExec::Pending(v) => vec![&v.input],
             NetworkCoalesceExec::Ready(v) => match &v.input_stage.plan {
                 MaybeEncodedPlan::Decoded(v) => vec![v],
                 MaybeEncodedPlan::Encoded(_) => vec![],
@@ -231,12 +229,12 @@
         match self.as_ref() {
             Self::Pending(v) => {
                 let mut v = v.clone();
-                v.child = require_one_child(&children)?;
+                v.input = require_one_child(children)?;
                 Ok(Arc::new(Self::Pending(v)))
             }
             Self::Ready(v) => {
                 let mut v = v.clone();
-                v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(&children)?);
+                v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(children)?);
                 Ok(Arc::new(Self::Ready(v)))
             }
         }
diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs
index 576f8cbb..92413c81 100644
--- a/src/execution_plans/network_shuffle.rs
+++ b/src/execution_plans/network_shuffle.rs
@@ -1,14 +1,14 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::NetworkBoundary;
+use crate::distributed_physical_optimizer_rule::{InputStageInfo, NetworkBoundary};
 use crate::execution_plans::common::{require_one_child, scale_partitioning};
 use crate::flight_service::DoGet;
 use crate::metrics::MetricsCollectingStream;
 use crate::metrics::proto::MetricsSetProto;
 use crate::protobuf::StageKey;
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
-use crate::stage::MaybeEncodedPlan;
-use crate::{ChannelResolver, DistributedTaskContext, Stage};
+use crate::stage::{MaybeEncodedPlan, Stage};
+use crate::{ChannelResolver, DistributedTaskContext};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;
@@ -124,7 +124,7 @@ pub enum NetworkShuffleExec {
 /// [NetworkShuffleReadyExec] node.
 #[derive(Debug, Clone)]
 pub struct NetworkShufflePendingExec {
-    repartition_exec: Arc<dyn ExecutionPlan>,
+    input: Arc<dyn ExecutionPlan>,
     input_tasks: usize,
 }

@@ -149,54 +149,44 @@ pub struct NetworkShuffleReadyExec {
 }

 impl NetworkShuffleExec {
+    /// Builds a new [NetworkShuffleExec] in "Pending" state.
+    ///
+    /// Typically, the `input` to this node is a [RepartitionExec] with a
+    /// [Partitioning::Hash] partition scheme.
     pub fn try_new(
         input: Arc<dyn ExecutionPlan>,
-        partitioning: Partitioning,
         input_tasks: usize,
     ) -> Result<Self, DataFusionError> {
+        if !matches!(input.output_partitioning(), Partitioning::Hash(_, _)) {
+            return plan_err!("NetworkShuffleExec input must be hash partitioned");
+        }
         Ok(Self::Pending(NetworkShufflePendingExec {
-            repartition_exec: Arc::new(RepartitionExec::try_new(input, partitioning)?),
-            input_tasks,
-        }))
-    }
-
-    pub fn from_repartition_exec(
-        r_exe: &Arc<dyn ExecutionPlan>,
-        input_tasks: usize,
-    ) -> Result<Self, DataFusionError> {
-        if !r_exe.as_any().is::<RepartitionExec>() {
-            return plan_err!("Expected RepartitionExec");
-        };
-
-        Ok(Self::Pending(NetworkShufflePendingExec {
-            repartition_exec: Arc::clone(r_exe),
+            input,
             input_tasks,
         }))
     }
 }

 impl NetworkBoundary for NetworkShuffleExec {
-    fn to_stage_info(
-        &self,
-        n_tasks: usize,
-    ) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
+    fn get_input_stage_info(&self, n_tasks: usize) -> Result<InputStageInfo> {
         let Self::Pending(pending) = self else {
             return plan_err!("cannot only return wrapped child if on Pending state");
         };

-        let children = pending.repartition_exec.children();
-        let Some(child) = children.first() else {
-            return plan_err!("RepartitionExec must have a child");
+        // TODO: Avoid downcasting once https://github.com/apache/datafusion/pull/17990 is shipped.
+ let Some(r_exe) = pending.input.as_any().downcast_ref::() else { + return plan_err!("NetworkShuffleExec.input must always be RepartitionExec"); }; let next_stage_plan = Arc::new(RepartitionExec::try_new( - Arc::clone(child), - scale_partitioning(pending.repartition_exec.output_partitioning(), |p| { - p * n_tasks - }), + require_one_child(r_exe.children())?, + scale_partitioning(r_exe.partitioning(), |p| p * n_tasks), )?); - Ok((next_stage_plan, pending.input_tasks)) + Ok(InputStageInfo { + plan: next_stage_plan, + task_count: pending.input_tasks, + }) } fn with_input_task_count( @@ -205,7 +195,7 @@ impl NetworkBoundary for NetworkShuffleExec { ) -> Result, DataFusionError> { Ok(Arc::new(match self { Self::Pending(prev) => Self::Pending(NetworkShufflePendingExec { - repartition_exec: Arc::clone(&prev.repartition_exec), + input: Arc::clone(&prev.input), input_tasks, }), Self::Ready(_) => plan_err!( @@ -221,7 +211,7 @@ impl NetworkBoundary for NetworkShuffleExec { match self { Self::Pending(pending) => { let ready = NetworkShuffleReadyExec { - properties: pending.repartition_exec.properties().clone(), + properties: pending.input.properties().clone(), input_stage, metrics_collection: Default::default(), }; @@ -270,14 +260,14 @@ impl ExecutionPlan for NetworkShuffleExec { fn properties(&self) -> &PlanProperties { match self { - NetworkShuffleExec::Pending(v) => v.repartition_exec.properties(), + NetworkShuffleExec::Pending(v) => v.input.properties(), NetworkShuffleExec::Ready(v) => &v.properties, } } fn children(&self) -> Vec<&Arc> { match self { - NetworkShuffleExec::Pending(v) => vec![&v.repartition_exec], + NetworkShuffleExec::Pending(v) => vec![&v.input], NetworkShuffleExec::Ready(v) => match &v.input_stage.plan { MaybeEncodedPlan::Decoded(v) => vec![v], MaybeEncodedPlan::Encoded(_) => vec![], @@ -292,12 +282,12 @@ impl ExecutionPlan for NetworkShuffleExec { match self.as_ref() { Self::Pending(v) => { let mut v = v.clone(); - v.repartition_exec = require_one_child(&children)?; + v.input = require_one_child(children)?; Ok(Arc::new(Self::Pending(v))) } Self::Ready(v) => { let mut v = v.clone(); - v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(&children)?); + v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(children)?); Ok(Arc::new(Self::Ready(v))) } } diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs index 81dd67cb..afa4cbf0 100644 --- a/src/execution_plans/partition_isolator.rs +++ b/src/execution_plans/partition_isolator.rs @@ -68,11 +68,11 @@ pub struct PartitionIsolatorReadyExec { } impl PartitionIsolatorExec { - pub fn new_pending(input: Arc) -> Self { + pub fn new(input: Arc) -> Self { PartitionIsolatorExec::Pending(PartitionIsolatorPendingExec { input }) } - pub fn ready(&self, n_tasks: usize) -> Result { + pub(crate) fn ready(&self, n_tasks: usize) -> Result { let Self::Pending(pending) = self else { return plan_err!("PartitionIsolatorExec is already ready"); }; @@ -101,7 +101,7 @@ impl PartitionIsolatorExec { input: Arc, n_tasks: usize, ) -> Result { - Self::new_pending(input).ready(n_tasks) + Self::new(input).ready(n_tasks) } pub(crate) fn partition_groups(input_partitions: usize, n_tasks: usize) -> Vec> { @@ -193,9 +193,9 @@ impl ExecutionPlan for PartitionIsolatorExec { } Ok(Arc::new(match self.as_ref() { - PartitionIsolatorExec::Pending(_) => Self::new_pending(children[0].clone()), + PartitionIsolatorExec::Pending(_) => Self::new(children[0].clone()), PartitionIsolatorExec::Ready(ready) => { - 
Self::new_pending(children[0].clone()).ready(ready.n_tasks)? + Self::new(children[0].clone()).ready(ready.n_tasks)? } })) } diff --git a/src/flight_service/do_get.rs b/src/flight_service/do_get.rs index 17c9c264..da894f2f 100644 --- a/src/flight_service/do_get.rs +++ b/src/flight_service/do_get.rs @@ -209,8 +209,8 @@ fn collect_and_create_metrics_flight_data( #[cfg(test)] mod tests { use super::*; - use crate::ExecutionTask; use crate::flight_service::session_builder::DefaultSessionBuilder; + use crate::stage::ExecutionTask; use arrow::datatypes::{Schema, SchemaRef}; use arrow_flight::Ticket; use datafusion::physical_expr::Partitioning; diff --git a/src/lib.rs b/src/lib.rs index 59f2f2b4..5d0c492e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,9 +17,11 @@ pub mod test_utils; pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver}; pub use distributed_ext::DistributedExt; pub use distributed_physical_optimizer_rule::{ - DistributedPhysicalOptimizerRule, NetworkBoundaryExt, + DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary, NetworkBoundaryExt, +}; +pub use execution_plans::{ + DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec, }; -pub use execution_plans::{NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec}; pub use flight_service::{ ArrowFlightEndpoint, DefaultSessionBuilder, DistributedSessionBuilder, DistributedSessionBuilderContext, MappedDistributedSessionBuilder, diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs index e29422d5..ec51d8e7 100644 --- a/src/protobuf/distributed_codec.rs +++ b/src/protobuf/distributed_codec.rs @@ -1,8 +1,8 @@ use super::get_distributed_user_codecs; use crate::distributed_physical_optimizer_rule::NetworkBoundary; use crate::execution_plans::{NetworkCoalesceExec, NetworkCoalesceReady, NetworkShuffleReadyExec}; -use crate::stage::MaybeEncodedPlan; -use crate::{ExecutionTask, NetworkShuffleExec, PartitionIsolatorExec, Stage}; +use crate::stage::{ExecutionTask, MaybeEncodedPlan, Stage}; +use crate::{NetworkShuffleExec, PartitionIsolatorExec}; use bytes::Bytes; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::datatypes::SchemaRef; diff --git a/src/stage.rs b/src/stage.rs index d1cc3adc..e4db799c 100644 --- a/src/stage.rs +++ b/src/stage.rs @@ -72,11 +72,11 @@ use uuid::Uuid; #[derive(Debug, Clone)] pub struct Stage { /// Our query_id - pub query_id: Uuid, + pub(crate) query_id: Uuid, /// Our stage number - pub num: usize, + pub(crate) num: usize, /// The physical execution plan that this stage will execute. - pub plan: MaybeEncodedPlan, + pub(crate) plan: MaybeEncodedPlan, /// Our tasks which tell us how finely grained to execute the partitions in /// the plan pub tasks: Vec, @@ -86,7 +86,7 @@ pub struct Stage { pub struct ExecutionTask { /// The url of the worker that will execute this task. A None value is interpreted as /// unassigned. - pub url: Option, + pub(crate) url: Option, } /// An [ExecutionPlan] that can be either: @@ -94,7 +94,7 @@ pub struct ExecutionTask { /// - Encoded: the inner [ExecutionPlan] is stored as protobuf [Bytes]. Storing it this way allow us /// to thread it through the project and eventually send it through gRPC in a zero copy manner. #[derive(Debug, Clone)] -pub enum MaybeEncodedPlan { +pub(crate) enum MaybeEncodedPlan { /// The decoded [ExecutionPlan]. Decoded(Arc), /// A protobuf encoded version of the [ExecutionPlan]. 
The inner [Bytes] represent the full @@ -106,7 +106,7 @@ pub enum MaybeEncodedPlan { } impl MaybeEncodedPlan { - pub fn to_encoded(&self, codec: &dyn PhysicalExtensionCodec) -> Result { + pub(crate) fn to_encoded(&self, codec: &dyn PhysicalExtensionCodec) -> Result { Ok(match self { Self::Decoded(plan) => Self::Encoded( PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), codec)? @@ -117,14 +117,14 @@ impl MaybeEncodedPlan { }) } - pub fn decoded(&self) -> Result<&Arc> { + pub(crate) fn decoded(&self) -> Result<&Arc> { match self { MaybeEncodedPlan::Decoded(v) => Ok(v), MaybeEncodedPlan::Encoded(_) => plan_err!("Expected plan to be in a decoded state"), } } - pub fn encoded(&self) -> Result<&Bytes> { + pub(crate) fn encoded(&self) -> Result<&Bytes> { match self { MaybeEncodedPlan::Decoded(_) => plan_err!("Expected plan to be in a encoded state"), MaybeEncodedPlan::Encoded(v) => Ok(v), diff --git a/src/test_utils/plans.rs b/src/test_utils/plans.rs index b2a61e14..c182c900 100644 --- a/src/test_utils/plans.rs +++ b/src/test_utils/plans.rs @@ -7,7 +7,8 @@ use std::sync::Arc; use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt; use crate::execution_plans::DistributedExec; -use crate::{Stage, protobuf::StageKey}; +use crate::protobuf::StageKey; +use crate::stage::Stage; /// count_plan_nodes counts the number of execution plan nodes in a plan using BFS traversal. /// This does NOT traverse child stages, only the execution plan tree within this stage. diff --git a/tests/custom_config_extension.rs b/tests/custom_config_extension.rs index 3c77d578..f00c43e8 100644 --- a/tests/custom_config_extension.rs +++ b/tests/custom_config_extension.rs @@ -9,6 +9,7 @@ mod tests { }; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; + use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream, @@ -47,8 +48,10 @@ mod tests { for size in [1, 2, 3] { plan = Arc::new(NetworkShuffleExec::try_new( - plan, - Partitioning::RoundRobinBatch(10), + Arc::new(RepartitionExec::try_new( + plan, + Partitioning::Hash(vec![], 10), + )?), size, )?); } diff --git a/tests/custom_extension_codec.rs b/tests/custom_extension_codec.rs index f8167806..402674e1 100644 --- a/tests/custom_extension_codec.rs +++ b/tests/custom_extension_codec.rs @@ -66,8 +66,8 @@ mod tests { │ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10 │ [Stage 2] => NetworkShuffleExec: output_partitions=10, input_tasks=10 └────────────────────────────────────────────────── - ┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p10,p11,p12,p13,p14,p15,p16,p17,p18,p19] t2:[p20,p21,p22,p23,p24,p25,p26,p27,p28,p29] t3:[p30,p31,p32,p33,p34,p35,p36,p37,p38,p39] t4:[p40,p41,p42,p43,p44,p45,p46,p47,p48,p49] t5:[p50,p51,p52,p53,p54,p55,p56,p57,p58,p59] t6:[p60,p61,p62,p63,p64,p65,p66,p67,p68,p69] t7:[p70,p71,p72,p73,p74,p75,p76,p77,p78,p79] t8:[p80,p81,p82,p83,p84,p85,p86,p87,p88,p89] t9:[p90,p91,p92,p93,p94,p95,p96,p97,p98,p99] - │ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + ┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t3:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t4:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t5:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] 
t6:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t7:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t8:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t9:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] + │ RepartitionExec: partitioning=Hash([], 10), input_partitions=1 │ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false] │ [Stage 1] => NetworkShuffleExec: output_partitions=1, input_tasks=1 └────────────────────────────────────────────────── @@ -114,7 +114,7 @@ mod tests { let mut plan: Arc = Arc::new(Int64ListExec::new(vec![1, 2, 3, 4, 5, 6])); if distributed { - plan = Arc::new(PartitionIsolatorExec::new_pending(plan)); + plan = Arc::new(PartitionIsolatorExec::new(plan)); } plan = Arc::new(FilterExec::try_new( @@ -128,8 +128,10 @@ mod tests { if distributed { plan = Arc::new(NetworkShuffleExec::try_new( - Arc::clone(&plan), - Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1), + Arc::new(RepartitionExec::try_new( + Arc::clone(&plan), + Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1), + )?), 10, )?); } @@ -145,8 +147,10 @@ mod tests { if distributed { plan = Arc::new(NetworkShuffleExec::try_new( - plan, - Partitioning::RoundRobinBatch(10), + Arc::new(RepartitionExec::try_new( + plan, + Partitioning::Hash(vec![], 10), + )?), 10, )?); diff --git a/tests/error_propagation.rs b/tests/error_propagation.rs index 423e882b..5a2a3706 100644 --- a/tests/error_propagation.rs +++ b/tests/error_propagation.rs @@ -7,6 +7,7 @@ mod tests { }; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; + use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream, @@ -43,8 +44,10 @@ mod tests { for size in [1, 2, 3] { plan = Arc::new(NetworkShuffleExec::try_new( - plan, - Partitioning::RoundRobinBatch(size), + Arc::new(RepartitionExec::try_new( + plan, + Partitioning::Hash(vec![], size), + )?), size, )?); } diff --git a/tests/highly_distributed_query.rs b/tests/highly_distributed_query.rs index c352bed1..37a9a8a9 100644 --- a/tests/highly_distributed_query.rs +++ b/tests/highly_distributed_query.rs @@ -1,6 +1,7 @@ #[cfg(all(feature = "integration", test))] mod tests { use datafusion::physical_expr::Partitioning; + use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::{displayable, execute_stream}; use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::test_utils::parquet::register_parquet_tables; @@ -25,8 +26,10 @@ mod tests { let mut physical_distributed = physical.clone(); for size in [1, 10, 5] { physical_distributed = Arc::new(NetworkShuffleExec::try_new( - physical_distributed, - Partitioning::RoundRobinBatch(size), + Arc::new(RepartitionExec::try_new( + physical_distributed, + Partitioning::Hash(vec![], size), + )?), size, )?); } diff --git a/tests/stateful_execution_plan.rs b/tests/stateful_execution_plan.rs index b9530e92..22e01bcd 100644 --- a/tests/stateful_execution_plan.rs +++ b/tests/stateful_execution_plan.rs @@ -70,8 +70,8 @@ mod tests { │ RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=10 │ [Stage 2] => NetworkShuffleExec: output_partitions=10, input_tasks=10 └────────────────────────────────────────────────── - ┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p10,p11,p12,p13,p14,p15,p16,p17,p18,p19] 
t2:[p20,p21,p22,p23,p24,p25,p26,p27,p28,p29] t3:[p30,p31,p32,p33,p34,p35,p36,p37,p38,p39] t4:[p40,p41,p42,p43,p44,p45,p46,p47,p48,p49] t5:[p50,p51,p52,p53,p54,p55,p56,p57,p58,p59] t6:[p60,p61,p62,p63,p64,p65,p66,p67,p68,p69] t7:[p70,p71,p72,p73,p74,p75,p76,p77,p78,p79] t8:[p80,p81,p82,p83,p84,p85,p86,p87,p88,p89] t9:[p90,p91,p92,p93,p94,p95,p96,p97,p98,p99] - │ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + ┌───── Stage 2 ── Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t1:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t2:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t3:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t4:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t5:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t6:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t7:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t8:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] t9:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9] + │ RepartitionExec: partitioning=Hash([], 10), input_partitions=1 │ SortExec: expr=[numbers@0 DESC NULLS LAST], preserve_partitioning=[false] │ [Stage 1] => NetworkShuffleExec: output_partitions=1, input_tasks=1 └────────────────────────────────────────────────── @@ -103,7 +103,7 @@ mod tests { let mut plan: Arc = Arc::new(StatefulInt64ListExec::new(vec![1, 2, 3, 4, 5, 6])); - plan = Arc::new(PartitionIsolatorExec::new_pending(plan)); + plan = Arc::new(PartitionIsolatorExec::new(plan)); plan = Arc::new(FilterExec::try_new( Arc::new(BinaryExpr::new( @@ -115,8 +115,10 @@ mod tests { )?); plan = Arc::new(NetworkShuffleExec::try_new( - Arc::clone(&plan), - Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1), + Arc::new(RepartitionExec::try_new( + Arc::clone(&plan), + Partitioning::Hash(vec![col("numbers", &plan.schema())?], 1), + )?), 10, )?); @@ -130,8 +132,10 @@ mod tests { )); plan = Arc::new(NetworkShuffleExec::try_new( - plan, - Partitioning::RoundRobinBatch(10), + Arc::new(RepartitionExec::try_new( + plan, + Partitioning::Hash(vec![], 10), + )?), 10, )?);
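For reference, the pattern repeated throughout the test updates above: callers that previously passed a `Partitioning` directly to `NetworkShuffleExec::try_new` now build the hash-partitioned `RepartitionExec` themselves. A minimal sketch of that construction (the `shuffle_over` helper name and the empty hash expression list are illustrative, matching the tests above, not an API of the crate):

```rust
use std::sync::Arc;
use datafusion::common::Result;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion_distributed::NetworkShuffleExec;

/// Wraps `plan` in a hash-partitioned RepartitionExec before handing it to
/// NetworkShuffleExec::try_new, which now rejects non-hash-partitioned inputs.
fn shuffle_over(
    plan: Arc<dyn ExecutionPlan>,
    input_tasks: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    let repartition = Arc::new(RepartitionExec::try_new(
        plan,
        Partitioning::Hash(vec![], 10), // empty expr list, 10 partitions, as in the tests
    )?);
    Ok(Arc::new(NetworkShuffleExec::try_new(repartition, input_tasks)?))
}
```
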