Merged

Changes from 2 commits
6 changes: 3 additions & 3 deletions benchmarks/src/tpch/run.rs
@@ -47,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
DistributedSessionBuilderContext, StageExec,
DistributedSessionBuilderContext, NetworkBoundaryExt, Stage,
};
use log::info;
use std::fs;
@@ -331,8 +331,8 @@ impl RunOpt {
}
let mut n_tasks = 0;
physical_plan.clone().transform_down(|node| {
if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
n_tasks += node.tasks.len()
if let Some(node) = node.as_network_boundary() {
n_tasks += node.input_stage().map(|v| v.tasks.len()).unwrap_or(0)
}
Ok(Transformed::no(node))
})?;
13 changes: 13 additions & 0 deletions src/common/execution_plan_ops.rs
@@ -0,0 +1,13 @@
use datafusion::common::plan_err;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

pub fn one_child(
Collaborator:
nit: a doc comment would be nice
nit: would src/common/execution_plan.rs or src/execution_plans/common.rs make more sense?
nit: maybe require_one_child makes more sense

Collaborator (author):
src/execution_plans/common.rs sounds like a good place for helpers that are used in more than one plan, but that are scoped to src/execution_plans.

Followed your suggestion with this, and with a couple of other helpers that were also only used in src/execution_plans.

children: &[Arc<dyn ExecutionPlan>],
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
if children.len() != 1 {
return plan_err!("Expected exactly 1 child, got {}", children.len());
}
Ok(children[0].clone())
}
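
A minimal usage sketch of this helper (a hypothetical snippet, not part of the diff; it assumes a recent DataFusion where EmptyExec::new takes just a schema):

use crate::common::one_child; // as re-exported from src/common/mod.rs in this diff
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

fn demo() -> datafusion::common::Result<()> {
    let child: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    // Exactly one child: the child itself is returned.
    let _only = one_child(&[Arc::clone(&child)])?;
    // Zero children or more than one child: a plan error is returned instead.
    assert!(one_child(&[]).is_err());
    assert!(one_child(&[Arc::clone(&child), child]).is_err());
    Ok(())
}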
2 changes: 2 additions & 0 deletions src/common/mod.rs
@@ -1,7 +1,9 @@
mod callback_stream;
mod execution_plan_ops;
mod partitioning;
#[allow(unused)]
pub mod ttl_map;

pub(crate) use callback_stream::with_callback;
pub(crate) use execution_plan_ops::*;
pub(crate) use partitioning::{scale_partitioning, scale_partitioning_props};
177 changes: 98 additions & 79 deletions src/distributed_physical_optimizer_rule.rs

Large diffs are not rendered by default.

126 changes: 126 additions & 0 deletions src/execution_plans/distributed.rs
@@ -0,0 +1,126 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::common::one_child;
use crate::distributed_physical_optimizer_rule::NetworkBoundaryExt;
use crate::protobuf::DistributedCodec;
use crate::{ExecutionTask, Stage};
use datafusion::common::exec_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use rand::Rng;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use url::Url;

/// [ExecutionPlan] that executes the inner plan in distributed mode.
/// Before executing it, two modifications are lazily performed on the plan:
/// 1. Assigns worker URLs to all the stages. A random set of URLs is sampled from the
/// channel resolver and assigned to each task in each stage.
/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
/// over the wire.
#[derive(Debug, Clone)]
pub struct DistributedExec {
pub plan: Arc<dyn ExecutionPlan>,
}

impl DistributedExec {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self { plan }
}

fn prepare_plan(
&self,
urls: &[Url],
codec: &dyn PhysicalExtensionCodec,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let prepared = Arc::clone(&self.plan).transform_up(|plan| {
Collaborator:
Can this be done during planning? Totally fine to do as a follow-up. Curious why we do it here.

Collaborator (author):
Here we do two things:

  1. URL assignment
  2. Stage plan encoding

We want to do 1) as late as possible: if we did it during planning, the assigned URLs might no longer be valid by the time we execute the plan. By making the URL assignment happen as close to execution as possible, we reduce the risk of workers no longer being available. This is also how it was done previously; it's not really something new.

We want to do 2) right before execution because plans in the "encoded" state are not visible or inspectable, whereas with the plan still at hand we can display it and so on.

(A standalone sketch of the URL assignment appears after this file's diff.)

let Some(plan) = plan.as_network_boundary() else {
return Ok(Transformed::no(plan));
};

let mut rng = rand::thread_rng();
let start_idx = rng.gen_range(0..urls.len());

let Some(stage) = plan.input_stage() else {
return exec_err!(
"NetworkBoundary '{}' has not been assigned a stage",
plan.name()
);
};

let ready_stage = Stage {
query_id: stage.query_id,
num: stage.num,
plan: stage.plan.to_encoded(codec)?,
tasks: stage
.tasks
.iter()
.enumerate()
.map(|(i, _)| ExecutionTask {
url: Some(urls[(start_idx + i) % urls.len()].clone()),
})
.collect::<Vec<_>>(),
};

Ok(Transformed::yes(plan.with_input_stage(ready_stage)?))
})?;
Ok(prepared.data)
}
}

impl DisplayAs for DistributedExec {
fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(f, "DistributedExec")
}
}

impl ExecutionPlan for DistributedExec {
fn name(&self) -> &str {
"DistributedExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
self.plan.properties()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.plan]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DistributedExec {
plan: one_child(&children)?,
}))
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion::common::Result<SendableRecordBatchStream> {
if partition > 0 {
// The DistributedExec node prepares the plan lazily (URL assignment + plan encoding in
// prepare_plan) upon calling .execute(). This means .execute() must only be called once:
// executing multiple partitions would perform several random URL assignments that differ
// from each other, producing an invalid plan.
return exec_err!(
"DistributedExec must only have 1 partition, but it was called with partition index {partition}"
);
}

let channel_resolver = get_distributed_channel_resolver(context.session_config())?;
let codec = DistributedCodec::new_combined_with_user(context.session_config());

let plan = self.prepare_plan(&channel_resolver.get_urls()?, &codec)?;
plan.execute(partition, context)
}
}
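
To make the review discussion above concrete, this is a standalone sketch of the round-robin URL assignment that prepare_plan performs right before execution (assign_urls is a hypothetical free function, not part of the diff; like the real code, it assumes a non-empty list of worker URLs):

use rand::Rng;
use url::Url;

// One worker URL per task, starting at a random offset so that repeated queries
// spread their stage tasks across the available workers.
fn assign_urls(urls: &[Url], n_tasks: usize) -> Vec<Url> {
    let start_idx = rand::thread_rng().gen_range(0..urls.len());
    (0..n_tasks)
        .map(|i| urls[(start_idx + i) % urls.len()].clone())
        .collect()
}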
6 changes: 2 additions & 4 deletions src/execution_plans/mod.rs
@@ -1,13 +1,11 @@
mod distributed;
mod metrics;
mod network_coalesce;
mod network_shuffle;
mod partition_isolator;
mod stage;

pub use distributed::DistributedExec;
pub use metrics::MetricsWrapperExec;
pub use network_coalesce::{NetworkCoalesceExec, NetworkCoalesceReady};
pub use network_shuffle::{NetworkShuffleExec, NetworkShuffleReadyExec};
pub use partition_isolator::PartitionIsolatorExec;
pub(crate) use stage::InputStage;
pub use stage::display_plan_graphviz;
pub use stage::{DistributedTaskContext, ExecutionTask, StageExec};