Skip to content

Commit 10dc81a

Browse files
committed
check if plan can be divided into tasks
1 parent ff9167d commit 10dc81a

File tree

4 files changed

+20
-17
lines changed

4 files changed

+20
-17
lines changed

src/common/ttl_map.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use dashmap::{DashMap, Entry};
2727
use datafusion::error::DataFusionError;
2828
use std::collections::HashSet;
2929
use std::hash::Hash;
30-
use std::mem;
3130
use std::sync::atomic::AtomicU64;
3231
#[cfg(test)]
3332
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
@@ -94,7 +93,7 @@ where
9493
shard.insert(key);
9594
}
9695
BucketOp::Clear => {
97-
let keys_to_delete = mem::replace(&mut shard, HashSet::new());
96+
let keys_to_delete = std::mem::replace(&mut shard, HashSet::new());
9897
for key in keys_to_delete {
9998
data.remove(&key);
10099
}

src/common/util.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
22
use datafusion::error::Result;
3-
use datafusion::physical_plan::joins::PartitionMode;
43
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
54

65
use std::fmt::Write;

src/physical_optimizer.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use super::stage::ExecutionStage;
4+
use crate::common::util::can_be_divided;
45
use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
56
use datafusion::common::tree_node::TreeNodeRecursion;
67
use datafusion::error::DataFusionError;
@@ -83,12 +84,14 @@ impl DistributedPhysicalOptimizerRule {
8384
internal_datafusion_err!("Expected RepartitionExec to have a child"),
8485
)?);
8586

86-
let maybe_isolated_plan = if let Some(ppt) = self.partitions_per_task {
87-
let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
88-
plan.with_new_children(vec![isolated])?
89-
} else {
90-
plan
91-
};
87+
let maybe_isolated_plan =
88+
if can_be_divided(&plan)? && self.partitions_per_task.is_some() {
89+
let ppt = self.partitions_per_task.unwrap();
90+
let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
91+
plan.with_new_children(vec![isolated])?
92+
} else {
93+
plan
94+
};
9295

9396
return Ok(Transformed::yes(Arc::new(
9497
ArrowFlightReadExec::new_pending(
@@ -120,7 +123,7 @@ impl DistributedPhysicalOptimizerRule {
120123
) -> Result<ExecutionStage, DataFusionError> {
121124
let mut inputs = vec![];
122125

123-
let distributed = plan.transform_down(|plan| {
126+
let distributed = plan.clone().transform_down(|plan| {
124127
let Some(node) = plan.as_any().downcast_ref::<ArrowFlightReadExec>() else {
125128
return Ok(Transformed::no(plan));
126129
};
@@ -137,9 +140,13 @@ impl DistributedPhysicalOptimizerRule {
137140
let mut stage = ExecutionStage::new(query_id, *num, distributed.data, inputs);
138141
*num += 1;
139142

140-
if let Some(partitions_per_task) = self.partitions_per_task {
141-
stage = stage.with_maximum_partitions_per_task(partitions_per_task);
142-
}
143+
stage = match (self.partitions_per_task, can_be_divided(&plan)?) {
144+
(Some(partitions_per_task), true) => {
145+
stage.with_maximum_partitions_per_task(partitions_per_task)
146+
}
147+
(_, _) => stage,
148+
};
149+
143150
stage.depth = depth;
144151

145152
Ok(stage)

tests/tpch_validation_test.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ mod tests {
66
use async_trait::async_trait;
77
use datafusion::error::DataFusionError;
88
use datafusion::execution::SessionStateBuilder;
9-
use datafusion::physical_plan::displayable;
9+
1010
use datafusion::prelude::{SessionConfig, SessionContext};
1111
use datafusion_distributed::test_utils::localhost::start_localhost_context;
12-
use datafusion_distributed::{
13-
display_stage_graphviz, DistributedPhysicalOptimizerRule, ExecutionStage, SessionBuilder,
14-
};
12+
use datafusion_distributed::{DistributedPhysicalOptimizerRule, SessionBuilder};
1513
use futures::TryStreamExt;
1614
use std::error::Error;
1715
use std::sync::Arc;

0 commit comments

Comments
 (0)