2 changes: 1 addition & 1 deletion benchmarks/src/tpch/run.rs
@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
DistributedSessionBuilderContext, NetworkBoundaryExt,
};
use log::info;
use std::fs;
2 changes: 1 addition & 1 deletion src/common/ttl_map.rs
@@ -289,7 +289,7 @@ where
mod tests {
use super::*;
use std::sync::atomic::Ordering;
use tokio::time::{Duration, sleep};
use tokio::time::Duration;

#[tokio::test]
async fn test_basic_insert_and_get() {
136 changes: 75 additions & 61 deletions src/distributed_physical_optimizer_rule.rs
@@ -1,5 +1,6 @@
use super::{NetworkShuffleExec, PartitionIsolatorExec, Stage};
use super::{NetworkShuffleExec, PartitionIsolatorExec};
use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
use crate::stage::Stage;
use datafusion::common::plan_err;
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::datasource::source::DataSourceExec;
@@ -123,68 +124,66 @@ impl DistributedPhysicalOptimizerRule {
plan = Arc::new(CoalescePartitionsExec::new(plan))
}

let result = plan.transform_up(|plan| {
// If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
// that not all tasks have access to all partitions of the underlying DataSource.
if plan.as_any().is::<DataSourceExec>() {
let node = PartitionIsolatorExec::new_pending(plan);
let result =
plan.transform_up(|plan| {
// If this node is a DataSourceExec, we need to wrap it with PartitionIsolatorExec so
// that not all tasks have access to all partitions of the underlying DataSource.
if plan.as_any().is::<DataSourceExec>() {
let node = PartitionIsolatorExec::new(plan);
Comment on lines -126 to +132
Collaborator (Author): Very annoying that cargo fmt decided that it's time to change the formatting of the whole thing, when the changes are actually minimal.

return Ok(Transformed::yes(Arc::new(node)));
}

// If this is a hash RepartitionExec, introduce a shuffle.
if let (Some(node), Some(tasks)) = (
plan.as_any().downcast_ref::<RepartitionExec>(),
self.network_shuffle_tasks,
) {
if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
return Ok(Transformed::no(plan));
return Ok(Transformed::yes(Arc::new(node)));
}
let node = NetworkShuffleExec::from_repartition_exec(&plan, tasks)?;

return Ok(Transformed::yes(Arc::new(node)));
}
// If this is a hash RepartitionExec, introduce a shuffle.
if let (Some(node), Some(tasks)) = (
plan.as_any().downcast_ref::<RepartitionExec>(),
self.network_shuffle_tasks,
) {
if !matches!(node.partitioning(), Partitioning::Hash(_, _)) {
return Ok(Transformed::no(plan));
}
let node = NetworkShuffleExec::try_new(plan, tasks)?;

// If this is a CoalescePartitionsExec, it means that the original plan is trying to
// merge all partitions into one. We need to go one step ahead and also merge all tasks
// into one.
if let (Some(node), Some(tasks)) = (
plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
self.network_coalesce_tasks,
) {
// If the immediate child is a PartitionIsolatorExec, it means that the rest of the
// plan is just a couple of non-computational nodes that are probably not worth
// distributing.
if node
.children()
.first()
.is_some_and(|v| v.as_any().is::<PartitionIsolatorExec>())
{
return Ok(Transformed::no(plan));
return Ok(Transformed::yes(Arc::new(node)));
}
let node = NetworkCoalesceExec::from_input_exec(node, tasks)?;

let plan = plan.with_new_children(vec![Arc::new(node)])?;

return Ok(Transformed::yes(plan));
}
// If this is a CoalescePartitionsExec, it means that the original plan is trying to
// merge all partitions into one. We need to go one step ahead and also merge all tasks
// into one.
if let (Some(node), Some(tasks)) = (
plan.as_any().downcast_ref::<CoalescePartitionsExec>(),
self.network_coalesce_tasks,
) {
// If the immediate child is a PartitionIsolatorExec, it means that the rest of the
// plan is just a couple of non-computational nodes that are probably not worth
// distributing.
if node.input().as_any().is::<PartitionIsolatorExec>() {
return Ok(Transformed::no(plan));
}

// The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
// We need to account for it and help it by also coalescing all tasks into one, therefore
// a NetworkCoalesceExec is introduced.
if let (Some(node), Some(tasks)) = (
plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
self.network_coalesce_tasks,
) {
let node = NetworkCoalesceExec::from_input_exec(node, tasks)?;
let plan = plan.clone().with_new_children(vec![Arc::new(
NetworkCoalesceExec::new(Arc::clone(node.input()), tasks),
)])?;

let plan = plan.with_new_children(vec![Arc::new(node)])?;
return Ok(Transformed::yes(plan));
}

return Ok(Transformed::yes(plan));
}
// The SortPreservingMergeExec node will try to coalesce all partitions into just 1.
// We need to account for it and help it by also coalescing all tasks into one, therefore
// a NetworkCoalesceExec is introduced.
if let (Some(node), Some(tasks)) = (
plan.as_any().downcast_ref::<SortPreservingMergeExec>(),
self.network_coalesce_tasks,
) {
let plan = plan.clone().with_new_children(vec![Arc::new(
NetworkCoalesceExec::new(Arc::clone(node.input()), tasks),
)])?;

return Ok(Transformed::yes(plan));
}

Ok(Transformed::no(plan))
})?;
Ok(Transformed::no(plan))
})?;
Ok(result.data)
}
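
For readers following the rewritten closure above, here is a dependency-free sketch of the branch order it applies. `ToyPlan`, `Boundary` and `choose_boundary` are illustrative stand-ins, not types from this crate or from DataFusion:

```rust
// Toy stand-ins for the real plan nodes; not part of this crate or DataFusion.
#[allow(dead_code)]
enum ToyPlan {
    DataSource,
    HashRepartition,
    RoundRobinRepartition,
    CoalescePartitions { input_is_isolator: bool },
    SortPreservingMerge,
    Other,
}

#[derive(Debug, PartialEq)]
enum Boundary {
    PartitionIsolator,
    NetworkShuffle { tasks: usize },
    NetworkCoalesce { tasks: usize },
    None,
}

// Mirrors the branch order of the closure: isolate data sources, shuffle on hash
// repartitions, coalesce tasks on partition-merging nodes, leave everything else alone.
fn choose_boundary(
    plan: ToyPlan,
    network_shuffle_tasks: Option<usize>,
    network_coalesce_tasks: Option<usize>,
) -> Boundary {
    match (plan, network_shuffle_tasks, network_coalesce_tasks) {
        // Data sources get wrapped so that not every task reads every partition.
        (ToyPlan::DataSource, _, _) => Boundary::PartitionIsolator,
        // Only hash repartitions become network shuffles; round-robin ones are left as-is.
        (ToyPlan::HashRepartition, Some(tasks), _) => Boundary::NetworkShuffle { tasks },
        // A CoalescePartitionsExec sitting directly on a PartitionIsolatorExec is not
        // worth distributing.
        (ToyPlan::CoalescePartitions { input_is_isolator: true }, _, _) => Boundary::None,
        // Nodes that merge all partitions into one also need all tasks merged into one.
        (ToyPlan::CoalescePartitions { .. }, _, Some(tasks))
        | (ToyPlan::SortPreservingMerge, _, Some(tasks)) => Boundary::NetworkCoalesce { tasks },
        _ => Boundary::None,
    }
}

fn main() {
    assert_eq!(
        choose_boundary(ToyPlan::HashRepartition, Some(8), Some(4)),
        Boundary::NetworkShuffle { tasks: 8 }
    );
    assert_eq!(
        choose_boundary(ToyPlan::RoundRobinRepartition, Some(8), Some(4)),
        Boundary::None
    );
    assert_eq!(
        choose_boundary(ToyPlan::SortPreservingMerge, Some(8), Some(4)),
        Boundary::NetworkCoalesce { tasks: 4 }
    );
}
```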

@@ -234,19 +233,19 @@ impl DistributedPhysicalOptimizerRule {
};

let stage = loop {
let (inner_plan, in_tasks) = dnode.as_ref().to_stage_info(n_tasks)?;
let input_stage_info = dnode.as_ref().get_input_stage_info(n_tasks)?;
// If the current stage has just 1 task, and the next stage is only going to have
// 1 task, there's no point in having a network boundary in between, they can just
// communicate in memory.
if n_tasks == 1 && in_tasks == 1 {
if n_tasks == 1 && input_stage_info.task_count == 1 {
let mut n = dnode.as_ref().rollback()?;
if let Some(node) = n.as_any().downcast_ref::<PartitionIsolatorExec>() {
// Also trim PartitionIsolatorExec out of the plan.
n = Arc::clone(node.children().first().unwrap());
}
return Ok(Transformed::yes(n));
}
match Self::_distribute_plan_inner(query_id, inner_plan.clone(), num, depth + 1, in_tasks) {
match Self::_distribute_plan_inner(query_id, input_stage_info.plan, num, depth + 1, input_stage_info.task_count) {
Ok(v) => break v,
Err(e) => match get_distribute_plan_err(&e) {
None => return Err(e),
@@ -255,7 +254,7 @@
// that no more than `limit` tasks can be used for it, so we are going
// to limit the amount of tasks to the requested number and try building
// the stage again.
if in_tasks == *limit {
if input_stage_info.task_count == *limit {
return plan_err!("A node requested {limit} tasks for the stage its in, but that stage already has that many tasks");
}
dnode = Referenced::Arced(dnode.as_ref().with_input_task_count(*limit)?);
@@ -280,14 +279,27 @@
}
}

/// Necessary information for building a [Stage] during distributed planning.
///
/// [NetworkBoundary]s return this piece of data so that the distributed planner knows how to
/// build the next [Stage] from which the [NetworkBoundary] is going to receive data.
///
/// Some network boundaries might modify their children, for example by scaling up the
/// number of partitions or by injecting a specific [ExecutionPlan] on top.
pub struct InputStageInfo {
/// The head plan of the [Stage] that is about to be built.
pub plan: Arc<dyn ExecutionPlan>,
/// The number of tasks the [Stage] will have.
pub task_count: usize,
}

/// This trait represents a node that introduces the necessity of a network boundary in the plan.
/// The distributed planner, upon stepping into one of these, will break the plan and build a stage
/// out of it.
pub trait NetworkBoundary: ExecutionPlan {
/// Returns the information necessary for building the next stage.
/// - The head node of the stage.
/// - the amount of tasks that stage will have.
fn to_stage_info(&self, n_tasks: usize) -> Result<(Arc<dyn ExecutionPlan>, usize)>;
/// Returns the information necessary for building the next stage from which this
/// [NetworkBoundary] is going to collect data.
fn get_input_stage_info(&self, task_count: usize) -> Result<InputStageInfo>;

/// re-assigns a different number of input tasks to the current [NetworkBoundary].
///
Expand All @@ -297,6 +309,8 @@ pub trait NetworkBoundary: ExecutionPlan {

/// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this
/// information to perform any internal transformations necessary for distributed execution.
///
/// Typically, [NetworkBoundary]s will use this call for transitioning from "Pending" to "Ready".
fn with_input_stage(&self, input_stage: Stage) -> Result<Arc<dyn ExecutionPlan>>;

/// Returns the assigned input [Stage], if any.
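
A minimal sketch of the Pending to Ready life cycle that the [NetworkBoundary] docs above describe, using toy types in place of the crate's Stage and plan types (`ToyStage` and `ToyBoundary` are illustrative, and the method shapes are simplified, not the real trait signatures):

```rust
// Toy stand-ins for the crate's Stage and NetworkBoundary types; the shapes are
// simplified and only mirror the life cycle described in the trait docs.
struct ToyStage {
    task_count: usize,
}

enum ToyBoundary {
    // Planning-time state: knows how many input tasks it wants, but the input
    // stage has not been built yet.
    Pending { input_tasks: usize },
    // Execution-ready state: the planner has built the input stage and handed
    // it over via with_input_stage.
    Ready { input_stage: ToyStage },
}

impl ToyBoundary {
    // Analogue of get_input_stage_info: tells the planner how many tasks the
    // next stage should have.
    fn input_task_count(&self) -> Result<usize, String> {
        match self {
            ToyBoundary::Pending { input_tasks } => Ok(*input_tasks),
            ToyBoundary::Ready { .. } => Err("already has an input stage".into()),
        }
    }

    // Analogue of with_input_stage: transitions from Pending to Ready once the
    // planner has formed the stage.
    fn with_input_stage(self, input_stage: ToyStage) -> Result<Self, String> {
        match self {
            ToyBoundary::Pending { .. } => Ok(ToyBoundary::Ready { input_stage }),
            ToyBoundary::Ready { .. } => Err("already has an input stage".into()),
        }
    }
}

fn main() {
    let pending = ToyBoundary::Pending { input_tasks: 4 };
    let task_count = pending.input_task_count().unwrap();
    // ... the planner builds the input stage with `task_count` tasks here ...
    let ready = pending.with_input_stage(ToyStage { task_count }).unwrap();
    if let ToyBoundary::Ready { input_stage } = ready {
        println!("input stage runs across {} tasks", input_stage.task_count);
    }
}
```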
14 changes: 10 additions & 4 deletions src/execution_plans/common.rs
@@ -1,15 +1,21 @@
use datafusion::common::{DataFusionError, plan_err};
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
use std::borrow::Borrow;
use std::sync::Arc;

pub(super) fn require_one_child(
children: &[Arc<dyn ExecutionPlan>],
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
pub(super) fn require_one_child<L, T>(
children: L,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>
where
L: AsRef<[T]>,
T: Borrow<Arc<dyn ExecutionPlan>>,
{
let children = children.as_ref();
if children.len() != 1 {
return plan_err!("Expected exactly 1 children, got {}", children.len());
}
Ok(children[0].clone())
Ok(children[0].borrow().clone())
}

pub(super) fn scale_partitioning_props(
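
A standalone sketch of what the looser bounds buy, presumably letting call sites pass either owned `Arc`s (the shape `with_new_children` receives) or the borrowed `Arc`s that `children()` returns; that motivation is inferred from the bounds, not stated in the PR. `Plan` and `Dummy` are toy stand-ins for DataFusion types:

```rust
use std::borrow::Borrow;
use std::sync::Arc;

// Toy stand-in for DataFusion's ExecutionPlan trait.
trait Plan {}
struct Dummy;
impl Plan for Dummy {}

// Same bound shape as the refactored require_one_child: anything that can be viewed
// as a slice whose items borrow as Arc<dyn Plan>.
fn require_one_child<L, T>(children: L) -> Result<Arc<dyn Plan>, String>
where
    L: AsRef<[T]>,
    T: Borrow<Arc<dyn Plan>>,
{
    let children = children.as_ref();
    if children.len() != 1 {
        return Err(format!("Expected exactly 1 child, got {}", children.len()));
    }
    Ok(children[0].borrow().clone())
}

fn main() {
    let owned: Vec<Arc<dyn Plan>> = vec![Arc::new(Dummy)];
    let borrowed: Vec<&Arc<dyn Plan>> = owned.iter().collect();

    // A Vec of borrowed Arcs (the shape children() hands back)...
    assert!(require_one_child(borrowed).is_ok());
    // ...and an owned Vec (the shape with_new_children receives) both satisfy the bounds.
    assert!(require_one_child(owned).is_ok());
}
```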
4 changes: 2 additions & 2 deletions src/execution_plans/distributed.rs
@@ -2,7 +2,7 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
use crate::execution_plans::common::require_one_child;
use crate::protobuf::DistributedCodec;
use crate::{ExecutionTask, Stage};
use crate::stage::{ExecutionTask, Stage};
use datafusion::common::exec_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -98,7 +98,7 @@ impl ExecutionPlan for DistributedExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DistributedExec {
plan: require_one_child(&children)?,
plan: require_one_child(children)?,
}))
}

54 changes: 26 additions & 28 deletions src/execution_plans/network_coalesce.rs
@@ -1,13 +1,15 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::distributed_physical_optimizer_rule::{NetworkBoundary, limit_tasks_err};
use crate::distributed_physical_optimizer_rule::{
InputStageInfo, NetworkBoundary, limit_tasks_err,
};
use crate::execution_plans::common::{require_one_child, scale_partitioning_props};
use crate::flight_service::DoGet;
use crate::metrics::MetricsCollectingStream;
use crate::metrics::proto::MetricsSetProto;
use crate::protobuf::{StageKey, map_flight_to_datafusion_error, map_status_to_datafusion_error};
use crate::stage::MaybeEncodedPlan;
use crate::{ChannelResolver, DistributedTaskContext, Stage};
use crate::stage::{MaybeEncodedPlan, Stage};
use crate::{ChannelResolver, DistributedTaskContext};
use arrow_flight::Ticket;
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
@@ -75,7 +77,7 @@ pub enum NetworkCoalesceExec {
pub struct NetworkCoalescePending {
properties: PlanProperties,
input_tasks: usize,
child: Arc<dyn ExecutionPlan>,
input: Arc<dyn ExecutionPlan>,
}

/// Ready version of the [NetworkCoalesceExec] node. This node can be created in
@@ -99,30 +101,23 @@ pub struct NetworkCoalesceReady {
}

impl NetworkCoalesceExec {
/// Creates a new [NetworkCoalesceExec] node from a [CoalescePartitionsExec] and
/// [SortPreservingMergeExec].
pub fn from_input_exec(
input: &dyn ExecutionPlan,
input_tasks: usize,
) -> Result<Self, DataFusionError> {
let children = input.children();
let Some(child) = children.first() else {
return internal_err!("Expected a single child");
};

Ok(Self::Pending(NetworkCoalescePending {
properties: child.properties().clone(),
/// Builds a new [NetworkCoalesceExec] in "Pending" state.
///
/// Typically, this node should be placed right after nodes that coalesce all the input
/// partitions into one, for example:
/// - [CoalescePartitionsExec]
/// - [SortPreservingMergeExec]
pub fn new(input: Arc<dyn ExecutionPlan>, input_tasks: usize) -> Self {
Self::Pending(NetworkCoalescePending {
properties: input.properties().clone(),
input_tasks,
child: Arc::clone(child),
}))
input,
})
}
}

impl NetworkBoundary for NetworkCoalesceExec {
fn to_stage_info(
&self,
n_tasks: usize,
) -> Result<(Arc<dyn ExecutionPlan>, usize), DataFusionError> {
fn get_input_stage_info(&self, n_tasks: usize) -> Result<InputStageInfo, DataFusionError> {
let Self::Pending(pending) = self else {
return plan_err!("can only return wrapped child if on Pending state");
};
@@ -132,7 +127,10 @@ impl NetworkBoundary for NetworkCoalesceExec {
return Err(limit_tasks_err(1));
}

Ok((Arc::clone(&pending.child), pending.input_tasks))
Ok(InputStageInfo {
plan: Arc::clone(&pending.input),
task_count: pending.input_tasks,
})
}

fn with_input_stage(
@@ -173,7 +171,7 @@ impl NetworkBoundary for NetworkCoalesceExec {
Self::Pending(pending) => Self::Pending(NetworkCoalescePending {
properties: pending.properties.clone(),
input_tasks,
child: pending.child.clone(),
input: pending.input.clone(),
}),
Self::Ready(_) => {
plan_err!("Self can only re-assign input tasks if in 'Pending' state")?
@@ -216,7 +214,7 @@

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
match self {
NetworkCoalesceExec::Pending(v) => vec![&v.child],
NetworkCoalesceExec::Pending(v) => vec![&v.input],
NetworkCoalesceExec::Ready(v) => match &v.input_stage.plan {
MaybeEncodedPlan::Decoded(v) => vec![v],
MaybeEncodedPlan::Encoded(_) => vec![],
@@ -231,12 +229,12 @@
match self.as_ref() {
Self::Pending(v) => {
let mut v = v.clone();
v.child = require_one_child(&children)?;
v.input = require_one_child(children)?;
Ok(Arc::new(Self::Pending(v)))
}
Self::Ready(v) => {
let mut v = v.clone();
v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(&children)?);
v.input_stage.plan = MaybeEncodedPlan::Decoded(require_one_child(children)?);
Ok(Arc::new(Self::Ready(v)))
}
}
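
The Pending/Ready match arms above implement the usual `children()` / `with_new_children` round trip. A minimal sketch of that contract, assuming a plain `datafusion` dependency and using stock `EmptyExec` and `CoalescePartitionsExec` nodes purely for illustration (building a real `NetworkCoalesceExec` needs the rest of this crate):

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::empty::EmptyExec;
use std::sync::Arc;

fn main() -> datafusion::common::Result<()> {
    // A single-child node over a leaf, mirroring the shape NetworkCoalesceExec
    // sits in while Pending.
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let node = Arc::new(CoalescePartitionsExec::new(Arc::clone(&input)));

    // children() hands back borrowed Arcs; with_new_children takes owned ones,
    // which is exactly what require_one_child validates and extracts.
    let children: Vec<Arc<dyn ExecutionPlan>> = node.children().into_iter().cloned().collect();
    let rebuilt = node.with_new_children(children)?;
    assert_eq!(rebuilt.children().len(), 1);
    Ok(())
}
```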