6 changes: 3 additions & 3 deletions benchmarks/src/tpch/run.rs
@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
 };
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
-    DistributedSessionBuilderContext, StageExec,
+    DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
 };
 use log::info;
 use std::fs;
@@ -331,8 +331,8 @@ impl RunOpt {
         }
         let mut n_tasks = 0;
         physical_plan.clone().transform_down(|node| {
-            if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
-                n_tasks += node.tasks.len()
+            if let Some(node) = node.as_network_boundary() {
+                n_tasks += node.input_stage().map(|v| v.tasks.len()).unwrap_or(0)
             }
             Ok(Transformed::no(node))
         })?;
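Note: the task-counting change above relies only on a small API surface. The following is a hypothetical sketch of that API's shape, inferred from the calls made in this diff (as_network_boundary(), input_stage(), and the tasks field on Stage); the authoritative definitions live in src/distributed_physical_optimizer_rule.rs, whose diff is not rendered on this page.

// Hypothetical sketch, inferred from usage in this PR; not the real definitions.
use std::sync::Arc;

pub struct ExecutionTask { /* worker URL assignment, elided */ }

pub struct Stage {
    pub tasks: Vec<ExecutionTask>,
    // query_id, num, plan, ... elided
}

pub trait NetworkBoundary {
    /// Returns the input stage feeding this network boundary, if one was assigned.
    fn input_stage(&self) -> Option<&Stage>;
}

pub trait NetworkBoundaryExt {
    /// Returns Some if this plan node is a network boundary.
    fn as_network_boundary(&self) -> Option<&dyn NetworkBoundary>;
}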
3 changes: 0 additions & 3 deletions src/common/mod.rs
@@ -1,7 +1,4 @@
 mod map_last_stream;
-mod partitioning;
-#[allow(unused)]
 pub mod ttl_map;
 
 pub(crate) use map_last_stream::map_last_stream;
-pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};
177 changes: 98 additions & 79 deletions src/distributed_physical_optimizer_rule.rs

Large diffs are not rendered by default.

17 changes: 14 additions & 3 deletions src/common/partitioning.rs → src/execution_plans/common.rs
@@ -1,7 +1,18 @@
 use datafusion::common::{DataFusionError, plan_err};
 use datafusion::physical_expr::Partitioning;
-use datafusion::physical_plan::PlanProperties;
+use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
 use std::sync::Arc;
 
-pub fn scale_partitioning_props(
+pub(super) fn require_one_child(
+    children: &[Arc<dyn ExecutionPlan>],
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    if children.len() != 1 {
+        return plan_err!("Expected exactly 1 child, got {}", children.len());
+    }
+    Ok(children[0].clone())
+}
+
+pub(super) fn scale_partitioning_props(
     props: &PlanProperties,
     f: impl FnOnce(usize) -> usize,
 ) -> PlanProperties {
@@ -13,7 +24,7 @@ pub fn scale_partitioning_props(
     )
 }
 
-pub fn scale_partitioning(
+pub(super) fn scale_partitioning(
     partitioning: &Partitioning,
     f: impl FnOnce(usize) -> usize,
 ) -> Partitioning {
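Usage note: require_one_child is the standard guard for single-child operators' with_new_children implementations (DistributedExec below delegates to it). Here is a self-contained illustration of the contract, with the helper's logic copied locally since the real function is pub(super):

use datafusion::common::{DataFusionError, plan_err};
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

// Local copy of the helper's logic, for illustration only.
fn require_one_child(
    children: &[Arc<dyn ExecutionPlan>],
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    if children.len() != 1 {
        return plan_err!("Expected exactly 1 child, got {}", children.len());
    }
    Ok(children[0].clone())
}

fn main() {
    // Zero children is rejected with a plan error rather than a panic.
    let err = require_one_child(&[]).unwrap_err();
    assert!(err.to_string().contains("Expected exactly 1 child"));
}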
126 changes: 126 additions & 0 deletions src/execution_plans/distributed.rs
@@ -0,0 +1,126 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
use crate::execution_plans::common::require_one_child;
use crate::protobuf::DistributedCodec;
use crate::{ExecutionTask, Stage};
use datafusion::common::exec_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use rand::Rng;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use url::Url;

/// [ExecutionPlan] that executes the inner plan in distributed mode.
/// Before executing it, two modifications are lazily performed on the plan:
/// 1. Assigns worker URLs to all the stages. A random set of URLs is sampled from the
///    channel resolver and assigned to each task in each stage.
/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
///    over the wire.
#[derive(Debug, Clone)]
pub struct DistributedExec {
    pub plan: Arc<dyn ExecutionPlan>,
}

impl DistributedExec {
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self { plan }
    }

    fn prepare_plan(
        &self,
        urls: &[Url],
        codec: &dyn PhysicalExtensionCodec,
    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
        let prepared = Arc::clone(&self.plan).transform_up(|plan| {

Collaborator: Can this be done during planning? Totally fine to do as a follow-up. Curious why we do it here.

Collaborator (author): Here we do two things:

  1. URL assignment
  2. Stage plan encoding

We want to do 1) as late as possible: if it happened during planning, the assigned URLs might no longer be valid by the time the plan is executed. Assigning URLs as close to execution as possible reduces the risk of workers no longer being available. This is also how it was done previously; it is not new behavior.

We want to do 2) right before execution because plans in the encoded state are opaque: they cannot be displayed or inspected. Keeping the plan decoded until then means we can still display it while we have it at hand.

            let Some(plan) = plan.as_network_boundary() else {
                return Ok(Transformed::no(plan));
            };

            let mut rng = rand::thread_rng();
            let start_idx = rng.gen_range(0..urls.len());

            let Some(stage) = plan.input_stage() else {
                return exec_err!(
                    "NetworkBoundary '{}' has not been assigned a stage",
                    plan.name()
                );
            };

            let ready_stage = Stage {
                query_id: stage.query_id,
                num: stage.num,
                plan: stage.plan.to_encoded(codec)?,
                tasks: stage
                    .tasks
                    .iter()
                    .enumerate()
                    .map(|(i, _)| ExecutionTask {
                        url: Some(urls[(start_idx + i) % urls.len()].clone()),
                    })
                    .collect::<Vec<_>>(),
            };

            Ok(Transformed::yes(plan.with_input_stage(ready_stage)?))
        })?;
        Ok(prepared.data)
    }
}

impl DisplayAs for DistributedExec {
    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "DistributedExec")
    }
}

impl ExecutionPlan for DistributedExec {
    fn name(&self) -> &str {
        "DistributedExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.plan.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.plan]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(DistributedExec {
            plan: require_one_child(&children)?,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion::common::Result<SendableRecordBatchStream> {
        if partition > 0 {
            // DistributedExec assigns URLs lazily in prepare_plan() when .execute() is called.
            // .execute() must therefore only be called once: repeating the random URL
            // assignment for every partition would yield diverging assignments, producing
            // an invalid plan.
            return exec_err!(
                "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
            );
        }

        let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
        let codec = DistributedCodec::new_combined_with_user(context.session_config());

        let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
        plan.execute(partition, context)
    }
}
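For context, a hedged usage sketch of the execution contract described above (assuming a TaskContext whose session config registered a channel resolver, and a plan already wrapped in DistributedExec by the optimizer rule; the names here are illustrative, not from the PR):

use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

// Hypothetical driver function.
fn run_distributed(
    plan: Arc<dyn ExecutionPlan>, // a DistributedExec produced by the optimizer rule
    ctx: Arc<TaskContext>,        // session config must carry a channel resolver
) -> datafusion::common::Result<()> {
    // DistributedExec exposes a single partition: URL assignment and protobuf
    // encoding happen once, inside this execute() call.
    let _stream = plan.execute(0, ctx)?;
    // Any partition index > 0 returns an error, per the check in execute().
    Ok(())
}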
7 changes: 3 additions & 4 deletions src/execution_plans/mod.rs
@@ -1,13 +1,12 @@
mod common;
mod distributed;
mod metrics;
mod network_coalesce;
mod network_shuffle;
mod partition_isolator;
mod stage;

pub use distributed::DistributedExec;
pub use metrics::MetricsWrapperExec;
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
pub use partition_isolator::PartitionIsolatorExec;
pub(crate) use stage::InputStage;
pub use stage::display_plan_graphviz;
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};