4 changes: 0 additions & 4 deletions src/flight_service/do_get.rs
@@ -79,7 +79,3 @@ impl ArrowFlightEndpoint {
))))
}
}

fn invalid_argument<T>(msg: impl Into<String>) -> Result<T, Status> {
Err(Status::invalid_argument(msg))
}
264 changes: 85 additions & 179 deletions src/physical_optimizer.rs
@@ -1,9 +1,12 @@
use std::sync::Arc;

use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::error::DataFusionError;
use datafusion::{
common::{
internal_datafusion_err,
tree_node::{Transformed, TreeNode, TreeNodeRewriter},
tree_node::{Transformed, TreeNode},
},
config::ConfigOptions,
error::Result,
@@ -14,8 +17,6 @@ use datafusion::{
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};

use super::stage::ExecutionStage;

#[derive(Debug, Default)]
@@ -75,11 +76,9 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
displayable(plan.as_ref()).indent(false)
);

let mut planner = StagePlanner::new(self.codec.clone(), self.partitions_per_task);
plan.rewrite(&mut planner)?;
planner
.finish()
.map(|stage| stage as Arc<dyn ExecutionPlan>)
let plan = self.apply_network_boundaries(plan)?;
let plan = self.distribute_plan(plan)?;
Ok(Arc::new(plan))
}

fn name(&self) -> &str {
@@ -91,171 +90,78 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
}
}

/// StagePlanner is a TreeNodeRewriter that walks the plan tree and creates
/// a tree of ExecutionStage nodes that represent discrete stages of execution
/// that are separated by a data shuffle.
///
/// See https://howqueryengineswork.com/13-distributed-query.html for more information
/// about distributed execution.
struct StagePlanner {
/// used to keep track of the current plan head
plan_head: Option<Arc<dyn ExecutionPlan>>,
/// Current depth in the plan tree, as we walk the tree
depth: usize,
/// Input stages collected so far. Each entry is a tuple of (plan tree depth, stage).
/// This allows us to keep track of the depth in the plan tree
/// where we created the stage. That way when we create a new
/// stage, we can tell if it is a peer to the current input stages or
/// should be a parent (if its depth is a smaller number)
input_stages: Vec<(usize, ExecutionStage)>,
/// current stage number
stage_counter: usize,
/// Optional codec to assist in serializing and deserializing any custom execution plan nodes
codec: Option<Arc<dyn PhysicalExtensionCodec>>,
/// partitions_per_task is used to determine how many tasks to create for each stage
partitions_per_task: Option<usize>,
}
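// Illustrative sketch (not from this diff): given a plan with one shuffle,
//
//     AggregateExec (Final)
//       RepartitionExec (Hash)        <-- shuffle boundary
//         AggregateExec (Partial)
//           DataSourceExec
//
// the tree is cut just above the RepartitionExec: the repartition, partial
// aggregate and scan become Stage 1, while the final aggregate becomes Stage 2
// with an ArrowFlightReadExec leaf that reads Stage 1's output over the network.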

impl StagePlanner {
fn new(
codec: Option<Arc<dyn PhysicalExtensionCodec>>,
partitions_per_task: Option<usize>,
) -> Self {
StagePlanner {
plan_head: None,
depth: 0,
input_stages: vec![],
stage_counter: 1,
codec,
partitions_per_task,
}
}

fn finish(mut self) -> Result<Arc<ExecutionStage>> {
let stage = if self.input_stages.is_empty() {
ExecutionStage::new(
self.stage_counter,
self.plan_head
.take()
.ok_or_else(|| internal_datafusion_err!("No plan head set"))?,
vec![],
)
} else if self.depth < self.input_stages[0].0 {
// There is more plan above the last stage we created, so we need to
// create a new stage that includes the last plan head
ExecutionStage::new(
self.stage_counter,
self.plan_head
.take()
.ok_or_else(|| internal_datafusion_err!("No plan head set"))?,
self.input_stages
.into_iter()
.map(|(_, stage)| Arc::new(stage))
.collect(),
)
} else {
// We have a plan head, and we are at the same depth as the last stage we created,
// so we can just return the last stage
self.input_stages.last().unwrap().1.clone()
};

// assign the proper tree depth to each stage in the tree
fn assign_tree_depth(stage: &ExecutionStage, depth: usize) {
stage
.depth
.store(depth as u64, std::sync::atomic::Ordering::Relaxed);
for input in stage.child_stages_iter() {
assign_tree_depth(input, depth + 1);
impl DistributedPhysicalOptimizerRule {
pub fn apply_network_boundaries(
&self,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let result = plan.transform_up(|plan| {
if plan.as_any().downcast_ref::<RepartitionExec>().is_some() {
let child = Arc::clone(plan.children().first().cloned().ok_or(
internal_datafusion_err!("Expected RepartitionExec to have a child"),
)?);

let maybe_isolated_plan = if let Some(ppt) = self.partitions_per_task {
let isolated = Arc::new(PartitionIsolatorExec::new(child, ppt));
plan.with_new_children(vec![isolated])?
} else {
plan
};

return Ok(Transformed::yes(Arc::new(
ArrowFlightReadExec::new_pending(
Arc::clone(&maybe_isolated_plan),
maybe_isolated_plan.output_partitioning().clone(),
),
)));
}
}
assign_tree_depth(&stage, 0);

Ok(Arc::new(stage))
Ok(Transformed::no(plan))
})?;
Ok(result.data)
}
}

impl TreeNodeRewriter for StagePlanner {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
self.depth += 1;
Ok(Transformed::no(plan))
pub fn distribute_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
) -> Result<ExecutionStage, DataFusionError> {
self._distribute_plan_inner(plan, &mut 1, 0)
}

fn f_up(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
self.depth -= 1;

// keep track of where we are
self.plan_head = Some(plan.clone());

// determine if we need to shuffle data, and thus create a new stage
// at this shuffle boundary
if let Some(repartition_exec) = plan.as_any().downcast_ref::<RepartitionExec>() {
// time to create a stage here, so include all previously seen stages deeper than us as
// our input stages
let child_stages = self
.input_stages
.iter()
.rev()
.take_while(|(depth, _)| *depth > self.depth)
.map(|(_, stage)| stage.clone())
.collect::<Vec<_>>();

self.input_stages.retain(|(depth, _)| *depth <= self.depth);

let maybe_isolated_plan = if let Some(partitions_per_task) = self.partitions_per_task {
let child = repartition_exec
.children()
.first()
.ok_or(internal_datafusion_err!(
"RepartitionExec has no children, cannot create PartitionIsolatorExec"
))?
.clone()
.clone(); // just clone the Arcs
let isolated = Arc::new(PartitionIsolatorExec::new(child, partitions_per_task));
plan.clone().with_new_children(vec![isolated])?
} else {
plan.clone()
fn _distribute_plan_inner(
&self,
plan: Arc<dyn ExecutionPlan>,
num: &mut usize,
depth: usize,
) -> Result<ExecutionStage, DataFusionError> {
let mut inputs = vec![];

let distributed = plan.transform_down(|plan| {
let Some(node) = plan.as_any().downcast_ref::<ArrowFlightReadExec>() else {
return Ok(Transformed::no(plan));
};

let mut stage = ExecutionStage::new(
self.stage_counter,
maybe_isolated_plan,
child_stages.into_iter().map(Arc::new).collect(),
);

if let Some(partitions_per_task) = self.partitions_per_task {
stage = stage.with_maximum_partitions_per_task(partitions_per_task);
}
if let Some(codec) = self.codec.as_ref() {
stage = stage.with_codec(codec.clone());
}

self.input_stages.push((self.depth, stage));

// As we are walking up the plan tree, we've now put what we've encountered so far
// into a stage. We want to replace this plan now with an ArrowFlightReadExec
// which will be able to consume from this stage over the network.
//
// That way as we walk further up the tree and build the next stage, the leaf
// node in that plan will be an ArrowFlightReadExec that can read from this stage.
//
// Note that we use the original plan's partitioning and schema for ArrowFlightReadExec.
// If we divide it up into tasks, then that partition will need to be gathered from
// among them.
let name = format!("Stage {:<3}", self.stage_counter);
let read = Arc::new(ArrowFlightReadExec::new(
plan.output_partitioning().clone(),
plan.schema(),
self.stage_counter,
));

self.stage_counter += 1;

Ok(Transformed::yes(read as Self::Node))
} else {
Ok(Transformed::no(plan))
let child = Arc::clone(node.children().first().cloned().ok_or(
internal_datafusion_err!("Expected ArrowFlightReadExec to have a child"),
)?);
let stage = self._distribute_plan_inner(child, num, depth + 1)?;
let node = Arc::new(node.to_distributed(stage.num)?);
inputs.push(stage);
Ok(Transformed::new(node, true, TreeNodeRecursion::Jump))
})?;

let inputs = inputs.into_iter().map(Arc::new).collect();
let mut stage = ExecutionStage::new(*num, distributed.data, inputs);
*num += 1;

if let Some(partitions_per_task) = self.partitions_per_task {
stage = stage.with_maximum_partitions_per_task(partitions_per_task);
}
if let Some(codec) = self.codec.as_ref() {
stage = stage.with_codec(codec.clone());
}
stage.depth = depth;

Ok(stage)
}
}
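Taken together, the rule now runs in two passes. A minimal sketch of the flow, mirroring the optimize() body above (the rule and plan bindings here are hypothetical):

// Pass 1: place an ArrowFlightReadExec network boundary above each RepartitionExec,
// optionally inserting a PartitionIsolatorExec when partitions_per_task is set.
let plan = rule.apply_network_boundaries(plan)?;
// Pass 2: cut the plan at every ArrowFlightReadExec into a numbered tree of ExecutionStages.
let stage = rule.distribute_plan(plan)?;
// The root ExecutionStage is itself an ExecutionPlan, so it can be handed back to DataFusion.
let distributed: Arc<dyn ExecutionPlan> = Arc::new(stage);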

@@ -427,6 +333,20 @@ mod tests {
│partitions [out:4 ] ArrowFlightReadExec: Stage 4
└──────────────────────────────────────────────────
┌───── Stage 2 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
│partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
│partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
└──────────────────────────────────────────────────
┌───── Stage 1 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
└──────────────────────────────────────────────────
┌───── Stage 4 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
│partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
└──────────────────────────────────────────────────
┌───── Stage 2 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:4 ] RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
│partitions [out:4 <-- in:4 ] AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
│partitions [out:4 <-- in:4 ] CoalesceBatchesExec: target_batch_size=8192
│partitions [out:4 <-- in:4 ] FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
│partitions [out:4 ] ArrowFlightReadExec: Stage 1
└──────────────────────────────────────────────────
┌───── Stage 1 Task: partitions: 0..3,unassigned]
│partitions [out:4 <-- in:1 ] RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
│partitions [out:1 ] DataSourceExec: file_groups={1 group: [[/testdata/weather.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
└──────────────────────────────────────────────────
Comment on lines -444 to -457, from a Collaborator (PR author):

I think the depth here was wrong in the previous implementation; the new depth looks correct.

");
}
