Skip to content

Commit 1f3b632

Browse files
committed
Add dynamic task config based on DataFusion extension options
1 parent 58b79bc commit 1f3b632

18 files changed

+459
-289
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,14 @@ impl DistributedSessionBuilder for RunOpt {
159159
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
160160
}
161161
if !self.workers.is_empty() {
162-
let rule = DistributedPhysicalOptimizerRule::new()
163-
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
164-
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
165-
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
162+
builder = builder
163+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
164+
.with_distributed_network_coalesce_tasks(
165+
self.coalesce_tasks.unwrap_or(self.workers.len()),
166+
)
167+
.with_distributed_network_shuffle_tasks(
168+
self.shuffle_tasks.unwrap_or(self.workers.len()),
169+
);
166170
}
167171

168172
Ok(builder

examples/in_memory_cluster.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ struct Args {
2828

2929
#[structopt(long)]
3030
explain: bool,
31+
32+
#[structopt(long, default_value = "3")]
33+
network_shuffle_tasks: usize,
34+
35+
#[structopt(long, default_value = "3")]
36+
network_coalesce_tasks: usize,
3137
}
3238

3339
#[tokio::main]
@@ -37,7 +43,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
3743
let state = SessionStateBuilder::new()
3844
.with_default_features()
3945
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
40-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
46+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
47+
.with_distributed_network_coalesce_tasks(args.network_coalesce_tasks)
48+
.with_distributed_network_shuffle_tasks(args.network_shuffle_tasks)
4149
.build();
4250

4351
let ctx = SessionContext::from(state);

examples/localhost_run.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
4848
let state = SessionStateBuilder::new()
4949
.with_default_features()
5050
.with_distributed_channel_resolver(localhost_resolver)
51-
.with_physical_optimizer_rule(Arc::new(
52-
DistributedPhysicalOptimizerRule::new()
53-
.with_network_coalesce_tasks(args.network_coalesce_tasks)
54-
.with_network_shuffle_tasks(args.network_shuffle_tasks),
55-
))
51+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
52+
.with_distributed_network_coalesce_tasks(args.network_coalesce_tasks)
53+
.with_distributed_network_shuffle_tasks(args.network_shuffle_tasks)
5654
.build();
5755

5856
let ctx = SessionContext::from(state);

src/distributed_ext.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
use crate::ChannelResolver;
21
use crate::channel_resolver_ext::set_distributed_channel_resolver;
32
use crate::config_extension_ext::{
43
set_distributed_option_extension, set_distributed_option_extension_from_headers,
54
};
5+
use crate::distributed_planner::{
6+
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
7+
};
68
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
9+
use crate::{ChannelResolver, IntoPlanDependantUsize};
710
use datafusion::common::DataFusionError;
811
use datafusion::config::ConfigExtension;
912
use datafusion::execution::{SessionState, SessionStateBuilder};
@@ -221,6 +224,32 @@ pub trait DistributedExt: Sized {
221224
&mut self,
222225
resolver: T,
223226
);
227+
228+
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
229+
/// ```text
230+
/// ( task 1 )
231+
/// ▲
232+
/// ┌───────────┴──────────┐
233+
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
234+
/// ```
235+
/// This parameter defines N
236+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;
237+
238+
/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
239+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
240+
241+
/// When shuffling data, this defines how many tasks are used to perform the shuffle.
242+
/// ```text
243+
/// ( task 1 ) ( task 2 ) ( task 3 )
244+
/// ▲ ▲ ▲
245+
/// └────┬──────┴─────┬────┘
246+
/// ( task 1 ) ( task 2 ) N tasks
247+
/// ```
248+
/// This parameter defines N
249+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;
250+
251+
/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
252+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
224253
}
225254

226255
impl DistributedExt for SessionConfig {
@@ -253,6 +282,14 @@ impl DistributedExt for SessionConfig {
253282
set_distributed_channel_resolver(self, resolver)
254283
}
255284

285+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T) {
286+
set_distributed_network_coalesce_tasks(self, tasks)
287+
}
288+
289+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T) {
290+
set_distributed_network_shuffle_tasks(self, tasks)
291+
}
292+
256293
delegate! {
257294
to self {
258295
#[call(set_distributed_option_extension)]
@@ -274,6 +311,14 @@ impl DistributedExt for SessionConfig {
274311
#[call(set_distributed_channel_resolver)]
275312
#[expr($;self)]
276313
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
314+
315+
#[call(set_distributed_network_coalesce_tasks)]
316+
#[expr($;self)]
317+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
318+
319+
#[call(set_distributed_network_shuffle_tasks)]
320+
#[expr($;self)]
321+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
277322
}
278323
}
279324
}
@@ -305,6 +350,16 @@ impl DistributedExt for SessionStateBuilder {
305350
#[call(set_distributed_channel_resolver)]
306351
#[expr($;self)]
307352
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
353+
354+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
355+
#[call(set_distributed_network_coalesce_tasks)]
356+
#[expr($;self)]
357+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
358+
359+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
360+
#[call(set_distributed_network_shuffle_tasks)]
361+
#[expr($;self)]
362+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
308363
}
309364
}
310365
}
@@ -336,6 +391,16 @@ impl DistributedExt for SessionState {
336391
#[call(set_distributed_channel_resolver)]
337392
#[expr($;self)]
338393
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
394+
395+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
396+
#[call(set_distributed_network_coalesce_tasks)]
397+
#[expr($;self)]
398+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
399+
400+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
401+
#[call(set_distributed_network_shuffle_tasks)]
402+
#[expr($;self)]
403+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(mut self, tasks: T) -> Self;
339404
}
340405
}
341406
}
@@ -367,6 +432,16 @@ impl DistributedExt for SessionContext {
367432
#[call(set_distributed_channel_resolver)]
368433
#[expr($;self)]
369434
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
435+
436+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
437+
#[call(set_distributed_network_coalesce_tasks)]
438+
#[expr($;self)]
439+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;
440+
441+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(&mut self, tasks: T);
442+
#[call(set_distributed_network_shuffle_tasks)]
443+
#[expr($;self)]
444+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependantUsize>(self, tasks: T) -> Self;
370445
}
371446
}
372447
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use datafusion::common::extensions_options;
2+
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
3+
use datafusion::physical_plan::ExecutionPlan;
4+
use datafusion::prelude::SessionConfig;
5+
use std::fmt::{Debug, Display, Formatter};
6+
use std::sync::Arc;
7+
8+
/// A `usize` whose value is computed from a physical plan.
///
/// Wraps a shared closure (`Arc<dyn Fn ... + Send + Sync>`) that receives a reference to an
/// `Arc<dyn ExecutionPlan>` and returns a `usize`. Cloning only clones the `Arc`.
#[derive(Clone)]
9+
// The closure type below is long enough to trip `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
10+
pub struct PlanDependantUsize(
11+
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
12+
);
13+
14+
/// Conversion into a [PlanDependantUsize].
///
/// In this file it is implemented for plain `usize` values and for
/// `Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static` closures.
pub trait IntoPlanDependantUsize {
15+
/// Consumes `self` and returns the equivalent [PlanDependantUsize].
fn into_plan_dependant_usize(self) -> PlanDependantUsize;
16+
}
17+
18+
/// A fixed number becomes a [PlanDependantUsize] that ignores the plan it is given.
impl IntoPlanDependantUsize for usize {
19+
fn into_plan_dependant_usize(self) -> PlanDependantUsize {
20+
PlanDependantUsize(Arc::new(move |_plan: &Arc<dyn ExecutionPlan>| self))
21+
}
22+
}
23+
24+
/// Any thread-safe `'static` closure from a plan to a `usize` is stored behind an `Arc` unchanged.
impl<F> IntoPlanDependantUsize for F where F: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static {
25+
fn into_plan_dependant_usize(self) -> PlanDependantUsize {
26+
PlanDependantUsize(Arc::new(self))
27+
}
28+
}
29+
30+
/// The default value yields `0` for every plan.
impl Default for PlanDependantUsize {
31+
fn default() -> Self {
32+
Self(Arc::new(|_plan: &Arc<dyn ExecutionPlan>| 0usize))
33+
}
34+
}
35+
36+
/// The wrapped closure cannot be inspected, so only the type name is printed.
impl Debug for PlanDependantUsize {
37+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38+
f.write_str("PlanDependantUsize")
39+
}
40+
}
41+
42+
/// Displays as the bare type name, exactly like the `Debug` output.
impl Display for PlanDependantUsize {
43+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44+
Debug::fmt(self, f)
45+
}
46+
}
47+
48+
/// Lets a [PlanDependantUsize] be used as a field inside a DataFusion config extension.
impl ConfigField for PlanDependantUsize {
49+
/// Reports this field to the visitor as a single value under `key`.
fn visit<V: Visit>(&self, visitor: &mut V, key: &str, description: &'static str) {
50+
visitor.some(key, self, description);
51+
}
52+
53+
/// Parses `value` as a `usize` and replaces `self` with a constant of that value.
/// The key is ignored.
fn set(&mut self, _key: &str, value: &str) -> datafusion::common::Result<()> {
54+
*self = IntoPlanDependantUsize::into_plan_dependant_usize(default_config_transform::<usize>(value)?);
55+
Ok(())
56+
}
57+
}
58+
59+
// Declares `DistributedConfig` with two optional task-count fields, both defaulting to `None`.
// NOTE(review): the `///` comments on the fields are macro input and are presumably surfaced as
// the options' descriptions — confirm before rewording them.
extensions_options! {
60+
pub struct DistributedConfig {
61+
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
62+
/// ```text
63+
/// ( task 1 ) ( task 2 ) ( task 3 )
64+
/// ▲ ▲ ▲
65+
/// └────┬──────┴─────┬────┘
66+
/// ( task 1 ) ( task 2 ) N tasks
67+
/// ```
68+
/// This parameter defines N
69+
pub(crate) network_shuffle_tasks: Option<PlanDependantUsize>, default = None
70+
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
71+
/// ```text
72+
/// ( task 1 )
73+
/// ▲
74+
/// ┌───────────┴──────────┐
75+
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
76+
/// ```
77+
/// This parameter defines N
78+
pub(crate) network_coalesce_tasks: Option<PlanDependantUsize>, default = None
79+
}
80+
}
81+
82+
// Registers `DistributedConfig` as a DataFusion config extension under the "distributed" prefix.
// NOTE(review): presumably its options are then addressed as `distributed.<field>` — confirm.
impl ConfigExtension for DistributedConfig {
83+
const PREFIX: &'static str = "distributed";
84+
}
85+
86+
impl DistributedConfig {
87+
/// Returns this config with the network shuffle task count replaced by `tasks`.
88+
pub fn with_network_shuffle_tasks(self, tasks: impl IntoPlanDependantUsize) -> Self {
89+
let network_shuffle_tasks = Some(tasks.into_plan_dependant_usize());
90+
Self { network_shuffle_tasks, ..self }
91+
}
92+
93+
/// Returns this config with the network coalesce task count replaced by `tasks`.
94+
pub fn with_network_coalesce_tasks(self, tasks: impl IntoPlanDependantUsize) -> Self {
95+
let network_coalesce_tasks = Some(tasks.into_plan_dependant_usize());
96+
Self { network_coalesce_tasks, ..self }
97+
}
98+
}
99+
100+
pub(crate) fn set_distributed_network_coalesce_tasks(
101+
cfg: &mut SessionConfig,
102+
tasks: impl IntoPlanDependantUsize,
103+
) {
104+
let ext = &mut cfg.options_mut().extensions;
105+
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
106+
return ext.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
107+
};
108+
prev.network_coalesce_tasks = Some(tasks.into_plan_dependant_usize());
109+
}
110+
111+
pub(crate) fn set_distributed_network_shuffle_tasks(
112+
cfg: &mut SessionConfig,
113+
tasks: impl IntoPlanDependantUsize,
114+
) {
115+
let ext = &mut cfg.options_mut().extensions;
116+
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
117+
return ext.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
118+
};
119+
prev.network_shuffle_tasks = Some(tasks.into_plan_dependant_usize());
120+
}

0 commit comments

Comments
 (0)