
Commit f177452

Refactor distributed planner into its own folder (#196)
1 parent 23d640a commit f177452

File tree

13 files changed: +155 -137 lines


src/distributed_physical_optimizer_rule.rs renamed to src/distributed_planner/distributed_physical_optimizer_rule.rs

Lines changed: 6 additions & 124 deletions
@@ -1,6 +1,10 @@
-use super::{NetworkShuffleExec, PartitionIsolatorExec};
+use crate::distributed_planner::distributed_plan_error::get_distribute_plan_err;
+use crate::distributed_planner::{
+    DistributedPlanError, NetworkBoundaryExt, limit_tasks_err, non_distributable_err,
+};
 use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
 use crate::stage::Stage;
+use crate::{NetworkShuffleExec, PartitionIsolatorExec};
 use datafusion::common::plan_err;
 use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::datasource::source::DataSourceExec;

@@ -18,8 +22,6 @@ use datafusion::{
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{ExecutionPlan, repartition::RepartitionExec},
 };
-use std::error::Error;
-use std::fmt::{Display, Formatter};
 use std::sync::Arc;
 use uuid::Uuid;

@@ -305,82 +307,6 @@ impl DistributedPhysicalOptimizerRule {
         Ok(stage)
     }
 }
-
-/// Necessary information for building a [Stage] during distributed planning.
-///
-/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
-/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
-///
-/// Some network boundaries might perform some modifications in their children, like scaling
-/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
-pub struct InputStageInfo {
-    /// The head plan of the [Stage] that is about to be built.
-    pub plan: Arc<dyn ExecutionPlan>,
-    /// The amount of tasks the [Stage] will have.
-    pub task_count: usize,
-}
-
-/// This trait represents a node that introduces the necessity of a network boundary in the plan.
-/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
-/// out of it.
-pub trait NetworkBoundary: ExecutionPlan {
-    /// Returns the information necessary for building the next stage from which this
-    /// [NetworkBoundary] is going to collect data.
-    fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;
-
-    /// re-assigns a different number of input tasks to the current [NetworkBoundary].
-    ///
-    /// This will be called if upon building a stage, a [DistributedPlanError::LimitTasks] error
-    /// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
-    fn with_input_task_count(&self, input_tasks: usize) -> Result<Arc<dyn NetworkBoundary>>;
-
-    /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
-    /// information to perform any internal transformations necessary for distributed execution.
-    ///
-    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
-    fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;
-
-    /// Returns the assigned input [Stage], if any.
-    fn input_stage(&self) -> Option<&Stage>;
-
-    /// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
-    /// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
-    /// this function returns.
-    fn rollback(&self) -> Result<Arc<dyn ExecutionPlan>> {
-        let children = self.children();
-        if children.len() != 1 {
-            return plan_err!(
-                "Expected distributed node {} to have exactly 1 children, but got {}",
-                self.name(),
-                children.len()
-            );
-        }
-        Ok(Arc::clone(children.first().unwrap()))
-    }
-}
-
-/// Extension trait for downcasting dynamic types to [NetworkBoundary].
-pub trait NetworkBoundaryExt {
-    /// Downcasts self to a [NetworkBoundary] if possible.
-    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
-    /// Returns whether self is a [NetworkBoundary] or not.
-    fn is_network_boundary(&self) -> bool {
-        self.as_network_boundary().is_some()
-    }
-}
-
-impl NetworkBoundaryExt for dyn ExecutionPlan {
-    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
-        if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
-            Some(node)
-        } else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
-            Some(node)
-        } else {
-            None
-        }
-    }
-}
-
 /// Helper enum for storing either borrowed or owned trait object references
 enum Referenced<'a, T: ?Sized> {
     Borrowed(&'a T),

@@ -396,59 +322,15 @@ impl<T: ?Sized> Referenced<'_, T> {
     }
 }

-/// Error thrown during distributed planning that prompts the planner to change something and
-/// try again.
-#[derive(Debug)]
-enum DistributedPlanError {
-    /// Prompts the planner to limit the amount of tasks used in the stage that is currently
-    /// being planned.
-    LimitTasks(usize),
-    /// Signals the planner that this whole plan is non-distributable. This can happen if
-    /// certain nodes are present, like [StreamingTableExec], which are typically used in
-    /// queries that rather performing some execution, they perform some introspection.
-    NonDistributable(&'static str),
-}
-
-impl Display for DistributedPlanError {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match self {
-            DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
-            DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
-        }
-    }
-}
-
-impl Error for DistributedPlanError {}
-
-/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
-/// to try rebuilding the current stage with a limited amount of tasks.
-pub fn limit_tasks_err(limit: usize) -> DataFusionError {
-    DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
-}
-
-/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
-/// planner to not distribute the query at all.
-pub fn non_distributable_err(name: &'static str) -> DataFusionError {
-    DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
-}
-
-fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
-    let DataFusionError::External(err) = err else {
-        return None;
-    };
-    err.downcast_ref()
-}
-
 #[cfg(test)]
 mod tests {
-    use crate::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
+    use crate::distributed_planner::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
     use crate::test_utils::parquet::register_parquet_tables;
     use crate::{assert_snapshot, display_plan_ascii};
     use datafusion::error::DataFusionError;
     use datafusion::execution::SessionStateBuilder;
     use datafusion::prelude::{SessionConfig, SessionContext};
     use std::sync::Arc;
-
     /* shema for the "weather" table

     MinTemp [type=DOUBLE] [repetitiontype=OPTIONAL]

src/distributed_planner/distributed_plan_error.rs

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+use datafusion::common::DataFusionError;
+use std::error::Error;
+use std::fmt::{Display, Formatter};
+
+/// Error thrown during distributed planning that prompts the planner to change something and
+/// try again.
+#[derive(Debug)]
+pub enum DistributedPlanError {
+    /// Prompts the planner to limit the amount of tasks used in the stage that is currently
+    /// being planned.
+    LimitTasks(usize),
+    /// Signals the planner that this whole plan is non-distributable. This can happen if
+    /// certain nodes are present, like `StreamingTableExec`, which are typically used in
+    /// queries that rather performing some execution, they perform some introspection.
+    NonDistributable(&'static str),
+}
+
+impl Display for DistributedPlanError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            DistributedPlanError::LimitTasks(n) => write!(f, "LimitTasksErr: {n}"),
+            DistributedPlanError::NonDistributable(name) => write!(f, "NonDistributable: {name}"),
+        }
+    }
+}
+
+impl Error for DistributedPlanError {}
+
+/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
+/// to try rebuilding the current stage with a limited amount of tasks.
+pub fn limit_tasks_err(limit: usize) -> DataFusionError {
+    DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
+}
+
+/// Builds a [DistributedPlanError::NonDistributable] error. This error prompts the distributed
+/// planner to not distribute the query at all.
+pub fn non_distributable_err(name: &'static str) -> DataFusionError {
+    DataFusionError::External(Box::new(DistributedPlanError::NonDistributable(name)))
+}
+
+pub(crate) fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
+    let DataFusionError::External(err) = err else {
+        return None;
+    };
+    err.downcast_ref()
+}
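
Aside (not part of the diff): the two public constructors and the crate-private accessor above are meant to round-trip a planning hint through DataFusionError::External. Below is a minimal crate-internal sketch of that round trip, using the module paths shown in this commit; the two helper functions in the sketch are hypothetical.

use datafusion::common::DataFusionError;

use crate::distributed_planner::distributed_plan_error::get_distribute_plan_err;
use crate::distributed_planner::{DistributedPlanError, limit_tasks_err};

// Hypothetical planning step that asks the planner to retry the current
// stage with at most 4 input tasks.
fn build_stage() -> Result<(), DataFusionError> {
    Err(limit_tasks_err(4))
}

fn handle_planning_error() {
    let err = build_stage().unwrap_err();
    // The planner unwraps the external error to decide how to react.
    match get_distribute_plan_err(&err) {
        Some(DistributedPlanError::LimitTasks(n)) => println!("replan the stage with {n} tasks"),
        Some(DistributedPlanError::NonDistributable(node)) => println!("keep the plan local: {node}"),
        None => println!("unrelated error: {err}"),
    }
}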

src/distributed_planner/mod.rs

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+mod distributed_physical_optimizer_rule;
+mod distributed_plan_error;
+mod network_boundary;
+
+pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
+pub use distributed_plan_error::{DistributedPlanError, limit_tasks_err, non_distributable_err};
+pub use network_boundary::{InputStageInfo, NetworkBoundary, NetworkBoundaryExt};

src/distributed_planner/network_boundary.rs

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+use crate::{NetworkCoalesceExec, NetworkShuffleExec, Stage};
+use datafusion::common::plan_err;
+use datafusion::physical_plan::ExecutionPlan;
+use std::sync::Arc;
+
+/// Necessary information for building a [Stage] during distributed planning.
+///
+/// [NetworkBoundary]s return this piece of data so that the distributed planner know how to
+/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
+///
+/// Some network boundaries might perform some modifications in their children, like scaling
+/// up the number of partitions, or injecting a specific [ExecutionPlan] on top.
+pub struct InputStageInfo {
+    /// The head plan of the [Stage] that is about to be built.
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// The amount of tasks the [Stage] will have.
+    pub task_count: usize,
+}
+
+/// This trait represents a node that introduces the necessity of a network boundary in the plan.
+/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
+/// out of it.
+pub trait NetworkBoundary: ExecutionPlan {
+    /// Returns the information necessary for building the next stage from which this
+    /// [NetworkBoundary] is going to collect data.
+    fn get_input_stage_info(&self, task_count: usize)
+        -> datafusion::common::Result<InputStageInfo>;
+
+    /// re-assigns a different number of input tasks to the current [NetworkBoundary].
+    ///
+    /// This will be called if upon building a stage, a [crate::distributed_planner::distributed_physical_optimizer_rule::DistributedPlanError::LimitTasks] error
+    /// is returned, prompting the [NetworkBoundary] to choose a different number of input tasks.
+    fn with_input_task_count(
+        &self,
+        input_tasks: usize,
+    ) -> datafusion::common::Result<Arc<dyn NetworkBoundary>>;
+
+    /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
+    /// information to perform any internal transformations necessary for distributed execution.
+    ///
+    /// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "ready".
+    fn with_input_stage(
+        &self,
+        input_stage: Stage,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>>;
+
+    /// Returns the assigned input [Stage], if any.
+    fn input_stage(&self) -> Option<&Stage>;
+
+    /// The planner might decide to remove this [NetworkBoundary] from the plan if it decides that
+    /// it's not going to bring any benefit. The [NetworkBoundary] will be replaced with whatever
+    /// this function returns.
+    fn rollback(&self) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        let children = self.children();
+        if children.len() != 1 {
+            return plan_err!(
+                "Expected distributed node {} to have exactly 1 children, but got {}",
+                self.name(),
+                children.len()
+            );
+        }
+        Ok(Arc::clone(children.first().unwrap()))
+    }
+}
+
+/// Extension trait for downcasting dynamic types to [NetworkBoundary].
+pub trait NetworkBoundaryExt {
+    /// Downcasts self to a [NetworkBoundary] if possible.
+    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
+    /// Returns whether self is a [NetworkBoundary] or not.
+    fn is_network_boundary(&self) -> bool {
+        self.as_network_boundary().is_some()
+    }
+}
+
+impl NetworkBoundaryExt for dyn ExecutionPlan {
+    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary> {
+        if let Some(node) = self.as_any().downcast_ref::<NetworkShuffleExec>() {
+            Some(node)
+        } else if let Some(node) = self.as_any().downcast_ref::<NetworkCoalesceExec>() {
+            Some(node)
+        } else {
+            None
+        }
+    }
+}
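
Aside (not part of the diff): the blanket impl of NetworkBoundaryExt for dyn ExecutionPlan above is what lets the planner probe an opaque plan tree for network boundaries. Below is a minimal crate-internal sketch of that pattern; the helper function is hypothetical, and it assumes a DataFusion version where ExecutionPlan::children returns Vec<&Arc<dyn ExecutionPlan>>, which the default rollback implementation above also relies on.

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

use crate::distributed_planner::NetworkBoundaryExt;

// Hypothetical helper: count how many nodes in a plan tree are network
// boundaries (i.e. NetworkShuffleExec or NetworkCoalesceExec).
fn count_network_boundaries(plan: &Arc<dyn ExecutionPlan>) -> usize {
    let here = if plan.as_ref().is_network_boundary() { 1 } else { 0 };
    let below: usize = plan
        .children()
        .into_iter()
        .map(count_network_boundaries)
        .sum();
    here + below
}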

src/execution_plans/distributed.rs

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
-use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
+use crate::distributed_planner::NetworkBoundaryExt;
 use crate::execution_plans::common::require_one_child;
 use crate::protobuf::DistributedCodec;
 use crate::stage::{ExecutionTask, Stage};

src/execution_plans/network_coalesce.rs

Lines changed: 1 addition & 3 deletions
@@ -1,8 +1,6 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{
-    InputStageInfo, NetworkBoundary, limit_tasks_err,
-};
+use crate::distributed_planner::{InputStageInfo, NetworkBoundary, limit_tasks_err};
 use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
 use crate::flight_service::DoGet;
 use crate::metrics::MetricsCollectingStream;

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 2 deletions
@@ -1,14 +1,13 @@
 use crate::channel_resolver_ext::get_distributed_channel_resolver;
 use crate::config_extension_ext::ContextGrpcMetadata;
-use crate::distributed_physical_optimizer_rule::{InputStageInfo, NetworkBoundary};
 use crate::execution_plans::common::{require_one_child, scale_partitioning};
 use crate::flight_service::DoGet;
 use crate::metrics::MetricsCollectingStream;
 use crate::metrics::proto::MetricsSetProto;
 use crate::protobuf::StageKey;
 use crate::protobuf::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
 use crate::stage::{MaybeEncodedPlan, Stage};
-use crate::{ChannelResolver, DistributedTaskContext};
+use crate::{ChannelResolver, DistributedTaskContext, InputStageInfo, NetworkBoundary};
 use arrow_flight::Ticket;
 use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_flight::error::FlightError;

src/execution_plans/partition_isolator.rs

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 use crate::DistributedTaskContext;
-use crate::distributed_physical_optimizer_rule::limit_tasks_err;
+use crate::distributed_planner::limit_tasks_err;
 use datafusion::common::{exec_err, plan_err};
 use datafusion::error::DataFusionError;
 use datafusion::execution::TaskContext;

src/lib.rs

Lines changed: 2 additions & 2 deletions
@@ -4,19 +4,19 @@ mod channel_resolver_ext;
 mod common;
 mod config_extension_ext;
 mod distributed_ext;
-mod distributed_physical_optimizer_rule;
 mod execution_plans;
 mod flight_service;
 mod metrics;
 mod stage;

+mod distributed_planner;
 mod protobuf;
 #[cfg(any(feature = "integration", test))]
 pub mod test_utils;

 pub use channel_resolver_ext::{BoxCloneSyncChannel, ChannelResolver};
 pub use distributed_ext::DistributedExt;
-pub use distributed_physical_optimizer_rule::{
+pub use distributed_planner::{
     DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary, NetworkBoundaryExt,
 };
 pub use execution_plans::{
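
Aside (not part of the diff): because lib.rs re-exports the same names from the new distributed_planner module, downstream imports from the crate root keep compiling unchanged. A sketch, with the crate name datafusion_distributed assumed purely for illustration:

// Crate name assumed for illustration; the point is that these root-level
// paths are unaffected by the move into src/distributed_planner/.
use datafusion_distributed::{
    DistributedPhysicalOptimizerRule, InputStageInfo, NetworkBoundary, NetworkBoundaryExt,
};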

src/metrics/task_metrics_rewriter.rs

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-use crate::NetworkBoundaryExt;
+use crate::distributed_planner::NetworkBoundaryExt;
 use crate::execution_plans::DistributedExec;
 use crate::execution_plans::MetricsWrapperExec;
 use crate::metrics::MetricsCollectorResult;
