Skip to content

Commit 4ead8f1

Browse files
authored
Add dynamic task config based on DataFusion extension options (#197)
* Refactor distributed planner into its own folder * Tweak tests so that they use SET statement for configuring tasks. * make config fields public * Allow users to call PlanDependantUsize * Fix typo * Address conflicts
1 parent a94a214 commit 4ead8f1

19 files changed

+541
-316
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,14 @@ impl DistributedSessionBuilder for RunOpt {
159159
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
160160
}
161161
if !self.workers.is_empty() {
162-
let rule = DistributedPhysicalOptimizerRule::new()
163-
.with_network_coalesce_tasks(self.coalesce_tasks.unwrap_or(self.workers.len()))
164-
.with_network_shuffle_tasks(self.shuffle_tasks.unwrap_or(self.workers.len()));
165-
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
162+
builder = builder
163+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
164+
.with_distributed_network_coalesce_tasks(
165+
self.coalesce_tasks.unwrap_or(self.workers.len()),
166+
)
167+
.with_distributed_network_shuffle_tasks(
168+
self.shuffle_tasks.unwrap_or(self.workers.len()),
169+
);
166170
}
167171

168172
Ok(builder

examples/in_memory_cluster.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ struct Args {
2828

2929
#[structopt(long)]
3030
explain: bool,
31+
32+
#[structopt(long, default_value = "3")]
33+
network_shuffle_tasks: usize,
34+
35+
#[structopt(long, default_value = "3")]
36+
network_coalesce_tasks: usize,
3137
}
3238

3339
#[tokio::main]
@@ -37,7 +43,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
3743
let state = SessionStateBuilder::new()
3844
.with_default_features()
3945
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
40-
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule::new()))
46+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
47+
.with_distributed_network_coalesce_tasks(args.network_shuffle_tasks)
48+
.with_distributed_network_shuffle_tasks(args.network_coalesce_tasks)
4149
.build();
4250

4351
let ctx = SessionContext::from(state);

examples/localhost_run.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
4848
let state = SessionStateBuilder::new()
4949
.with_default_features()
5050
.with_distributed_channel_resolver(localhost_resolver)
51-
.with_physical_optimizer_rule(Arc::new(
52-
DistributedPhysicalOptimizerRule::new()
53-
.with_network_coalesce_tasks(args.network_coalesce_tasks)
54-
.with_network_shuffle_tasks(args.network_shuffle_tasks),
55-
))
51+
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
52+
.with_distributed_network_coalesce_tasks(args.network_coalesce_tasks)
53+
.with_distributed_network_shuffle_tasks(args.network_shuffle_tasks)
5654
.build();
5755

5856
let ctx = SessionContext::from(state);

src/distributed_ext.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
use crate::ChannelResolver;
21
use crate::channel_resolver_ext::set_distributed_channel_resolver;
32
use crate::config_extension_ext::{
43
set_distributed_option_extension, set_distributed_option_extension_from_headers,
54
};
5+
use crate::distributed_planner::{
6+
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
7+
};
68
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
9+
use crate::{ChannelResolver, IntoPlanDependentUsize};
710
use datafusion::common::DataFusionError;
811
use datafusion::config::ConfigExtension;
912
use datafusion::execution::{SessionState, SessionStateBuilder};
@@ -221,6 +224,32 @@ pub trait DistributedExt: Sized {
221224
&mut self,
222225
resolver: T,
223226
);
227+
228+
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
229+
/// ```text
230+
/// ( task 1 )
231+
/// ▲
232+
/// ┌───────────┴──────────┐
233+
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
234+
/// ```
235+
/// This parameter defines N
236+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
237+
238+
/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
239+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
240+
241+
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
242+
/// ```text
243+
/// ( task 1 ) ( task 2 ) ( task 3 )
244+
/// ▲ ▲ ▲
245+
/// └────┬──────┴─────┬────┘
246+
/// ( task 1 ) ( task 2 ) N tasks
247+
/// ```
248+
/// This parameter defines N
249+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
250+
251+
/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
252+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
224253
}
225254

226255
impl DistributedExt for SessionConfig {
@@ -253,6 +282,14 @@ impl DistributedExt for SessionConfig {
253282
set_distributed_channel_resolver(self, resolver)
254283
}
255284

285+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
286+
set_distributed_network_coalesce_tasks(self, tasks)
287+
}
288+
289+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
290+
set_distributed_network_shuffle_tasks(self, tasks)
291+
}
292+
256293
delegate! {
257294
to self {
258295
#[call(set_distributed_option_extension)]
@@ -274,6 +311,14 @@ impl DistributedExt for SessionConfig {
274311
#[call(set_distributed_channel_resolver)]
275312
#[expr($;self)]
276313
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
314+
315+
#[call(set_distributed_network_coalesce_tasks)]
316+
#[expr($;self)]
317+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
318+
319+
#[call(set_distributed_network_shuffle_tasks)]
320+
#[expr($;self)]
321+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
277322
}
278323
}
279324
}
@@ -305,6 +350,16 @@ impl DistributedExt for SessionStateBuilder {
305350
#[call(set_distributed_channel_resolver)]
306351
#[expr($;self)]
307352
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
353+
354+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
355+
#[call(set_distributed_network_coalesce_tasks)]
356+
#[expr($;self)]
357+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
358+
359+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
360+
#[call(set_distributed_network_shuffle_tasks)]
361+
#[expr($;self)]
362+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
308363
}
309364
}
310365
}
@@ -336,6 +391,16 @@ impl DistributedExt for SessionState {
336391
#[call(set_distributed_channel_resolver)]
337392
#[expr($;self)]
338393
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
394+
395+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
396+
#[call(set_distributed_network_coalesce_tasks)]
397+
#[expr($;self)]
398+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
399+
400+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
401+
#[call(set_distributed_network_shuffle_tasks)]
402+
#[expr($;self)]
403+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
339404
}
340405
}
341406
}
@@ -367,6 +432,16 @@ impl DistributedExt for SessionContext {
367432
#[call(set_distributed_channel_resolver)]
368433
#[expr($;self)]
369434
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
435+
436+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
437+
#[call(set_distributed_network_coalesce_tasks)]
438+
#[expr($;self)]
439+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
440+
441+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
442+
#[call(set_distributed_network_shuffle_tasks)]
443+
#[expr($;self)]
444+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
370445
}
371446
}
372447
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use datafusion::common::extensions_options;
2+
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
3+
use datafusion::physical_plan::ExecutionPlan;
4+
use datafusion::prelude::SessionConfig;
5+
use std::fmt::{Debug, Display, Formatter};
6+
use std::sync::Arc;
7+
8+
#[derive(Clone)]
9+
#[allow(clippy::type_complexity)]
10+
pub struct PlanDependentUsize(
11+
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
12+
);
13+
14+
impl PlanDependentUsize {
15+
pub fn call(&self, plan: &Arc<dyn ExecutionPlan>) -> usize {
16+
self.0(plan)
17+
}
18+
}
19+
20+
pub trait IntoPlanDependentUsize {
21+
fn into_plan_dependent_usize(self) -> PlanDependentUsize;
22+
}
23+
24+
impl IntoPlanDependentUsize for usize {
25+
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
26+
PlanDependentUsize(Arc::new(move |_| self))
27+
}
28+
}
29+
30+
impl<T: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static> IntoPlanDependentUsize for T {
31+
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
32+
PlanDependentUsize(Arc::new(self))
33+
}
34+
}
35+
36+
impl Default for PlanDependentUsize {
37+
fn default() -> Self {
38+
PlanDependentUsize(Arc::new(|_| 0))
39+
}
40+
}
41+
42+
impl Debug for PlanDependentUsize {
43+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44+
write!(f, "PlanDependantUsize")
45+
}
46+
}
47+
48+
impl Display for PlanDependentUsize {
49+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50+
write!(f, "PlanDependantUsize")
51+
}
52+
}
53+
54+
impl ConfigField for PlanDependentUsize {
55+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
56+
v.some(key, self, description);
57+
}
58+
59+
fn set(&mut self, _: &str, value: &str) -> datafusion::common::Result<()> {
60+
*self = default_config_transform::<usize>(value)?.into_plan_dependent_usize();
61+
Ok(())
62+
}
63+
}
64+
65+
extensions_options! {
66+
pub struct DistributedConfig {
67+
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
68+
/// ```text
69+
/// ( task 1 ) ( task 2 ) ( task 3 )
70+
/// ▲ ▲ ▲
71+
/// └────┬──────┴─────┬────┘
72+
/// ( task 1 ) ( task 2 ) N tasks
73+
/// ```
74+
/// This parameter defines N
75+
pub network_shuffle_tasks: Option<PlanDependentUsize>, default = None
76+
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
77+
/// ```text
78+
/// ( task 1 )
79+
/// ▲
80+
/// ┌───────────┴──────────┐
81+
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
82+
/// ```
83+
/// This parameter defines N
84+
pub network_coalesce_tasks: Option<PlanDependentUsize>, default = None
85+
}
86+
}
87+
88+
impl ConfigExtension for DistributedConfig {
89+
const PREFIX: &'static str = "distributed";
90+
}
91+
92+
impl DistributedConfig {
93+
/// Sets the amount of tasks used in a network shuffle operation.
94+
pub fn with_network_shuffle_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
95+
self.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
96+
self
97+
}
98+
99+
/// Sets the amount of tasks used in a network coalesce operation.
100+
pub fn with_network_coalesce_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
101+
self.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
102+
self
103+
}
104+
}
105+
106+
pub(crate) fn set_distributed_network_coalesce_tasks(
107+
cfg: &mut SessionConfig,
108+
tasks: impl IntoPlanDependentUsize,
109+
) {
110+
let ext = &mut cfg.options_mut().extensions;
111+
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
112+
return ext.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
113+
};
114+
prev.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
115+
}
116+
117+
pub(crate) fn set_distributed_network_shuffle_tasks(
118+
cfg: &mut SessionConfig,
119+
tasks: impl IntoPlanDependentUsize,
120+
) {
121+
let ext = &mut cfg.options_mut().extensions;
122+
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
123+
return ext.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
124+
};
125+
prev.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
126+
}

0 commit comments

Comments
 (0)