Skip to content

Commit 7d8f606

Browse files
committed
Rework stage hierarchy for better interoperability with DataFusion ecosystem
1 parent ab113ee commit 7d8f606

26 files changed

+1152
-1325
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
4747
};
4848
use datafusion_distributed::{
4949
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
50-
DistributedSessionBuilderContext, StageExec,
50+
DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
5151
};
5252
use log::info;
5353
use std::fs;
@@ -331,8 +331,8 @@ impl RunOpt {
331331
}
332332
let mut n_tasks = 0;
333333
physical_plan.clone().transform_down(|node| {
334-
if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
335-
n_tasks += node.tasks.len()
334+
if let Some(node) = node.as_network_boundary() {
335+
n_tasks += node.input_stage().map(|v| v.tasks.len()).unwrap_or(0)
336336
}
337337
Ok(Transformed::no(node))
338338
})?;

src/common/execution_plan_ops.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use datafusion::common::plan_err;
2+
use datafusion::error::DataFusionError;
3+
use datafusion::physical_plan::ExecutionPlan;
4+
use std::sync::Arc;
5+
6+
pub fn one_child(
7+
children: &[Arc<dyn ExecutionPlan>],
8+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
9+
if children.len() != 1 {
10+
return plan_err!("Expected exactly 1 children, got {}", children.len());
11+
}
12+
Ok(children[0].clone())
13+
}

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod callback_stream;
2+
mod execution_plan_ops;
23
mod partitioning;
34
#[allow(unused)]
45
pub mod ttl_map;
56

67
pub(crate) use callback_stream::with_callback;
8+
pub(crate) use execution_plan_ops::*;
79
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};

src/distributed_physical_optimizer_rule.rs

Lines changed: 96 additions & 79 deletions
Large diffs are not rendered by default.

src/execution_plans/distributed.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use crate::channel_resolver_ext::get_distributed_channel_resolver;
2+
use crate::common::one_child;
3+
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
4+
use crate::protobuf::DistributedCodec;
5+
use crate::{ExecutionTask, Stage};
6+
use datafusion::common::exec_err;
7+
use datafusion::common::tree_node::{Transformed, TreeNode};
8+
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9+
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
10+
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
11+
use rand::Rng;
12+
use std::any::Any;
13+
use std::fmt::Formatter;
14+
use std::sync::Arc;
15+
use url::Url;
16+
17+
#[derive(Debug, Clone)]
18+
pub struct DistributedExec {
19+
pub plan: Arc<dyn ExecutionPlan>,
20+
}
21+
22+
impl DistributedExec {
23+
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
24+
Self { plan }
25+
}
26+
27+
fn prepare_plan(
28+
&self,
29+
urls: &[Url],
30+
codec: &dyn PhysicalExtensionCodec,
31+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
32+
let prepared = Arc::clone(&self.plan).transform_up(|plan| {
33+
let Some(plan) = plan.as_network_boundary() else {
34+
return Ok(Transformed::no(plan));
35+
};
36+
37+
let mut rng = rand::thread_rng();
38+
let start_idx = rng.gen_range(0..urls.len());
39+
40+
let Some(stage) = plan.input_stage() else {
41+
return exec_err!(
42+
"NetworkBoundary '{}' has not been assigned a stage",
43+
plan.name()
44+
);
45+
};
46+
47+
let ready_stage = Stage {
48+
query_id: stage.query_id,
49+
num: stage.num,
50+
plan: stage.plan.to_encoded(codec)?,
51+
tasks: stage
52+
.tasks
53+
.iter()
54+
.enumerate()
55+
.map(|(i, _)| ExecutionTask {
56+
url: Some(urls[(start_idx + i) % urls.len()].clone()),
57+
})
58+
.collect::<Vec<_>>(),
59+
};
60+
61+
Ok(Transformed::yes(plan.with_input_stage(ready_stage)?))
62+
})?;
63+
Ok(prepared.data)
64+
}
65+
}
66+
67+
impl DisplayAs for DistributedExec {
68+
fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
69+
write!(f, "DistributedExec")
70+
}
71+
}
72+
73+
impl ExecutionPlan for DistributedExec {
74+
fn name(&self) -> &str {
75+
"DistributedExec"
76+
}
77+
78+
fn as_any(&self) -> &dyn Any {
79+
self
80+
}
81+
82+
fn properties(&self) -> &PlanProperties {
83+
self.plan.properties()
84+
}
85+
86+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
87+
vec![&self.plan]
88+
}
89+
90+
fn with_new_children(
91+
self: Arc<Self>,
92+
children: Vec<Arc<dyn ExecutionPlan>>,
93+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
94+
Ok(Arc::new(DistributedExec {
95+
plan: one_child(&children)?,
96+
}))
97+
}
98+
99+
fn execute(
100+
&self,
101+
partition: usize,
102+
context: Arc<TaskContext>,
103+
) -> datafusion::common::Result<SendableRecordBatchStream> {
104+
if partition > 0 {
105+
// The DistributedExec node calls try_assign_urls() lazily upon calling .execute(). This means
106+
// that .execute() must only be called once, as we cannot afford to perform several
107+
// random URL assignation while calling multiple partitions, as they will differ,
108+
// producing an invalid plan
109+
return exec_err!(
110+
"DistributedExec must only have 1 partition, but it was called with partition index {partition}"
111+
);
112+
}
113+
114+
let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
115+
let codec = DistributedCodec::new_combined_with_user(context.session_config());
116+
117+
let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
118+
plan.execute(partition, context)
119+
}
120+
}

src/execution_plans/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1+
mod distributed;
12
mod metrics;
23
mod network_coalesce;
34
mod network_shuffle;
45
mod partition_isolator;
5-
mod stage;
66

7+
pub use distributed::DistributedExec;
78
pub use metrics::MetricsWrapperExec;
89
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
910
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
1011
pub use partition_isolator::PartitionIsolatorExec;
11-
pub(crate) use stage::InputStage;
12-
pub use stage::display_plan_graphviz;
13-
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};

0 commit comments

Comments
 (0)