Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::ChannelResolver;
use crate::channel_resolver_ext::set_distributed_channel_resolver;
use crate::config_extension_ext::{
set_distributed_option_extension, set_distributed_option_extension_from_headers,
Expand All @@ -6,7 +7,6 @@ use crate::distributed_planner::{
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
};
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
use crate::{ChannelResolver, IntoPlanDependentUsize};
use datafusion::common::DataFusionError;
use datafusion::config::ConfigExtension;
use datafusion::execution::{SessionState, SessionStateBuilder};
Expand Down Expand Up @@ -233,10 +233,10 @@ pub trait DistributedExt: Sized {
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
/// ```
/// This parameter defines N
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
fn with_distributed_network_coalesce_tasks(self, tasks: usize) -> Self;

/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);

/// When shuffling data, this defines how many tasks are employed to perform the shuffle.
/// ```text
Expand All @@ -246,10 +246,10 @@ pub trait DistributedExt: Sized {
/// ( task 1 ) ( task 2 ) N tasks
/// ```
/// This parameter defines N
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
fn with_distributed_network_shuffle_tasks(self, tasks: usize) -> Self;

/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
}

impl DistributedExt for SessionConfig {
Expand Down Expand Up @@ -282,11 +282,11 @@ impl DistributedExt for SessionConfig {
set_distributed_channel_resolver(self, resolver)
}

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize) {
set_distributed_network_coalesce_tasks(self, tasks)
}

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize) {
set_distributed_network_shuffle_tasks(self, tasks)
}

Expand Down Expand Up @@ -314,11 +314,11 @@ impl DistributedExt for SessionConfig {

#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;

#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
}
}
}
Expand Down Expand Up @@ -351,15 +351,15 @@ impl DistributedExt for SessionStateBuilder {
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
}
}
}
Expand Down Expand Up @@ -392,15 +392,15 @@ impl DistributedExt for SessionState {
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
}
}
}
Expand Down Expand Up @@ -433,15 +433,15 @@ impl DistributedExt for SessionContext {
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
fn with_distributed_network_coalesce_tasks(self, tasks: usize) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
fn with_distributed_network_shuffle_tasks(self, tasks: usize) -> Self;
}
}
}
89 changes: 12 additions & 77 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,7 @@
use datafusion::common::extensions_options;
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::config::ConfigExtension;
use datafusion::prelude::SessionConfig;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

/// A `usize` value that is computed from an execution plan.
///
/// Wraps a shared, thread-safe closure that receives a reference to an
/// `Arc<dyn ExecutionPlan>` and returns a `usize`. Cloning only clones the
/// `Arc`, so all clones share the same closure.
#[derive(Clone)]
// The boxed closure type is long but only spelled out once, here.
#[allow(clippy::type_complexity)]
pub struct PlanDependentUsize(
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
);

impl PlanDependentUsize {
    /// Evaluates the wrapped closure against `plan` and returns its result.
    pub fn call(&self, plan: &Arc<dyn ExecutionPlan>) -> usize {
        let compute = &self.0;
        compute(plan)
    }
}

/// Conversion of a value into a [`PlanDependentUsize`].
///
/// This file implements it for plain `usize` values and for closures of the
/// form `Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static`.
pub trait IntoPlanDependentUsize {
/// Consumes `self` and returns the equivalent [`PlanDependentUsize`].
fn into_plan_dependent_usize(self) -> PlanDependentUsize;
}

impl IntoPlanDependentUsize for usize {
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
PlanDependentUsize(Arc::new(move |_| self))
}
}

impl<T> IntoPlanDependentUsize for T
where
    T: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static,
{
    /// Moves the closure behind an `Arc` so it can be stored and cloned as a
    /// [`PlanDependentUsize`].
    fn into_plan_dependent_usize(self) -> PlanDependentUsize {
        let shared: Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync> = Arc::new(self);
        PlanDependentUsize(shared)
    }
}

impl Default for PlanDependentUsize {
fn default() -> Self {
PlanDependentUsize(Arc::new(|_| 0))
}
}

impl Debug for PlanDependentUsize {
    /// Writes the type name as a fixed placeholder.
    ///
    /// The wrapped value is a `dyn Fn` closure, which has no `Debug`
    /// representation of its own, so only the (correctly spelled) type name
    /// is emitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("PlanDependentUsize")
    }
}

impl Display for PlanDependentUsize {
    /// Writes the type name as a fixed placeholder.
    ///
    /// The wrapped closure cannot be rendered as text, so the (correctly
    /// spelled) type name stands in for the value wherever it is displayed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("PlanDependentUsize")
    }
}

impl ConfigField for PlanDependentUsize {
    /// Reports this field to the visitor as a single present value under `key`.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description);
    }

    /// Parses `value` as a `usize` and replaces `self` with a constant
    /// [`PlanDependentUsize`] built from it.
    ///
    /// The key argument is ignored. A parse failure is returned to the caller
    /// and leaves `self` untouched.
    fn set(&mut self, _: &str, value: &str) -> datafusion::common::Result<()> {
        let parsed: usize = default_config_transform(value)?;
        *self = parsed.into_plan_dependent_usize();
        Ok(())
    }
}
use std::fmt::Debug;

extensions_options! {
pub struct DistributedConfig {
Expand All @@ -72,7 +13,7 @@ extensions_options! {
/// ( task 1 ) ( task 2 ) N tasks
/// ```
/// This parameter defines N
pub network_shuffle_tasks: Option<PlanDependentUsize>, default = None
pub network_shuffle_tasks: Option<usize>, default = None
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
/// ```text
/// ( task 1 )
Expand All @@ -81,7 +22,7 @@ extensions_options! {
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
/// ```
/// This parameter defines N
pub network_coalesce_tasks: Option<PlanDependentUsize>, default = None
pub network_coalesce_tasks: Option<usize>, default = None
}
}

Expand All @@ -91,36 +32,30 @@ impl ConfigExtension for DistributedConfig {

impl DistributedConfig {
/// Sets the number of tasks used in a network shuffle operation.
pub fn with_network_shuffle_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
self.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
pub fn with_network_shuffle_tasks(mut self, tasks: usize) -> Self {
self.network_shuffle_tasks = Some(tasks);
self
}

/// Sets the number of tasks used in a network coalesce operation.
pub fn with_network_coalesce_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
self.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
pub fn with_network_coalesce_tasks(mut self, tasks: usize) -> Self {
self.network_coalesce_tasks = Some(tasks);
self
}
}

pub(crate) fn set_distributed_network_coalesce_tasks(
cfg: &mut SessionConfig,
tasks: impl IntoPlanDependentUsize,
) {
pub(crate) fn set_distributed_network_coalesce_tasks(cfg: &mut SessionConfig, tasks: usize) {
let ext = &mut cfg.options_mut().extensions;
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
return ext.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
};
prev.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
prev.network_coalesce_tasks = Some(tasks);
}

pub(crate) fn set_distributed_network_shuffle_tasks(
cfg: &mut SessionConfig,
tasks: impl IntoPlanDependentUsize,
) {
pub(crate) fn set_distributed_network_shuffle_tasks(cfg: &mut SessionConfig, tasks: usize) {
let ext = &mut cfg.options_mut().extensions;
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
return ext.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
};
prev.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
prev.network_shuffle_tasks = Some(tasks);
}
15 changes: 6 additions & 9 deletions src/distributed_planner/distributed_physical_optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ pub fn apply_network_boundaries(
}

// If this is a hash RepartitionExec, introduce a shuffle.
if let (Some(node), Some(tasks)) = (
if let (Some(node), Some(input_tasks)) = (
plan.as_any().downcast_ref::<RepartitionExec>(),
cfg.network_shuffle_tasks.clone(),
cfg.network_shuffle_tasks,
) {
if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
return Ok(Transformed::no(plan));
}
let input_tasks = tasks.0(&plan);
if input_tasks == 0 {
return Ok(Transformed::no(plan));
}
Expand All @@ -129,9 +128,9 @@ pub fn apply_network_boundaries(
// If this is a CoalescePartitionsExec, it means that the original plan is trying to
// merge all partitions into one. We need to go one step ahead and also merge all tasks
// into one.
if let (Some(node), Some(tasks)) = (
if let (Some(node), Some(input_tasks)) = (
plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
cfg.network_coalesce_tasks.clone(),
cfg.network_coalesce_tasks,
) {
// If the immediate child is a PartitionIsolatorExec, it means that the rest of the
// plan is just a couple of non-computational nodes that are probably not worth
Expand All @@ -140,7 +139,6 @@ pub fn apply_network_boundaries(
return Ok(Transformed::no(plan));
}

let input_tasks = tasks.0(&plan);
if input_tasks == 0 {
return Ok(Transformed::no(plan));
}
Expand All @@ -154,11 +152,10 @@ pub fn apply_network_boundaries(
// The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
// We need to account for it and help it by also coalescing all tasks into one, therefore
// a NetworkCoalesceExec is introduced.
if let (Some(node), Some(tasks)) = (
if let (Some(node), Some(input_tasks)) = (
plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
cfg.network_coalesce_tasks.clone(),
cfg.network_coalesce_tasks,
) {
let input_tasks = tasks.0(&plan);
if input_tasks == 0 {
return Ok(Transformed::no(plan));
}
Expand Down
2 changes: 1 addition & 1 deletion src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(crate) use distributed_config::{
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
};

pub use distributed_config::{DistributedConfig, IntoPlanDependentUsize, PlanDependentUsize};
pub use distributed_config::DistributedConfig;
pub use distributed_physical_optimizer_rule::{
DistributedPhysicalOptimizerRule, apply_network_boundaries, distribute_plan,
};
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ pub mod test_utils;
pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
pub use distributed_ext::DistributedExt;
pub use distributed_planner::{
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, IntoPlanDependentUsize,
NetworkBoundary, NetworkBoundaryExt, PlanDependentUsize, apply_network_boundaries,
distribute_plan,
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary,
NetworkBoundaryExt, apply_network_boundaries, distribute_plan,
};
pub use execution_plans::{
DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec,
Expand Down