Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,14 @@ impl DistributedSessionBuilder for RunOpt {
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
}
if !self.workers.is_empty() {
let rule = DistributedPhysicalOptimizerRule::new()
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
builder = builder
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_network_coalesce_tasks(
self.coalesce_tasks.unwrap_or(self.workers.len()),
)
.with_distributed_network_shuffle_tasks(
self.shuffle_tasks.unwrap_or(self.workers.len()),
);
}

Ok(builder
Expand Down
10 changes: 9 additions & 1 deletion examples/in_memory_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ struct Args {

#[structopt(long)]
explain: bool,

#[structopt(long, default_value = "3")]
network_shuffle_tasks: usize,

#[structopt(long, default_value = "3")]
network_coalesce_tasks: usize,
}

#[tokio::main]
Expand All @@ -37,7 +43,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_network_coalesce_tasks(args.network_shuffle_tasks)
.with_distributed_network_shuffle_tasks(args.network_coalesce_tasks)
.build();

let ctx = SessionContext::from(state);
Expand Down
8 changes: 3 additions & 5 deletions examples/localhost_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_channel_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(
DistributedPhysicalOptimizerRule::new()
.with_network_coalesce_tasks(args.network_coalesce_tasks)
.with_network_shuffle_tasks(args.network_shuffle_tasks),
))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_network_coalesce_tasks(args.network_coalesce_tasks)
.with_distributed_network_shuffle_tasks(args.network_shuffle_tasks)
.build();

let ctx = SessionContext::from(state);
Expand Down
77 changes: 76 additions & 1 deletion src/distributed_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::ChannelResolver;
use crate::channel_resolver_ext::set_distributed_channel_resolver;
use crate::config_extension_ext::{
set_distributed_option_extension, set_distributed_option_extension_from_headers,
};
use crate::distributed_planner::{
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
};
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
use crate::{ChannelResolver, IntoPlanDependantUsize};
use datafusion::common::DataFusionError;
use datafusion::config::ConfigExtension;
use datafusion::execution::{SessionState, SessionStateBuilder};
Expand Down Expand Up @@ -221,6 +224,32 @@ pub trait DistributedExt: Sized {
&mut self,
resolver: T,
);

/// Upon merging multiple tasks into one, this defines how many tasks are merged.
/// ```text
/// ( task 1 )
/// ▲
/// ┌───────────┴──────────┐
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
/// ```
/// This parameter defines N
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;

/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);

/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
/// ```text
/// ( task 1 ) ( task 2 ) ( task 3 )
/// ▲ ▲ ▲
/// └────┬──────┴─────┬────┘
/// ( task 1 ) ( task 2 ) N tasks
/// ```
/// This parameter defines N
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;

/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
}

impl DistributedExt for SessionConfig {
Expand Down Expand Up @@ -253,6 +282,14 @@ impl DistributedExt for SessionConfig {
set_distributed_channel_resolver(self, resolver)
}

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T) {
set_distributed_network_coalesce_tasks(self, tasks)
}

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T) {
set_distributed_network_shuffle_tasks(self, tasks)
}

delegate! {
to self {
#[call(set_distributed_option_extension)]
Expand All @@ -274,6 +311,14 @@ impl DistributedExt for SessionConfig {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;

#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
}
}
}
Expand Down Expand Up @@ -305,6 +350,16 @@ impl DistributedExt for SessionStateBuilder {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
}
}
}
Expand Down Expand Up @@ -336,6 +391,16 @@ impl DistributedExt for SessionState {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
}
}
}
Expand Down Expand Up @@ -367,6 +432,16 @@ impl DistributedExt for SessionContext {
#[call(set_distributed_channel_resolver)]
#[expr($;self)]
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;

fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
#[call(set_distributed_network_coalesce_tasks)]
#[expr($;self)]
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;

fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
#[call(set_distributed_network_shuffle_tasks)]
#[expr($;self)]
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;
}
}
}
126 changes: 126 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use datafusion::common::extensions_options;
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionConfig;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct PlanDependantUsize(
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
);

impl PlanDependantUsize {
pub fn call(&self, plan: &Arc<dyn ExecutionPlan>) -> usize {
self.0(plan)
}
}

pub trait IntoPlanDependantUsize {
fn into_plan_dependant_usize(self) -> PlanDependantUsize;
}

impl IntoPlanDependantUsize for usize {
fn into_plan_dependant_usize(self) -> PlanDependantUsize {
PlanDependantUsize(Arc::new(move |_| self))
}
}

impl<T: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static> IntoPlanDependantUsize for T {
fn into_plan_dependant_usize(self) -> PlanDependantUsize {
PlanDependantUsize(Arc::new(self))
}
}

impl Default for PlanDependantUsize {
fn default() -> Self {
PlanDependantUsize(Arc::new(|_| 0))
}
}

impl Debug for PlanDependantUsize {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PlanDependantUsize")
}
}

impl Display for PlanDependantUsize {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PlanDependantUsize")
}
}

impl ConfigField for PlanDependantUsize {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description);
}

fn set(&mut self, _: &str, value: &str) -> datafusion::common::Result<()> {
*self = default_config_transform::<usize>(value)?.into_plan_dependant_usize();
Ok(())
}
}

extensions_options! {
pub struct DistributedConfig {
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
/// ```text
/// ( task 1 ) ( task 2 ) ( task 3 )
/// ▲ ▲ ▲
/// └────┬──────┴─────┬────┘
/// ( task 1 ) ( task 2 ) N tasks
/// ```
/// This parameter defines N
pub network_shuffle_tasks: Option<PlanDependantUsize>, default = None
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
/// ```text
/// ( task 1 )
/// ▲
/// ┌───────────┴──────────┐
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
/// ```
/// This parameter defines N
pub network_coalesce_tasks: Option<PlanDependantUsize>, default = None
}
}

impl ConfigExtension for DistributedConfig {
const PREFIX: &'static str = "distributed";
}

impl DistributedConfig {
/// Sets the amount of tasks used in a network shuffle operation.
pub fn with_network_shuffle_tasks(mut self, tasks: impl IntoPlanDependantUsize) -> Self {
self.network_shuffle_tasks = Some(tasks.into_plan_dependant_usize());
self
}

/// Sets the amount of tasks used in a network coalesce operation.
pub fn with_network_coalesce_tasks(mut self, tasks: impl IntoPlanDependantUsize) -> Self {
self.network_coalesce_tasks = Some(tasks.into_plan_dependant_usize());
self
}
}

pub(crate) fn set_distributed_network_coalesce_tasks(
cfg: &mut SessionConfig,
tasks: impl IntoPlanDependantUsize,
) {
let ext = &mut cfg.options_mut().extensions;
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
return ext.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
};
prev.network_coalesce_tasks = Some(tasks.into_plan_dependant_usize());
}

pub(crate) fn set_distributed_network_shuffle_tasks(
cfg: &mut SessionConfig,
tasks: impl IntoPlanDependantUsize,
) {
let ext = &mut cfg.options_mut().extensions;
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
return ext.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
};
prev.network_shuffle_tasks = Some(tasks.into_plan_dependant_usize());
}
Loading