
Commit a638e6d

Rework execution plan hierarchy for better interoperability (#178)
* Add task_count to context
* Rework stage hierarchy for better interoperability with DataFusion ecosystem
* Scope common execution_plan helpers to src/execution_plans/common.rs
* Add clarifying comment
1 parent 5db1101 · commit a638e6d

26 files changed: +1193 −1366 lines

benchmarks/src/tpch/run.rs

Lines changed: 3 additions & 3 deletions

@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
 };
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
-    DistributedSessionBuilderContext, StageExec,
+    DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
 };
 use log::info;
 use std::fs;
@@ -331,8 +331,8 @@ impl RunOpt {
         }
         let mut n_tasks = 0;
         physical_plan.clone().transform_down(|node| {
-            if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
-                n_tasks += node.tasks.len()
+            if let Some(node) = node.as_network_boundary() {
+                n_tasks += node.input_stage().map(|v| v.tasks.len()).unwrap_or(0)
             }
             Ok(Transformed::no(node))
         })?;
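
Since StageExec no longer exists after this commit, downstream code that counted tasks by downcasting needs the NetworkBoundary traversal shown above. A self-contained sketch of the same idea, assuming only the imports shown (count_tasks is a hypothetical helper, not part of this commit):

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_distributed::NetworkBoundaryExt;
use std::sync::Arc;

// Walks the plan top-down and sums the task counts of every input stage
// assigned to a network boundary node.
fn count_tasks(plan: Arc<dyn ExecutionPlan>) -> datafusion::common::Result<usize> {
    let mut n_tasks = 0;
    plan.transform_down(|node| {
        if let Some(node) = node.as_network_boundary() {
            // input_stage() is None until a stage has been assigned.
            n_tasks += node.input_stage().map(|s| s.tasks.len()).unwrap_or(0);
        }
        Ok(Transformed::no(node))
    })?;
    Ok(n_tasks)
}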

src/common/mod.rs

Lines changed: 0 additions & 3 deletions

@@ -1,7 +1,4 @@
 mod map_last_stream;
-mod partitioning;
-#[allow(unused)]
 pub mod ttl_map;
 
 pub(crate) use map_last_stream::map_last_stream;
-pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};

src/distributed_physical_optimizer_rule.rs

Lines changed: 98 additions & 79 deletions
Large diffs are not rendered by default.
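
This diff is not rendered, but the call sites elsewhere in the commit hint at the shape of the new abstraction. The sketch below is inferred from usage only: the method names appear in the rendered diffs, while the exact signatures and the Stage placeholder are assumptions, not the actual source.

use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

// Placeholder: the real Stage (query_id, num, plan, tasks) lives in this crate.
pub struct Stage;

// A network boundary is a plan node that splits the plan into stages and
// ships its input stage to remote workers.
pub trait NetworkBoundary: ExecutionPlan {
    // The input stage feeding this boundary, if one has been assigned.
    fn input_stage(&self) -> Option<&Stage>;
    // Rebuilds this node with a fully assigned ("ready") input stage.
    fn with_input_stage(&self, stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
}

// Extension trait so any plan node can be probed for boundary-ness without
// downcasting to a concrete type.
pub trait NetworkBoundaryExt {
    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
}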

src/common/partitioning.rs renamed to src/execution_plans/common.rs

Lines changed: 14 additions & 3 deletions

@@ -1,7 +1,18 @@
+use datafusion::common::{DataFusionError, plan_err};
 use datafusion::physical_expr::Partitioning;
-use datafusion::physical_plan::PlanProperties;
+use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
+use std::sync::Arc;
 
-pub fn scale_partitioning_props(
+pub(super) fn require_one_child(
+    children: &[Arc<dyn ExecutionPlan>],
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    if children.len() != 1 {
+        return plan_err!("Expected exactly 1 child, got {}", children.len());
+    }
+    Ok(children[0].clone())
+}
+
+pub(super) fn scale_partitioning_props(
     props: &PlanProperties,
     f: impl FnOnce(usize) -> usize,
 ) -> PlanProperties {
@@ -13,7 +24,7 @@ pub fn scale_partitioning_props(
     )
 }
 
-pub fn scale_partitioning(
+pub(super) fn scale_partitioning(
     partitioning: &Partitioning,
     f: impl FnOnce(usize) -> usize,
 ) -> Partitioning {
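
The new require_one_child helper centralizes the single-child arity check that ExecutionPlan::with_new_children implementations otherwise repeat. A sketch of the call pattern (MyExec is a hypothetical single-child node, not part of this commit):

fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    // Fails with a plan error unless exactly one child was supplied,
    // instead of panicking on an out-of-bounds index.
    Ok(Arc::new(MyExec {
        input: require_one_child(&children)?,
    }))
}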

src/execution_plans/distributed.rs

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+use crate::channel_resolver_ext::get_distributed_channel_resolver;
+use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
+use crate::execution_plans::common::require_one_child;
+use crate::protobuf::DistributedCodec;
+use crate::{ExecutionTask, Stage};
+use datafusion::common::exec_err;
+use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use datafusion_proto::physical_plan::PhysicalExtensionCodec;
+use rand::Rng;
+use std::any::Any;
+use std::fmt::Formatter;
+use std::sync::Arc;
+use url::Url;
+
+/// [ExecutionPlan] that executes the inner plan in distributed mode.
+/// Before executing it, two modifications are lazily performed on the plan:
+/// 1. Assigns worker URLs to all the stages. A random set of URLs is sampled from the
+///    channel resolver and assigned to each task in each stage.
+/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
+///    over the wire.
+#[derive(Debug, Clone)]
+pub struct DistributedExec {
+    pub plan: Arc<dyn ExecutionPlan>,
+}
+
+impl DistributedExec {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self { plan }
+    }
+
+    fn prepare_plan(
+        &self,
+        urls: &[Url],
+        codec: &dyn PhysicalExtensionCodec,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        let prepared = Arc::clone(&self.plan).transform_up(|plan| {
+            let Some(plan) = plan.as_network_boundary() else {
+                return Ok(Transformed::no(plan));
+            };
+
+            let mut rng = rand::thread_rng();
+            let start_idx = rng.gen_range(0..urls.len());
+
+            let Some(stage) = plan.input_stage() else {
+                return exec_err!(
+                    "NetworkBoundary '{}' has not been assigned a stage",
+                    plan.name()
+                );
+            };
+
+            let ready_stage = Stage {
+                query_id: stage.query_id,
+                num: stage.num,
+                plan: stage.plan.to_encoded(codec)?,
+                tasks: stage
+                    .tasks
+                    .iter()
+                    .enumerate()
+                    .map(|(i, _)| ExecutionTask {
+                        url: Some(urls[(start_idx + i) % urls.len()].clone()),
+                    })
+                    .collect::<Vec<_>>(),
+            };
+
+            Ok(Transformed::yes(plan.with_input_stage(ready_stage)?))
+        })?;
+        Ok(prepared.data)
+    }
+}
+
+impl DisplayAs for DistributedExec {
+    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "DistributedExec")
+    }
+}
+
+impl ExecutionPlan for DistributedExec {
+    fn name(&self) -> &str {
+        "DistributedExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.plan.properties()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.plan]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(DistributedExec {
+            plan: require_one_child(&children)?,
+        }))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> datafusion::common::Result<SendableRecordBatchStream> {
+        if partition > 0 {
+            // DistributedExec assigns worker URLs lazily in prepare_plan() upon the first
+            // .execute() call. This means .execute() must only be called once: repeating
+            // the random URL assignment for every partition would produce differing
+            // assignments, and therefore an invalid plan.
+            return exec_err!(
+                "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
+            );
+        }
+
+        let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
+        let codec = DistributedCodec::new_combined_with_user(context.session_config());
+
+        let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
+        plan.execute(partition, context)
+    }
+}
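
A rough usage sketch of the execute-once contract above (the driver function and its setup are assumptions, not part of this commit): only partition 0 may be executed, because the random URL sampling happens exactly once inside execute().

use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use futures::StreamExt;
use std::sync::Arc;

// Drives a plan whose root is DistributedExec. `ctx` is assumed to carry a
// channel resolver (worker URLs) in its session config.
async fn collect_rows(plan: Arc<dyn ExecutionPlan>, ctx: &SessionContext) -> Result<usize> {
    // Partition 0 only: any other partition index returns an error, since
    // URL assignment must not be repeated per partition.
    let mut stream = plan.execute(0, ctx.task_ctx())?;
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}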

src/execution_plans/mod.rs

Lines changed: 3 additions & 4 deletions

@@ -1,13 +1,12 @@
+mod common;
+mod distributed;
 mod metrics;
 mod network_coalesce;
 mod network_shuffle;
 mod partition_isolator;
-mod stage;
 
+pub use distributed::DistributedExec;
 pub use metrics::MetricsWrapperExec;
 pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
 pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
 pub use partition_isolator::PartitionIsolatorExec;
-pub(crate) use stage::InputStage;
-pub use stage::display_plan_graphviz;
-pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};
