Skip to content

Commit b728287

Browse files
authored
File name and folder restructure (#124)
* move plans/ to execution_plans/ * Create protobuf/ folder * Move stage along with the other execution plan implementations * Rename channel manager to channel resolver * Rename physical_optimizer.rs to distributed_physical_optimizer_rule.rs
1 parent ea561d2 commit b728287

File tree

18 files changed

+665
-740
lines changed

18 files changed

+665
-740
lines changed
File renamed without changes.

src/distributed_ext.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::channel_manager_ext::set_distributed_channel_resolver;
1+
use crate::channel_resolver_ext::set_distributed_channel_resolver;
22
use crate::config_extension_ext::{
33
set_distributed_option_extension, set_distributed_option_extension_from_headers,
44
};
5-
use crate::user_codec_ext::set_distributed_user_codec;
5+
use crate::protobuf::set_distributed_user_codec;
66
use crate::ChannelResolver;
77
use datafusion::common::DataFusionError;
88
use datafusion::config::ConfigExtension;

src/physical_optimizer.rs renamed to src/distributed_physical_optimizer_rule.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::sync::Arc;
22

3-
use super::stage::ExecutionStage;
4-
use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
3+
use super::{ArrowFlightReadExec, PartitionIsolatorExec, StageExec};
54
use datafusion::common::tree_node::TreeNodeRecursion;
65
use datafusion::error::DataFusionError;
76
use datafusion::physical_plan::joins::PartitionMode;
@@ -55,7 +54,7 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
5554
_config: &ConfigOptions,
5655
) -> Result<Arc<dyn ExecutionPlan>> {
5756
// We can only optimize plans that are not already distributed
58-
if plan.as_any().is::<ExecutionStage>() {
57+
if plan.as_any().is::<StageExec>() {
5958
return Ok(plan);
6059
}
6160

@@ -106,7 +105,7 @@ impl DistributedPhysicalOptimizerRule {
106105
pub fn distribute_plan(
107106
&self,
108107
plan: Arc<dyn ExecutionPlan>,
109-
) -> Result<ExecutionStage, DataFusionError> {
108+
) -> Result<StageExec, DataFusionError> {
110109
let query_id = Uuid::new_v4();
111110
self._distribute_plan_inner(query_id, plan, &mut 1, 0)
112111
}
@@ -117,7 +116,7 @@ impl DistributedPhysicalOptimizerRule {
117116
plan: Arc<dyn ExecutionPlan>,
118117
num: &mut usize,
119118
depth: usize,
120-
) -> Result<ExecutionStage, DataFusionError> {
119+
) -> Result<StageExec, DataFusionError> {
121120
let mut inputs = vec![];
122121

123122
let distributed = plan.clone().transform_down(|plan| {
@@ -134,7 +133,7 @@ impl DistributedPhysicalOptimizerRule {
134133
})?;
135134

136135
let inputs = inputs.into_iter().map(Arc::new).collect();
137-
let mut stage = ExecutionStage::new(query_id, *num, distributed.data, inputs);
136+
let mut stage = StageExec::new(query_id, *num, distributed.data, inputs);
138137
*num += 1;
139138

140139
stage = match (self.partitions_per_task, can_be_divided(&plan)?) {
@@ -188,7 +187,7 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
188187
#[cfg(test)]
189188
mod tests {
190189
use crate::assert_snapshot;
191-
use crate::physical_optimizer::DistributedPhysicalOptimizerRule;
190+
use crate::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
192191
use crate::test_utils::parquet::register_parquet_tables;
193192
use datafusion::error::DataFusionError;
194193
use datafusion::execution::SessionStateBuilder;

src/plan/arrow_flight_read.rs renamed to src/execution_plans/arrow_flight_read.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use crate::channel_manager_ext::get_distributed_channel_resolver;
1+
use crate::channel_resolver_ext::get_distributed_channel_resolver;
22
use crate::config_extension_ext::ContextGrpcMetadata;
33
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
4+
use crate::execution_plans::StageExec;
45
use crate::flight_service::{DoGet, StageKey};
5-
use crate::plan::DistributedCodec;
6-
use crate::stage::{proto_from_stage, ExecutionStage};
6+
use crate::protobuf::{proto_from_stage, DistributedCodec};
77
use crate::ChannelResolver;
88
use arrow_flight::decode::FlightRecordBatchStream;
99
use arrow_flight::error::FlightError;
@@ -161,7 +161,7 @@ impl ExecutionPlan for ArrowFlightReadExec {
161161

162162
let stage = context
163163
.session_config()
164-
.get_extension::<ExecutionStage>()
164+
.get_extension::<StageExec>()
165165
.ok_or(internal_datafusion_err!(
166166
"ArrowFlightReadExec requires an ExecutionStage in the session config"
167167
))?;
@@ -218,7 +218,7 @@ impl ExecutionPlan for ArrowFlightReadExec {
218218
);
219219

220220
async move {
221-
let url = task.url()?.ok_or(internal_datafusion_err!(
221+
let url = task.url.ok_or(internal_datafusion_err!(
222222
"ArrowFlightReadExec: task is unassigned, cannot proceed"
223223
))?;
224224

src/execution_plans/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod arrow_flight_read;
2+
mod partition_isolator;
3+
mod stage;
4+
5+
pub use arrow_flight_read::ArrowFlightReadExec;
6+
pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
7+
pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
File renamed without changes.

0 commit comments

Comments
 (0)