Skip to content

Commit 35c6e01

Browse files
Revert "Rollback plan dependent usize (#201)"
This reverts commit 01b0866.
1 parent 3d3609c commit 35c6e01

File tree

5 files changed

+111
-42
lines changed

5 files changed

+111
-42
lines changed

src/distributed_ext.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::ChannelResolver;
21
use crate::channel_resolver_ext::set_distributed_channel_resolver;
32
use crate::config_extension_ext::{
43
set_distributed_option_extension, set_distributed_option_extension_from_headers,
@@ -7,6 +6,7 @@ use crate::distributed_planner::{
76
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
87
};
98
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
9+
use crate::{ChannelResolver, IntoPlanDependentUsize};
1010
use datafusion::common::DataFusionError;
1111
use datafusion::config::ConfigExtension;
1212
use datafusion::execution::{SessionState, SessionStateBuilder};
@@ -233,10 +233,10 @@ pub trait DistributedExt: Sized {
233233
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
234234
/// ```
235235
/// This parameter defines N
236-
fn with_distributed_network_coalesce_tasks(self, tasks: usize) -> Self;
236+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
237237

238238
/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
239-
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
239+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
240240

241241
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
242242
/// ```text
@@ -246,10 +246,10 @@ pub trait DistributedExt: Sized {
246246
/// ( task 1 ) ( task 2 ) N tasks
247247
/// ```
248248
/// This parameter defines N
249-
fn with_distributed_network_shuffle_tasks(self, tasks: usize) -> Self;
249+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
250250

251251
/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
252-
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
252+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
253253
}
254254

255255
impl DistributedExt for SessionConfig {
@@ -282,11 +282,11 @@ impl DistributedExt for SessionConfig {
282282
set_distributed_channel_resolver(self, resolver)
283283
}
284284

285-
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize) {
285+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
286286
set_distributed_network_coalesce_tasks(self, tasks)
287287
}
288288

289-
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize) {
289+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
290290
set_distributed_network_shuffle_tasks(self, tasks)
291291
}
292292

@@ -314,11 +314,11 @@ impl DistributedExt for SessionConfig {
314314

315315
#[call(set_distributed_network_coalesce_tasks)]
316316
#[expr($;self)]
317-
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;
317+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
318318

319319
#[call(set_distributed_network_shuffle_tasks)]
320320
#[expr($;self)]
321-
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
321+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
322322
}
323323
}
324324
}
@@ -351,15 +351,15 @@ impl DistributedExt for SessionStateBuilder {
351351
#[expr($;self)]
352352
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
353353

354-
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
354+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
355355
#[call(set_distributed_network_coalesce_tasks)]
356356
#[expr($;self)]
357-
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;
357+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
358358

359-
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
359+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
360360
#[call(set_distributed_network_shuffle_tasks)]
361361
#[expr($;self)]
362-
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
362+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
363363
}
364364
}
365365
}
@@ -392,15 +392,15 @@ impl DistributedExt for SessionState {
392392
#[expr($;self)]
393393
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
394394

395-
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
395+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
396396
#[call(set_distributed_network_coalesce_tasks)]
397397
#[expr($;self)]
398-
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;
398+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
399399

400-
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
400+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
401401
#[call(set_distributed_network_shuffle_tasks)]
402402
#[expr($;self)]
403-
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
403+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
404404
}
405405
}
406406
}
@@ -433,15 +433,15 @@ impl DistributedExt for SessionContext {
433433
#[expr($;self)]
434434
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
435435

436-
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
436+
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
437437
#[call(set_distributed_network_coalesce_tasks)]
438438
#[expr($;self)]
439-
fn with_distributed_network_coalesce_tasks(self, tasks: usize) -> Self;
439+
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
440440

441-
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
441+
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
442442
#[call(set_distributed_network_shuffle_tasks)]
443443
#[expr($;self)]
444-
fn with_distributed_network_shuffle_tasks(self, tasks: usize) -> Self;
444+
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
445445
}
446446
}
447447
}
Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,66 @@
11
use datafusion::common::extensions_options;
2-
use datafusion::config::ConfigExtension;
2+
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
3+
use datafusion::physical_plan::ExecutionPlan;
34
use datafusion::prelude::SessionConfig;
4-
use std::fmt::Debug;
5+
use std::fmt::{Debug, Display, Formatter};
6+
use std::sync::Arc;
7+
8+
#[derive(Clone)]
9+
#[allow(clippy::type_complexity)]
10+
pub struct PlanDependentUsize(
11+
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
12+
);
13+
14+
impl PlanDependentUsize {
15+
pub fn call(&self, plan: &Arc<dyn ExecutionPlan>) -> usize {
16+
self.0(plan)
17+
}
18+
}
19+
20+
pub trait IntoPlanDependentUsize {
21+
fn into_plan_dependent_usize(self) -> PlanDependentUsize;
22+
}
23+
24+
impl IntoPlanDependentUsize for usize {
25+
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
26+
PlanDependentUsize(Arc::new(move |_| self))
27+
}
28+
}
29+
30+
impl<T: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static> IntoPlanDependentUsize for T {
31+
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
32+
PlanDependentUsize(Arc::new(self))
33+
}
34+
}
35+
36+
impl Default for PlanDependentUsize {
37+
fn default() -> Self {
38+
PlanDependentUsize(Arc::new(|_| 0))
39+
}
40+
}
41+
42+
impl Debug for PlanDependentUsize {
43+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44+
write!(f, "PlanDependantUsize")
45+
}
46+
}
47+
48+
impl Display for PlanDependentUsize {
49+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50+
write!(f, "PlanDependantUsize")
51+
}
52+
}
53+
54+
impl ConfigField for PlanDependentUsize {
55+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
56+
v.some(key, self, description);
57+
}
58+
59+
fn set(&mut self, _: &str, value: &str) -> datafusion::common::Result<()> {
60+
*self = default_config_transform::<usize>(value)?.into_plan_dependent_usize();
61+
Ok(())
62+
}
63+
}
564

665
extensions_options! {
766
pub struct DistributedConfig {
@@ -13,7 +72,7 @@ extensions_options! {
1372
/// ( task 1 ) ( task 2 ) N tasks
1473
/// ```
1574
/// This parameter defines N
16-
pub network_shuffle_tasks: Option<usize>, default = None
75+
pub network_shuffle_tasks: Option<PlanDependentUsize>, default = None
1776
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
1877
/// ```text
1978
/// ( task 1 )
@@ -22,7 +81,7 @@ extensions_options! {
2281
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
2382
/// ```
2483
/// This parameter defines N
25-
pub network_coalesce_tasks: Option<usize>, default = None
84+
pub network_coalesce_tasks: Option<PlanDependentUsize>, default = None
2685
}
2786
}
2887

@@ -32,30 +91,36 @@ impl ConfigExtension for DistributedConfig {
3291

3392
impl DistributedConfig {
3493
/// Sets the amount of tasks used in a network shuffle operation.
35-
pub fn with_network_shuffle_tasks(mut self, tasks: usize) -> Self {
36-
self.network_shuffle_tasks = Some(tasks);
94+
pub fn with_network_shuffle_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
95+
self.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
3796
self
3897
}
3998

4099
/// Sets the amount of tasks used in a network coalesce operation.
41-
pub fn with_network_coalesce_tasks(mut self, tasks: usize) -> Self {
42-
self.network_coalesce_tasks = Some(tasks);
100+
pub fn with_network_coalesce_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
101+
self.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
43102
self
44103
}
45104
}
46105

47-
pub(crate) fn set_distributed_network_coalesce_tasks(cfg: &mut SessionConfig, tasks: usize) {
106+
pub(crate) fn set_distributed_network_coalesce_tasks(
107+
cfg: &mut SessionConfig,
108+
tasks: impl IntoPlanDependentUsize,
109+
) {
48110
let ext = &mut cfg.options_mut().extensions;
49111
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
50112
return ext.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
51113
};
52-
prev.network_coalesce_tasks = Some(tasks);
114+
prev.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
53115
}
54116

55-
pub(crate) fn set_distributed_network_shuffle_tasks(cfg: &mut SessionConfig, tasks: usize) {
117+
pub(crate) fn set_distributed_network_shuffle_tasks(
118+
cfg: &mut SessionConfig,
119+
tasks: impl IntoPlanDependentUsize,
120+
) {
56121
let ext = &mut cfg.options_mut().extensions;
57122
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
58123
return ext.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
59124
};
60-
prev.network_shuffle_tasks = Some(tasks);
125+
prev.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
61126
}

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,14 @@ pub fn apply_network_boundaries(
110110
}
111111

112112
// If this is a hash RepartitionExec, introduce a shuffle.
113-
if let (Some(node), Some(input_tasks)) = (
113+
if let (Some(node), Some(tasks)) = (
114114
plan.as_any().downcast_ref::<RepartitionExec>(),
115-
cfg.network_shuffle_tasks,
115+
cfg.network_shuffle_tasks.clone(),
116116
) {
117117
if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
118118
return Ok(Transformed::no(plan));
119119
}
120+
let input_tasks = tasks.0(&plan);
120121
if input_tasks == 0 {
121122
return Ok(Transformed::no(plan));
122123
}
@@ -128,9 +129,9 @@ pub fn apply_network_boundaries(
128129
// If this is a CoalescePartitionsExec, it means that the original plan is trying to
129130
// merge all partitions into one. We need to go one step ahead and also merge all tasks
130131
// into one.
131-
if let (Some(node), Some(input_tasks)) = (
132+
if let (Some(node), Some(tasks)) = (
132133
plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
133-
cfg.network_coalesce_tasks,
134+
cfg.network_coalesce_tasks.clone(),
134135
) {
135136
// If the immediate child is a PartitionIsolatorExec, it means that the rest of the
136137
// plan is just a couple of non-computational nodes that are probably not worth
@@ -139,6 +140,7 @@ pub fn apply_network_boundaries(
139140
return Ok(Transformed::no(plan));
140141
}
141142

143+
let input_tasks = tasks.0(&plan);
142144
if input_tasks == 0 {
143145
return Ok(Transformed::no(plan));
144146
}
@@ -152,10 +154,11 @@ pub fn apply_network_boundaries(
152154
// The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
153155
// We need to account for it and help it by also coalescing all tasks into one, therefore
154156
// a NetworkCoalesceExec is introduced.
155-
if let (Some(node), Some(input_tasks)) = (
157+
if let (Some(node), Some(tasks)) = (
156158
plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
157-
cfg.network_coalesce_tasks,
159+
cfg.network_coalesce_tasks.clone(),
158160
) {
161+
let input_tasks = tasks.0(&plan);
159162
if input_tasks == 0 {
160163
return Ok(Transformed::no(plan));
161164
}

src/distributed_planner/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub(crate) use distributed_config::{
77
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
88
};
99

10-
pub use distributed_config::DistributedConfig;
10+
pub use distributed_config::{DistributedConfig, IntoPlanDependentUsize, PlanDependentUsize};
1111
pub use distributed_physical_optimizer_rule::{
1212
DistributedPhysicalOptimizerRule, apply_network_boundaries, distribute_plan,
1313
};

src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ pub mod test_utils;
1717
pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
1818
pub use distributed_ext::DistributedExt;
1919
pub use distributed_planner::{
20-
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary,
21-
NetworkBoundaryExt, apply_network_boundaries, distribute_plan,
20+
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, IntoPlanDependentUsize,
21+
NetworkBoundary, NetworkBoundaryExt, PlanDependentUsize, apply_network_boundaries,
22+
distribute_plan,
2223
};
2324
pub use execution_plans::{
2425
DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec,

0 commit comments

Comments
 (0)