Skip to content

Commit 7636da1

Browse files
authored
Rework distributed planning logic (#259)
* Rework distributed planner
* Ignore some failing tests that should not be failing
* Improve default task estimator repartition
* Update test snapshots
* Factor out plan_annotator.rs
* Add docs
* Respond to PR feedback
* Make TaskCountAnnotation public and part of the TaskEstimator API
* Add plan annotator tests (#263)
* Fix typo
* Add docs
1 parent 625044c commit 7636da1

25 files changed

+2209
-2410
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ impl RunOpt {
324324
let mut n_tasks = 0;
325325
physical_plan.clone().transform_down(|node| {
326326
if let Some(node) = node.as_network_boundary() {
327-
n_tasks += node.input_stage().map(|v| v.tasks.len()).unwrap_or(0)
327+
n_tasks += node.input_stage().tasks.len()
328328
}
329329
Ok(Transformed::no(node))
330330
})?;

src/common/children_helpers.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use datafusion::common::{DataFusionError, plan_err};
2+
use datafusion::physical_plan::ExecutionPlan;
3+
use std::borrow::Borrow;
4+
use std::sync::Arc;
5+
6+
pub(crate) fn require_one_child<L, T>(
7+
children: L,
8+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
9+
where
10+
L: AsRef<[T]>,
11+
T: Borrow<Arc<dyn ExecutionPlan>>,
12+
{
13+
let children = children.as_ref();
14+
if children.len() != 1 {
15+
return plan_err!("Expected exactly 1 children, got {}", children.len());
16+
}
17+
Ok(children[0].borrow().clone())
18+
}

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
mod children_helpers;
12
mod map_last_stream;
23
pub mod ttl_map;
34

5+
pub(crate) use children_helpers::require_one_child;
46
pub(crate) use map_last_stream::map_last_stream;

src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 141 additions & 450 deletions
Large diffs are not rendered by default.

src/distributed_planner/distributed_plan_error.rs

Lines changed: 0 additions & 46 deletions
This file was deleted.

src/distributed_planner/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
mod distributed_config;
22
mod distributed_physical_optimizer_rule;
3-
mod distributed_plan_error;
43
mod network_boundary;
4+
mod plan_annotator;
55
mod task_estimator;
66

77
pub use distributed_config::DistributedConfig;
8-
pub use distributed_physical_optimizer_rule::{
9-
DistributedPhysicalOptimizerRule, apply_network_boundaries, distribute_plan,
10-
};
11-
pub use distributed_plan_error::{DistributedPlanError, limit_tasks_err, non_distributable_err};
12-
pub use network_boundary::{InputStageInfo, NetworkBoundary, NetworkBoundaryExt};
8+
pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
9+
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
1310
pub(crate) use task_estimator::set_distributed_task_estimator;
14-
pub use task_estimator::{TaskEstimation, TaskEstimator};
11+
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator};

src/distributed_planner/network_boundary.rs

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,11 @@
11
use crate::{NetworkCoalesceExec, NetworkShuffleExec, Stage};
2-
use datafusion::common::plan_err;
32
use datafusion::physical_plan::ExecutionPlan;
43
use std::sync::Arc;
54

6-
/// Necessary information for building a [Stage] during distributed planning.
7-
///
8-
/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
9-
/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
10-
///
11-
/// Some network boundaries might perform some modifications in their children, like scaling
12-
/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
13-
pub struct InputStageInfo {
14-
/// The head plan of the [Stage] that is about to be built.
15-
pub plan: Arc<dyn ExecutionPlan>,
16-
/// The amount of tasks the [Stage] will have.
17-
pub task_count: usize,
18-
}
19-
205
/// This trait represents a node that introduces the necessity of a network boundary in the plan.
216
/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
227
/// out of it.
238
pub trait NetworkBoundary: ExecutionPlan {
24-
/// Returns the information necessary for building the next stage from which this
25-
/// [NetworkBoundary] is going to collect data.
26-
fn get_input_stage_info(&self, task_count: usize)
27-
-> datafusion::common::Result<InputStageInfo>;
28-
29-
/// re-assigns a different number of input tasks to the current [NetworkBoundary].
30-
///
31-
/// This will be called if upon building a stage, a [crate::distributed_planner::distributed_physical_optimizer_rule::DistributedPlanError::LimitTasks] error
32-
/// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
33-
fn with_input_task_count(
34-
&self,
35-
input_tasks: usize,
36-
) -> datafusion::common::Result<Arc<dyn NetworkBoundary>>;
37-
38-
/// Returns the input tasks assigned to this [NetworkBoundary].
39-
fn input_task_count(&self) -> usize;
40-
419
/// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
4210
/// information to perform any internal transformations necessary for distributed execution.
4311
///
@@ -48,22 +16,7 @@ pub trait NetworkBoundary: ExecutionPlan {
4816
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>>;
4917

5018
/// Returns the assigned input [Stage], if any.
51-
fn input_stage(&self) -> Option<&Stage>;
52-
53-
/// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
54-
/// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
55-
/// this function returns.
56-
fn rollback(&self) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
57-
let children = self.children();
58-
if children.len() != 1 {
59-
return plan_err!(
60-
"Expected distributed node {} to have exactly 1 children, but got {}",
61-
self.name(),
62-
children.len()
63-
);
64-
}
65-
Ok(Arc::clone(children.first().unwrap()))
66-
}
19+
fn input_stage(&self) -> &Stage;
6720
}
6821

6922
/// Extension trait for downcasting dynamic types to [NetworkBoundary].

0 commit comments

Comments
 (0)