Skip to content

Commit 01b0866

Browse files
authored
Rollback plan dependent usize (#201)
1 parent df78006 commit 01b0866

File tree

5 files changed

+42
-111
lines changed

5 files changed

+42
-111
lines changed

src/distributed_ext.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::ChannelResolver;
12
use crate::channel_resolver_ext::set_distributed_channel_resolver;
23
use crate::config_extension_ext::{
34
set_distributed_option_extension, set_distributed_option_extension_from_headers,
@@ -6,7 +7,6 @@ use crate::distributed_planner::{
67
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
78
};
89
use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
9-
use crate::{ChannelResolver, IntoPlanDependentUsize};
1010
use datafusion::common::DataFusionError;
1111
use datafusion::config::ConfigExtension;
1212
use datafusion::execution::{SessionState, SessionStateBuilder};
@@ -233,10 +233,10 @@ pub trait DistributedExt: Sized {
233233
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
234234
/// ```
235235
/// This parameter defines N
236-
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
236+
fn with_distributed_network_coalesce_tasks(self, tasks: usize) -> Self;
237237

238238
/// Same as [DistributedExt::with_distributed_network_coalesce_tasks] but with an in-place mutation.
239-
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
239+
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
240240

241241
/// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
242242
/// ```text
@@ -246,10 +246,10 @@ pub trait DistributedExt: Sized {
246246
/// ( task 1 ) ( task 2 ) N tasks
247247
/// ```
248248
/// This parameter defines N
249-
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
249+
fn with_distributed_network_shuffle_tasks(self, tasks: usize) -> Self;
250250

251251
/// Same as [DistributedExt::with_distributed_network_shuffle_tasks] but with an in-place mutation.
252-
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
252+
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
253253
}
254254

255255
impl DistributedExt for SessionConfig {
@@ -282,11 +282,11 @@ impl DistributedExt for SessionConfig {
282282
set_distributed_channel_resolver(self, resolver)
283283
}
284284

285-
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
285+
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize) {
286286
set_distributed_network_coalesce_tasks(self, tasks)
287287
}
288288

289-
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T) {
289+
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize) {
290290
set_distributed_network_shuffle_tasks(self, tasks)
291291
}
292292

@@ -314,11 +314,11 @@ impl DistributedExt for SessionConfig {
314314

315315
#[call(set_distributed_network_coalesce_tasks)]
316316
#[expr($;self)]
317-
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
317+
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;
318318

319319
#[call(set_distributed_network_shuffle_tasks)]
320320
#[expr($;self)]
321-
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
321+
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
322322
}
323323
}
324324
}
@@ -351,15 +351,15 @@ impl DistributedExt for SessionStateBuilder {
351351
#[expr($;self)]
352352
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
353353

354-
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
354+
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
355355
#[call(set_distributed_network_coalesce_tasks)]
356356
#[expr($;self)]
357-
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
357+
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;
358358

359-
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
359+
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
360360
#[call(set_distributed_network_shuffle_tasks)]
361361
#[expr($;self)]
362-
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
362+
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
363363
}
364364
}
365365
}
@@ -392,15 +392,15 @@ impl DistributedExt for SessionState {
392392
#[expr($;self)]
393393
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
394394

395-
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
395+
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
396396
#[call(set_distributed_network_coalesce_tasks)]
397397
#[expr($;self)]
398-
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
398+
fn with_distributed_network_coalesce_tasks(mut self, tasks: usize) -> Self;
399399

400-
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
400+
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
401401
#[call(set_distributed_network_shuffle_tasks)]
402402
#[expr($;self)]
403-
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(mut self, tasks: T) -> Self;
403+
fn with_distributed_network_shuffle_tasks(mut self, tasks: usize) -> Self;
404404
}
405405
}
406406
}
@@ -433,15 +433,15 @@ impl DistributedExt for SessionContext {
433433
#[expr($;self)]
434434
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
435435

436-
fn set_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
436+
fn set_distributed_network_coalesce_tasks(&mut self, tasks: usize);
437437
#[call(set_distributed_network_coalesce_tasks)]
438438
#[expr($;self)]
439-
fn with_distributed_network_coalesce_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
439+
fn with_distributed_network_coalesce_tasks(self, tasks: usize) -> Self;
440440

441-
fn set_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(&mut self, tasks: T);
441+
fn set_distributed_network_shuffle_tasks(&mut self, tasks: usize);
442442
#[call(set_distributed_network_shuffle_tasks)]
443443
#[expr($;self)]
444-
fn with_distributed_network_shuffle_tasks<T: IntoPlanDependentUsize>(self, tasks: T) -> Self;
444+
fn with_distributed_network_shuffle_tasks(self, tasks: usize) -> Self;
445445
}
446446
}
447447
}
Lines changed: 12 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,7 @@
11
use datafusion::common::extensions_options;
2-
use datafusion::config::{ConfigExtension, ConfigField, Visit, default_config_transform};
3-
use datafusion::physical_plan::ExecutionPlan;
2+
use datafusion::config::ConfigExtension;
43
use datafusion::prelude::SessionConfig;
5-
use std::fmt::{Debug, Display, Formatter};
6-
use std::sync::Arc;
7-
8-
#[derive(Clone)]
9-
#[allow(clippy::type_complexity)]
10-
pub struct PlanDependentUsize(
11-
pub(crate) Arc<dyn Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync>,
12-
);
13-
14-
impl PlanDependentUsize {
15-
pub fn call(&self, plan: &Arc<dyn ExecutionPlan>) -> usize {
16-
self.0(plan)
17-
}
18-
}
19-
20-
pub trait IntoPlanDependentUsize {
21-
fn into_plan_dependent_usize(self) -> PlanDependentUsize;
22-
}
23-
24-
impl IntoPlanDependentUsize for usize {
25-
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
26-
PlanDependentUsize(Arc::new(move |_| self))
27-
}
28-
}
29-
30-
impl<T: Fn(&Arc<dyn ExecutionPlan>) -> usize + Send + Sync + 'static> IntoPlanDependentUsize for T {
31-
fn into_plan_dependent_usize(self) -> PlanDependentUsize {
32-
PlanDependentUsize(Arc::new(self))
33-
}
34-
}
35-
36-
impl Default for PlanDependentUsize {
37-
fn default() -> Self {
38-
PlanDependentUsize(Arc::new(|_| 0))
39-
}
40-
}
41-
42-
impl Debug for PlanDependentUsize {
43-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44-
write!(f, "PlanDependantUsize")
45-
}
46-
}
47-
48-
impl Display for PlanDependentUsize {
49-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50-
write!(f, "PlanDependantUsize")
51-
}
52-
}
53-
54-
impl ConfigField for PlanDependentUsize {
55-
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
56-
v.some(key, self, description);
57-
}
58-
59-
fn set(&mut self, _: &str, value: &str) -> datafusion::common::Result<()> {
60-
*self = default_config_transform::<usize>(value)?.into_plan_dependent_usize();
61-
Ok(())
62-
}
63-
}
4+
use std::fmt::Debug;
645

656
extensions_options! {
667
pub struct DistributedConfig {
@@ -72,7 +13,7 @@ extensions_options! {
7213
/// ( task 1 ) ( task 2 ) N tasks
7314
/// ```
7415
/// This parameter defines N
75-
pub network_shuffle_tasks: Option<PlanDependentUsize>, default = None
16+
pub network_shuffle_tasks: Option<usize>, default = None
7617
/// Upon merging multiple tasks into one, this defines how many tasks are merged.
7718
/// ```text
7819
/// ( task 1 )
@@ -81,7 +22,7 @@ extensions_options! {
8122
/// ( task 1 ) ( task 2 ) ( task 3 ) N tasks
8223
/// ```
8324
/// This parameter defines N
84-
pub network_coalesce_tasks: Option<PlanDependentUsize>, default = None
25+
pub network_coalesce_tasks: Option<usize>, default = None
8526
}
8627
}
8728

@@ -91,36 +32,30 @@ impl ConfigExtension for DistributedConfig {
9132

9233
impl DistributedConfig {
9334
/// Sets the amount of tasks used in a network shuffle operation.
94-
pub fn with_network_shuffle_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
95-
self.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
35+
pub fn with_network_shuffle_tasks(mut self, tasks: usize) -> Self {
36+
self.network_shuffle_tasks = Some(tasks);
9637
self
9738
}
9839

9940
/// Sets the amount of tasks used in a network coalesce operation.
100-
pub fn with_network_coalesce_tasks(mut self, tasks: impl IntoPlanDependentUsize) -> Self {
101-
self.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
41+
pub fn with_network_coalesce_tasks(mut self, tasks: usize) -> Self {
42+
self.network_coalesce_tasks = Some(tasks);
10243
self
10344
}
10445
}
10546

106-
pub(crate) fn set_distributed_network_coalesce_tasks(
107-
cfg: &mut SessionConfig,
108-
tasks: impl IntoPlanDependentUsize,
109-
) {
47+
pub(crate) fn set_distributed_network_coalesce_tasks(cfg: &mut SessionConfig, tasks: usize) {
11048
let ext = &mut cfg.options_mut().extensions;
11149
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
11250
return ext.insert(DistributedConfig::default().with_network_coalesce_tasks(tasks));
11351
};
114-
prev.network_coalesce_tasks = Some(tasks.into_plan_dependent_usize());
52+
prev.network_coalesce_tasks = Some(tasks);
11553
}
11654

117-
pub(crate) fn set_distributed_network_shuffle_tasks(
118-
cfg: &mut SessionConfig,
119-
tasks: impl IntoPlanDependentUsize,
120-
) {
55+
pub(crate) fn set_distributed_network_shuffle_tasks(cfg: &mut SessionConfig, tasks: usize) {
12156
let ext = &mut cfg.options_mut().extensions;
12257
let Some(prev) = ext.get_mut::<DistributedConfig>() else {
12358
return ext.insert(DistributedConfig::default().with_network_shuffle_tasks(tasks));
12459
};
125-
prev.network_shuffle_tasks = Some(tasks.into_plan_dependent_usize());
60+
prev.network_shuffle_tasks = Some(tasks);
12661
}

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,13 @@ pub fn apply_network_boundaries(
110110
}
111111

112112
// If this is a hash RepartitionExec, introduce a shuffle.
113-
if let (Some(node), Some(tasks)) = (
113+
if let (Some(node), Some(input_tasks)) = (
114114
plan.as_any().downcast_ref::<RepartitionExec>(),
115-
cfg.network_shuffle_tasks.clone(),
115+
cfg.network_shuffle_tasks,
116116
) {
117117
if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
118118
return Ok(Transformed::no(plan));
119119
}
120-
let input_tasks = tasks.0(&plan);
121120
if input_tasks == 0 {
122121
return Ok(Transformed::no(plan));
123122
}
@@ -129,9 +128,9 @@ pub fn apply_network_boundaries(
129128
// If this is a CoalescePartitionsExec, it means that the original plan is trying to
130129
// merge all partitions into one. We need to go one step ahead and also merge all tasks
131130
// into one.
132-
if let (Some(node), Some(tasks)) = (
131+
if let (Some(node), Some(input_tasks)) = (
133132
plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
134-
cfg.network_coalesce_tasks.clone(),
133+
cfg.network_coalesce_tasks,
135134
) {
136135
// If the immediate child is a PartitionIsolatorExec, it means that the rest of the
137136
// plan is just a couple of non-computational nodes that are probably not worth
@@ -140,7 +139,6 @@ pub fn apply_network_boundaries(
140139
return Ok(Transformed::no(plan));
141140
}
142141

143-
let input_tasks = tasks.0(&plan);
144142
if input_tasks == 0 {
145143
return Ok(Transformed::no(plan));
146144
}
@@ -154,11 +152,10 @@ pub fn apply_network_boundaries(
154152
// The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
155153
// We need to account for it and help it by also coalescing all tasks into one, therefore
156154
// a NetworkCoalesceExec is introduced.
157-
if let (Some(node), Some(tasks)) = (
155+
if let (Some(node), Some(input_tasks)) = (
158156
plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
159-
cfg.network_coalesce_tasks.clone(),
157+
cfg.network_coalesce_tasks,
160158
) {
161-
let input_tasks = tasks.0(&plan);
162159
if input_tasks == 0 {
163160
return Ok(Transformed::no(plan));
164161
}

src/distributed_planner/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub(crate) use distributed_config::{
77
set_distributed_network_coalesce_tasks, set_distributed_network_shuffle_tasks,
88
};
99

10-
pub use distributed_config::{DistributedConfig, IntoPlanDependentUsize, PlanDependentUsize};
10+
pub use distributed_config::DistributedConfig;
1111
pub use distributed_physical_optimizer_rule::{
1212
DistributedPhysicalOptimizerRule, apply_network_boundaries, distribute_plan,
1313
};

src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ pub mod test_utils;
1717
pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
1818
pub use distributed_ext::DistributedExt;
1919
pub use distributed_planner::{
20-
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, IntoPlanDependentUsize,
21-
NetworkBoundary, NetworkBoundaryExt, PlanDependentUsize, apply_network_boundaries,
22-
distribute_plan,
20+
DistributedConfig, DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary,
21+
NetworkBoundaryExt, apply_network_boundaries, distribute_plan,
2322
};
2423
pub use execution_plans::{
2524
DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec,

0 commit comments

Comments
 (0)