Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
4 changes: 2 additions & 2 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::channel_manager_ext::set_distributed_channel_resolver;
use crate::channel_resolver_ext::set_distributed_channel_resolver;
use crate::config_extension_ext::{
set_distributed_option_extension, set_distributed_option_extension_from_headers,
};
use crate::user_codec_ext::set_distributed_user_codec;
use crate::protobuf::set_distributed_user_codec;
use crate::ChannelResolver;
use datafusion::common::DataFusionError;
use datafusion::config::ConfigExtension;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use super::stage::ExecutionStage;
use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
use super::{ArrowFlightReadExec, PartitionIsolatorExec, StageExec};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::joins::PartitionMode;
Expand Down Expand Up @@ -55,7 +54,7 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// We can only optimize plans that are not already distributed
if plan.as_any().is::<ExecutionStage>() {
if plan.as_any().is::<StageExec>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming it StageExec makes it more consistent with DataFusion (DF) style 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it seems like an unwritten rule to suffix all ExecutionPlan implementations with *Exec

return Ok(plan);
}

Expand Down Expand Up @@ -106,7 +105,7 @@ impl DistributedPhysicalOptimizerRule {
pub fn distribute_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
) -> Result<ExecutionStage, DataFusionError> {
) -> Result<StageExec, DataFusionError> {
let query_id = Uuid::new_v4();
self._distribute_plan_inner(query_id, plan, &mut 1, 0)
}
Expand All @@ -117,7 +116,7 @@ impl DistributedPhysicalOptimizerRule {
plan: Arc<dyn ExecutionPlan>,
num: &mut usize,
depth: usize,
) -> Result<ExecutionStage, DataFusionError> {
) -> Result<StageExec, DataFusionError> {
let mut inputs = vec![];

let distributed = plan.clone().transform_down(|plan| {
Expand All @@ -134,7 +133,7 @@ impl DistributedPhysicalOptimizerRule {
})?;

let inputs = inputs.into_iter().map(Arc::new).collect();
let mut stage = ExecutionStage::new(query_id, *num, distributed.data, inputs);
let mut stage = StageExec::new(query_id, *num, distributed.data, inputs);
*num += 1;

stage = match (self.partitions_per_task, can_be_divided(&plan)?) {
Expand Down Expand Up @@ -188,7 +187,7 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
#[cfg(test)]
mod tests {
use crate::assert_snapshot;
use crate::physical_optimizer::DistributedPhysicalOptimizerRule;
use crate::distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that you renamed it to distributed_physical_optimizer_rule. It makes it clearer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I've tried to rename files to be a bit more consistent with their content.

use crate::test_utils::parquet::register_parquet_tables;
use datafusion::error::DataFusionError;
use datafusion::execution::SessionStateBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::channel_manager_ext::get_distributed_channel_resolver;
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
use crate::execution_plans::StageExec;
use crate::flight_service::{DoGet, StageKey};
use crate::plan::DistributedCodec;
use crate::stage::{proto_from_stage, ExecutionStage};
use crate::protobuf::{proto_from_stage, DistributedCodec};
use crate::ChannelResolver;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl ExecutionPlan for ArrowFlightReadExec {

let stage = context
.session_config()
.get_extension::<ExecutionStage>()
.get_extension::<StageExec>()
.ok_or(internal_datafusion_err!(
"ArrowFlightReadExec requires an ExecutionStage in the session config"
))?;
Expand Down Expand Up @@ -218,7 +218,7 @@ impl ExecutionPlan for ArrowFlightReadExec {
);

async move {
let url = task.url()?.ok_or(internal_datafusion_err!(
let url = task.url.ok_or(internal_datafusion_err!(
"ArrowFlightReadExec: task is unassigned, cannot proceed"
))?;

Expand Down
7 changes: 7 additions & 0 deletions src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod arrow_flight_read;
mod partition_isolator;
mod stage;

pub use arrow_flight_read::ArrowFlightReadExec;
pub use partition_isolator::{PartitionGroup, PartitionIsolatorExec};
pub use stage::{display_stage_graphviz, ExecutionTask, StageExec};
File renamed without changes.
Loading