Gene.bordegaray/2025/12/add broadcast exec #279
@@ -3,9 +3,9 @@ use crate::distributed_planner::plan_annotator::{
    AnnotatedPlan, RequiredNetworkBoundary, annotate_plan,
};
use crate::{
    DistributedConfig, DistributedExec, NetworkCoalesceExec, NetworkShuffleExec, TaskEstimator,
    BroadcastExec, DistributedConfig, DistributedExec, NetworkBroadcastExec, NetworkCoalesceExec,
    NetworkShuffleExec, TaskCountAnnotation, TaskEstimator,
};
use datafusion::common::internal_err;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
@@ -88,21 +88,58 @@ fn distribute_plan(
    stage_id: &mut usize,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let d_cfg = DistributedConfig::from_config_options(cfg)?;
    let mut children = annotated_plan.children;
    let parent_task_count = annotated_plan.task_count.as_usize();

    let children = annotated_plan.children;
    // This is a leaf node, so we need to scale it up with the final task count.
    if children.is_empty() {
        let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
            &annotated_plan.plan,
            annotated_plan.task_count.as_usize(),
            parent_task_count,
            cfg,
        );
        return Ok(scaled_up.unwrap_or(annotated_plan.plan));
    }

    let parent_task_count = annotated_plan.task_count.as_usize();
    let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();
    // Broadcast requires different task counts for build vs probe.
    if annotated_plan.required_network_boundary == Some(RequiredNetworkBoundary::Broadcast) {
        let mut build = children.remove(0);
        let mut probe = children.remove(0);

        set_task_count_until_boundary(&mut probe, parent_task_count);

        // If there's only one consumer task, use Coalesce instead of Broadcast.
        let build_child: Arc<dyn ExecutionPlan> = if parent_task_count == 1 {
            set_task_count_until_boundary(&mut build, 1);
            let build_side = distribute_plan(build, cfg, query_id, stage_id)?;
            Arc::new(NetworkCoalesceExec::try_new(
                build_side, query_id, *stage_id, 1, 1,
            )?)
        } else {
            // Remove CoalescePartitionsExec since we want multiple partitions flowing through
            // BroadcastExec. Coalescing happens on the consumer side.
            let build_without_coalesce = unwrap_coalesce_partitions(build);
            let build_task_count = build_without_coalesce.task_count.as_usize();
            let build_side = distribute_plan(build_without_coalesce, cfg, query_id, stage_id)?;
            let broadcast_exec = Arc::new(BroadcastExec::new(build_side, parent_task_count));

            let network_broadcast = Arc::new(NetworkBroadcastExec::try_new(
                broadcast_exec,
                query_id,
                *stage_id,
                build_task_count,
            )?);
            // Add CoalescePartitionsExec above the network boundary on the consumer side.
            Arc::new(CoalescePartitionsExec::new(network_broadcast))
        };
        stage_id.add_assign(1);

        let probe_side = distribute_plan(probe, cfg, query_id, stage_id)?;
        return annotated_plan
            .plan
            .with_new_children(vec![build_child, probe_side]);
    }

    let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();
    let new_children = children
        .into_iter()
        .map(|child| distribute_plan(child, cfg, query_id, stage_id))
@@ -119,39 +156,60 @@ fn distribute_plan(
        return annotated_plan.plan.with_new_children(new_children);
    }

    // If the current node has a RepartitionExec below, it needs a shuffle, so put one
    // NetworkShuffleExec boundary in between the RepartitionExec and the current node.
    if nb_req == RequiredNetworkBoundary::Shuffle {
        let new_child = Arc::new(NetworkShuffleExec::try_new(
            require_one_child(new_children)?,
            query_id,
            *stage_id,
            parent_task_count,
            max_child_task_count.unwrap_or(1),
        )?);
        stage_id.add_assign(1);
        return annotated_plan.plan.with_new_children(vec![new_child]);
    match nb_req {
        // If the current node has a RepartitionExec below, it needs a shuffle, so put one
        // NetworkShuffleExec boundary in between the RepartitionExec and the current node.
        RequiredNetworkBoundary::Shuffle => {
            let new_child = Arc::new(NetworkShuffleExec::try_new(
                require_one_child(new_children)?,
                query_id,
                *stage_id,
                parent_task_count,
                max_child_task_count.unwrap_or(1),
            )?);
            stage_id.add_assign(1);
            annotated_plan.plan.with_new_children(vec![new_child])
        }
        // If this is a CoalescePartitionsExec or a SortPreservingMergeExec, it means that the original
        // plan is trying to merge all partitions into one. We need to go one step further and also merge
        // all distributed tasks into one.
        RequiredNetworkBoundary::Coalesce => {
            let new_child = Arc::new(NetworkCoalesceExec::try_new(
                require_one_child(new_children)?,
                query_id,
                *stage_id,
                parent_task_count,
                max_child_task_count.unwrap_or(1),
            )?);
            stage_id.add_assign(1);
            annotated_plan.plan.with_new_children(vec![new_child])
        }
        RequiredNetworkBoundary::Broadcast => unreachable!("handled above"),
    }
}

    // If this is a CoalescePartitionsExec or a SortPreservingMergeExec, it means that the original
    // plan is trying to merge all partitions into one. We need to go one step further and also merge
    // all distributed tasks into one.
    if nb_req == RequiredNetworkBoundary::Coalesce {
        let new_child = Arc::new(NetworkCoalesceExec::try_new(
            require_one_child(new_children)?,
            query_id,
            *stage_id,
            parent_task_count,
            max_child_task_count.unwrap_or(1),
        )?);
        stage_id.add_assign(1);
        return annotated_plan.plan.with_new_children(vec![new_child]);
fn set_task_count_until_boundary(plan: &mut AnnotatedPlan, task_count: usize) {
    plan.task_count = TaskCountAnnotation::Desired(task_count);
    if plan.required_network_boundary.is_none() {
        for child in &mut plan.children {
            set_task_count_until_boundary(child, task_count);
        }
    }
}

    internal_err!(
        "Unreachable code reached in distribute_plan. Could not determine how to place a network boundary below {}",
        annotated_plan.plan.name()
    )
/// Unwraps [CoalescePartitionsExec] if present and returns its child.
fn unwrap_coalesce_partitions(mut plan: AnnotatedPlan) -> AnnotatedPlan {
    if plan
        .plan
        .as_any()
        .downcast_ref::<CoalescePartitionsExec>()
        .is_some()
        && !plan.children.is_empty()
    {
        plan.children.remove(0)
    } else {
        plan
    }
}

/// Rearranges the [CoalesceBatchesExec] nodes in the plan so that they are placed right below
@@ -426,11 +484,23 @@ mod tests {
    })
    .await;
    assert_snapshot!(plan, @r"
    CoalesceBatchesExec: target_batch_size=8192
    HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
    CoalescePartitionsExec
    DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
    DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
    ┌───── DistributedExec ── Tasks: t0:[p0]
    │ CoalescePartitionsExec
    │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
    └──────────────────────────────────────────────────
    ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
    │ CoalesceBatchesExec: target_batch_size=8192
    │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
    │ CoalescePartitionsExec
    │ [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
    │ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
    │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
    └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
    │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
    │ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
    │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
    └──────────────────────────────────────────────────
    ");
}

@@ -468,28 +538,31 @@ mod tests {
    │ CoalescePartitionsExec
    │ CoalesceBatchesExec: target_batch_size=8192
    │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
    │ CoalescePartitionsExec
    │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
    │ [Stage 3] => NetworkCoalesceExec: output_partitions=1, input_tasks=1
    │ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
    │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
    │ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
    │ [Stage 4] => NetworkShuffleExec: output_partitions=4, input_tasks=3
    └──────────────────────────────────────────────────
    ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
    │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
    │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
    │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
    ┌───── Stage 3 ── Tasks: t0:[p0]
    │ CoalescePartitionsExec
    │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
    └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
    │ CoalesceBatchesExec: target_batch_size=8192
    │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
    │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
    │ CoalesceBatchesExec: target_batch_size=8192
    │ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
    │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
    │ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
    │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
    ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
    │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
    │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
    │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
    └──────────────────────────────────────────────────
    ┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
    ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
    │ CoalesceBatchesExec: target_batch_size=8192
    │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
    │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
    │ CoalesceBatchesExec: target_batch_size=8192
    │ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
    │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
    │ PartitionIsolatorExec: t0:[p0,__,__] t1:[__,p0,__] t2:[__,__,p0]
    │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@1 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
    └──────────────────────────────────────────────────
    ┌───── Stage 4 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
    │ CoalesceBatchesExec: target_batch_size=8192
    │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
    │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]

Review discussion:

Reviewer: Any chance of handling this as a normal `if nb_req == RequiredNetworkBoundary::* { ... }` statement at the end of this function, like the other network boundaries?

Also, one improvement that comes to mind for consistency with the other network boundaries is to wrap all the `NetworkBroadcastExec` instantiation logic inside `NetworkBroadcastExec::try_new` instead of doing it here. For example, note how in `NetworkShuffleExec::try_new` we handle the logic of transforming the child nodes in order to adapt them to shuffles, rather than doing it in this function: https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/execution_plans/network_shuffle.rs#L151-L151

Do you think we could reach a point where instantiating one `NetworkBroadcastExec` is no different than instantiating any of the other network boundaries?
Author: Refactored this as best I could; let me know if you have more ideas. Some logic still lives here because, unlike the other operators, Broadcast relies on the annotated plan.
Author: I could extract the logic into a helper, but that seems like unneeded indirection since the logic only happens once.