Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,14 @@ impl DistributedSessionBuilder for RunOpt {
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
}
if !self.workers.is_empty() {
let rule = DistributedPhysicalOptimizerRule::new()
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
builder = builder
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_network_coalesce_tasks(
self.coalesce_tasks.unwrap_or(self.workers.len()),
)
.with_distributed_network_shuffle_tasks(
self.shuffle_tasks.unwrap_or(self.workers.len()),
);
}

Ok(builder
Expand Down
10 changes: 9 additions & 1 deletion examples/in_memory_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ struct Args {

#[structopt(long)]
explain: bool,

#[structopt(long, default_value = "3")]
network_shuffle_tasks: usize,

#[structopt(long, default_value = "3")]
network_coalesce_tasks: usize,
}

#[tokio::main]
Expand All @@ -37,7 +43,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
// Each CLI flag feeds the setting of the same name (coalesce -> coalesce, shuffle -> shuffle).
.with_distributed_network_coalesce_tasks(args.network_coalesce_tasks)
.with_distributed_network_shuffle_tasks(args.network_shuffle_tasks)
.build();

let ctx = SessionContext::from(state);
Expand Down
8 changes: 3 additions & 5 deletions examples/localhost_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(
DistributedPhysicalOptimizerRule::new()
.with_network_coalesce_tasks(args.network_coalesce_tasks)
.with_network_shuffle_tasks(args.network_shuffle_tasks),
))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_network_coalesce_tasks(args.network_coalesce_tasks)
.with_distributed_network_shuffle_tasks(args.network_shuffle_tasks)
.build();

let ctx = SessionContext::from(state);
Expand Down
77 changes: 76 additions & 1 deletion src/distributed_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::ChannelResolver;
use crate::channel_resolver_ext::set_distributed_channel_resolver;
use crate::config_extension_ext::{
set_distributed_option_extension, set_distributed_option_extension_from_headers,
};
use crate::distributed_planner::{
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
};
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
use crate::{ChannelResolver, IntoPlanDependentUsize};
use datafusion::common::DataFusionError;
use datafusion::config::ConfigExtension;
use datafusion::execution::{SessionState, SessionStateBuilder};
Expand Down Expand Up @@ -221,6 +224,32 @@ pub trait DistributedExt: Sized {
&mut self,
resolver: T,
);

/// Upon merging multiple tasks into one, this defines how many tasks are merged.
/// ```text
/// ( task 1 )
/// ▲
/// ┌───────────┴──────────┐
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
/// ```
/// This parameter defines N
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;

/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);

/// Upon shuffling data, this defines how many tasks are employed to perform the shuffling.
/// ```text
/// ( task 1 ) ( task 2 ) ( task 3 )
/// ▲ ▲ ▲
/// └────┬──────┴─────┬────┘
/// ( task 1 ) ( task 2 ) N tasks
/// ```
/// This parameter defines N
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;

/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
}

impl DistributedExt for SessionConfig {
Expand Down Expand Up @@ -253,6 +282,14 @@ impl DistributedExt for SessionConfig {
set_distributed_channel_resolver(self, resolver)
}

// In-place setter for the network-coalesce task count on this `SessionConfig`.
// The call below is a plain path call, so it resolves to the free function imported
// from `crate::distributed_planner` rather than to this method (no recursion).
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
set_distributed_network_coalesce_tasks(self, tasks)
}

// In-place setter for the network-shuffle task count on this `SessionConfig`.
// The call below is a plain path call, so it resolves to the free function imported
// from `crate::distributed_planner` rather than to this method (no recursion).
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
set_distributed_network_shuffle_tasks(self, tasks)
}

delegate! {
to self {
#[call(set_distributed_option_extension)]
Expand All @@ -274,6 +311,14 @@ impl DistributedExt for SessionConfig {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;

#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
}
}
}
Expand Down Expand Up @@ -305,6 +350,16 @@ impl DistributedExt for SessionStateBuilder {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
}
}
}
Expand Down Expand Up @@ -336,6 +391,16 @@ impl DistributedExt for SessionState {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
}
}
}
Expand Down Expand Up @@ -367,6 +432,16 @@ impl DistributedExt for SessionContext {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
}
}
}
126 changes: 126 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use datafusion::common::extensions_options;
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionConfig;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

/// A `usize` whose value is computed from a physical plan.
///
/// Wraps a shared closure that receives an [`ExecutionPlan`] and returns a `usize`.
/// The closure is held behind an [`Arc`], so cloning this value only clones the pointer.
/// Values are normally built through [`IntoPlanDependentUsize`].
#[derive(Clone)]
// The closure trait-object type is long; it is spelled out once here instead of aliased.
#[allow(clippy::type_complexity)]
pub struct PlanDependentUsize(
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
);

impl PlanDependentUsize {
    /// Evaluates the wrapped closure against `plan` and returns the resulting value.
    pub fn call(&self, plan: &Arc<dyn ExecutionPlan>) -> usize {
        let resolver = &self.0;
        resolver(plan)
    }
}

/// Conversion into a [`PlanDependentUsize`].
///
/// Implemented in this file for plain `usize` values and for closures of the form
/// `Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static`.
pub trait IntoPlanDependentUsize {
/// Consumes `self` and produces the equivalent [`PlanDependentUsize`].
fn into_plan_dependent_usize(self) -> PlanDependentUsize;
}

/// A plain number becomes a resolver that ignores the plan and always returns that number.
impl IntoPlanDependentUsize for usize {
    fn into_plan_dependent_usize(self) -> PlanDependentUsize {
        let fixed = self;
        PlanDependentUsize(Arc::new(move |_| fixed))
    }
}

/// Any thread-safe, `'static` closure from a plan to a `usize` is wrapped as-is.
impl<T> IntoPlanDependentUsize for T
where
    T: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static,
{
    fn into_plan_dependent_usize(self) -> PlanDependentUsize {
        let shared = Arc::new(self);
        PlanDependentUsize(shared)
    }
}

impl Default for PlanDependentUsize {
fn default() -> Self {
PlanDependentUsize(Arc::new(|_| 0))
}
}

/// The wrapped closure cannot be printed, so the type name is emitted as a placeholder.
impl Debug for PlanDependentUsize {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Spelled exactly like the type name (the previous text read "PlanDependantUsize").
        f.write_str("PlanDependentUsize")
    }
}

/// The wrapped closure cannot be printed, so the type name is emitted as a placeholder.
///
/// This is the text handed to the visitor in `ConfigField::visit` as the option's value.
impl Display for PlanDependentUsize {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Spelled exactly like the type name (the previous text read "PlanDependantUsize").
        f.write_str("PlanDependentUsize")
    }
}

/// Lets a [`PlanDependentUsize`] be used as a field of a DataFusion config struct.
impl ConfigField for PlanDependentUsize {
    /// Reports this field to the visitor `v`, using the `Display` form of `self` as the value.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description);
    }

    /// Parses `value` as a `usize` and replaces `self` with a resolver that always returns it.
    ///
    /// The key argument is ignored. A parse failure is returned to the caller and leaves
    /// `self` untouched.
    fn set(&mut self, _: &str, value: &str) -> datafusion::common::Result<()> {
        let parsed = default_config_transform::<usize>(value)?;
        *self = parsed.into_plan_dependent_usize();
        Ok(())
    }
}

// Declares the `DistributedConfig` options struct through DataFusion's
// `extensions_options!` macro. It has two fields, `network_shuffle_tasks` and
// `network_coalesce_tasks`, both `Option<PlanDependentUsize>` defaulting to `None`.
// NOTE(review): the `///` lines inside the macro are passed to the macro as input and are
// presumably used as the options' descriptions — treat them as runtime text and confirm
// against the macro documentation before rewording them.
extensions_options! {
pub struct DistributedConfig {
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
/// ```text
/// ( task 1 ) ( task 2 ) ( task 3 )
/// ▲ ▲ ▲
/// └────┬──────┴─────┬────┘
/// ( task 1 ) ( task 2 ) N tasks
/// ```
/// This parameter defines N
pub network_shuffle_tasks: Option<PlanDependentUsize>, default = None
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
/// ```text
/// ( task 1 )
/// ▲
/// ┌───────────┴──────────┐
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
/// ```
/// This parameter defines N
pub network_coalesce_tasks: Option<PlanDependentUsize>, default = None
}
}

// Registers `DistributedConfig` as a DataFusion config extension.
impl ConfigExtension for DistributedConfig {
// Prefix string identifying this extension.
// NOTE(review): presumably option keys are then addressed as `distributed.<field>` — confirm.
const PREFIX: &'static str = "distributed";
}

impl DistributedConfig {
    /// Returns this config with the network-shuffle task count replaced by `tasks`.
    ///
    /// `tasks` may be anything implementing [`IntoPlanDependentUsize`].
    pub fn with_network_shuffle_tasks(self, tasks: impl IntoPlanDependentUsize) -> Self {
        Self {
            network_shuffle_tasks: Some(tasks.into_plan_dependent_usize()),
            ..self
        }
    }

    /// Returns this config with the network-coalesce task count replaced by `tasks`.
    ///
    /// `tasks` may be anything implementing [`IntoPlanDependentUsize`].
    pub fn with_network_coalesce_tasks(self, tasks: impl IntoPlanDependentUsize) -> Self {
        Self {
            network_coalesce_tasks: Some(tasks.into_plan_dependent_usize()),
            ..self
        }
    }
}

/// Stores the network-coalesce task count in the [`DistributedConfig`] extension of `cfg`.
///
/// If the extension is already registered, only its `network_coalesce_tasks` field is
/// overwritten. Otherwise a default [`DistributedConfig`] carrying `tasks` is inserted.
pub(crate) fn set_distributed_network_coalesce_tasks(
    cfg: &mut SessionConfig,
    tasks: impl IntoPlanDependentUsize,
) {
    let extensions = &mut cfg.options_mut().extensions;
    match extensions.get_mut::<DistributedConfig>() {
        Some(existing) => {
            existing.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
        }
        None => {
            extensions.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
        }
    }
}

/// Stores the network-shuffle task count in the [`DistributedConfig`] extension of `cfg`.
///
/// If the extension is already registered, only its `network_shuffle_tasks` field is
/// overwritten. Otherwise a default [`DistributedConfig`] carrying `tasks` is inserted.
pub(crate) fn set_distributed_network_shuffle_tasks(
    cfg: &mut SessionConfig,
    tasks: impl IntoPlanDependentUsize,
) {
    let extensions = &mut cfg.options_mut().extensions;
    match extensions.get_mut::<DistributedConfig>() {
        Some(existing) => {
            existing.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
        }
        None => {
            extensions.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
        }
    }
}
Loading