Skip to content

Commit e469202

Browse files
committed
Rework stage hierarchy for better interoperability with DataFusion ecosystem
1 parent ab113ee commit e469202

26 files changed

+1185
-1355
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
4747
};
4848
use datafusion_distributed::{
4949
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
50-
DistributedSessionBuilderContext, StageExec,
50+
DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
5151
};
5252
use log::info;
5353
use std::fs;
@@ -331,8 +331,8 @@ impl RunOpt {
331331
}
332332
let mut n_tasks = 0;
333333
physical_plan.clone().transform_down(|node| {
334-
if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
335-
n_tasks += node.tasks.len()
334+
if let Some(node) = node.as_network_boundary() {
335+
n_tasks += node.input_stage().map(|v| v.tasks.len()).unwrap_or(0)
336336
}
337337
Ok(Transformed::no(node))
338338
})?;

src/common/execution_plan_ops.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use datafusion::common::plan_err;
2+
use datafusion::error::DataFusionError;
3+
use datafusion::physical_plan::ExecutionPlan;
4+
use std::sync::Arc;
5+
6+
pub fn one_child(
7+
children: &[Arc<dyn ExecutionPlan>],
8+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
9+
if children.len() != 1 {
10+
return plan_err!("Expected exactly 1 children, got {}", children.len());
11+
}
12+
Ok(children[0].clone())
13+
}

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod callback_stream;
2+
mod execution_plan_ops;
23
mod partitioning;
34
#[allow(unused)]
45
pub mod ttl_map;
56

67
pub(crate) use callback_stream::with_callback;
8+
pub(crate) use execution_plan_ops::*;
79
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};

src/distributed_physical_optimizer_rule.rs

Lines changed: 98 additions & 79 deletions
Large diffs are not rendered by default.

src/execution_plans/distributed.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2+
use crate::common::one_child;
3+
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
4+
use crate::protobuf::DistributedCodec;
5+
use crate::{ExecutionTask, Stage};
6+
use datafusion::common::exec_err;
7+
use datafusion::common::tree_node::{Transformed, TreeNode};
8+
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9+
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
10+
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
11+
use rand::Rng;
12+
use std::any::Any;
13+
use std::fmt::Formatter;
14+
use std::sync::Arc;
15+
use url::Url;
16+
17+
/// [ExecutionPlan] that executes the inner plan in distributed mode.
/// Before executing it, two modifications are lazily performed on the plan:
/// 1. Assigns worker URLs to all the stages. A random set of URLs are sampled from the
///    channel resolver and assigned to each task in each stage.
/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
///    over the wire.
#[derive(Debug, Clone)]
pub struct DistributedExec {
    // Head of the plan to distribute; URL assignment and protobuf encoding
    // happen lazily in `execute` via `prepare_plan`.
    pub plan: Arc<dyn ExecutionPlan>,
}
27+
28+
impl DistributedExec {
29+
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
30+
Self { plan }
31+
}
32+
33+
fn prepare_plan(
34+
&self,
35+
urls: &[Url],
36+
codec: &dyn PhysicalExtensionCodec,
37+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
38+
let prepared = Arc::clone(&self.plan).transform_up(|plan| {
39+
let Some(plan) = plan.as_network_boundary() else {
40+
return Ok(Transformed::no(plan));
41+
};
42+
43+
let mut rng = rand::thread_rng();
44+
let start_idx = rng.gen_range(0..urls.len());
45+
46+
let Some(stage) = plan.input_stage() else {
47+
return exec_err!(
48+
"NetworkBoundary '{}' has not been assigned a stage",
49+
plan.name()
50+
);
51+
};
52+
53+
let ready_stage = Stage {
54+
query_id: stage.query_id,
55+
num: stage.num,
56+
plan: stage.plan.to_encoded(codec)?,
57+
tasks: stage
58+
.tasks
59+
.iter()
60+
.enumerate()
61+
.map(|(i, _)| ExecutionTask {
62+
url: Some(urls[(start_idx + i) % urls.len()].clone()),
63+
})
64+
.collect::<Vec<_>>(),
65+
};
66+
67+
Ok(Transformed::yes(plan.with_input_stage(ready_stage)?))
68+
})?;
69+
Ok(prepared.data)
70+
}
71+
}
72+
73+
impl DisplayAs for DistributedExec {
74+
fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
75+
write!(f, "DistributedExec")
76+
}
77+
}
78+
79+
impl ExecutionPlan for DistributedExec {
    fn name(&self) -> &str {
        "DistributedExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    // Plan properties (schema, partitioning, ordering) are delegated to the
    // wrapped inner plan.
    fn properties(&self) -> &PlanProperties {
        self.plan.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.plan]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
        // Exactly one child is expected; `one_child` returns a plan error otherwise.
        Ok(Arc::new(DistributedExec {
            plan: one_child(&children)?,
        }))
    }

    /// Lazily prepares the plan (URL assignment + protobuf encoding) and then
    /// executes it. Only partition 0 is valid.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion::common::Result<SendableRecordBatchStream> {
        if partition > 0 {
            // The DistributedExec node calls prepare_plan() lazily upon calling .execute().
            // This means that .execute() must only be called once, as we cannot afford to
            // perform several random URL assignments while calling multiple partitions,
            // as they will differ, producing an invalid plan.
            return exec_err!(
                "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
            );
        }

        // The channel resolver provides the worker URLs; the codec combines the
        // built-in distributed codec with any user-provided extension codecs.
        let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
        let codec = DistributedCodec::new_combined_with_user(context.session_config());

        let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
        plan.execute(partition, context)
    }
}

src/execution_plans/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1+
mod distributed;
12
mod metrics;
23
mod network_coalesce;
34
mod network_shuffle;
45
mod partition_isolator;
5-
mod stage;
66

7+
pub use distributed::DistributedExec;
78
pub use metrics::MetricsWrapperExec;
89
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
910
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
1011
pub use partition_isolator::PartitionIsolatorExec;
11-
pub(crate) use stage::InputStage;
12-
pub use stage::display_plan_graphviz;
13-
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};

0 commit comments

Comments
 (0)