Gene.bordegaray/2025/12/add broadcast exec #279
Conversation
gabotechs
left a comment
Flushing the first round of comments. This looks really good! Pretty clean design.
For now, I don't think I have any major comments; the path forward looks like the correct one.
CoalesceBatchesExec: task_count=Desired(3)
  HashJoinExec: task_count=Desired(3), required_network_boundary=Broadcast
    CoalescePartitionsExec: task_count=Desired(3)
      DataSourceExec: task_count=Desired(3)
    DataSourceExec: task_count=Desired(3)
It would be cool to add a test that demonstrates how having a node that reduces cardinality below the HashJoinExec prompts the task annotator to choose fewer tasks for the stage containing the join.
Maybe something like doing a JOIN between the results of two aggregations?
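A minimal sketch of the kind of query that could exercise this, reusing the weather columns that already show up in this PR's plans; the CSV path and the plain SessionContext runner are placeholders, not this repo's actual test harness:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical path; point this at whatever weather test data the repo uses.
    ctx.register_csv("weather", "testdata/weather.csv", CsvReadOptions::new())
        .await?;

    // Both join inputs are aggregations, so cardinality drops sharply below the
    // HashJoinExec and the task annotator should be able to pick fewer tasks for
    // the stage containing the join.
    let df = ctx
        .sql(
            "SELECT a.\"RainToday\", a.min_temp, b.max_temp
             FROM (SELECT \"RainToday\", MIN(\"MinTemp\") AS min_temp FROM weather GROUP BY \"RainToday\") a
             JOIN (SELECT \"RainToday\", MAX(\"MaxTemp\") AS max_temp FROM weather GROUP BY \"RainToday\") b
             ON a.\"RainToday\" = b.\"RainToday\"",
        )
        .await?;
    df.show().await?;
    Ok(())
}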
I am a bit confused by this and not quite following. Could you explain a bit more?
When reaching a network boundary, we are delimiting the end of one stage and the beginning of another one.
This is the moment when we:
- make sure all nodes in the stage we just formed below agree on a specific task count
- choose a starting point for the task count of the new stage we just started forming
For choosing this starting point, we take the final decided task count for the stage below and multiply it by a scale factor that depends on how much the cardinality of the data was reduced in that stage (e.g., by aggregation/filtering nodes).
As we have a new type of network boundary that delimits stages, it would be nice to add a test that verifies that this mechanism still applies when crossing those new boundaries.
The docs in https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/plan_annotator.rs#L67-L67 explain a bit better how this works.
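A tiny sketch of the scale-factor idea described above; the names and the exact scaling rule are made up for illustration and are not the actual plan_annotator.rs logic:

/// Hypothetical helper: pick a starting task count for the stage above a network
/// boundary based on the stage below it. The more the stage below reduced
/// cardinality (aggregations, filters), the fewer tasks the new stage starts with.
fn starting_task_count(stage_below_tasks: usize, cardinality_ratio: f64) -> usize {
    // cardinality_ratio = output_rows / input_rows of the stage below, in (0.0, 1.0].
    let scaled = (stage_below_tasks as f64 * cardinality_ratio).ceil() as usize;
    scaled.max(1)
}

fn main() {
    // An aggregation kept only ~10% of the rows: the new stage can start with one task.
    assert_eq!(starting_task_count(3, 0.1), 1);
    // Barely any reduction: keep roughly the same parallelism.
    assert_eq!(starting_task_count(3, 0.9), 3);
}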
let stream = futures::stream::once(async move {
    // The first consumer to get here executes the input partition and collects
    // every batch into the shared cache; later consumers reuse the cached Vec.
    let batches = cache
        .get_or_try_init(|| async {
            let stream = input.execute(real_partition, context)?;
            let batches: Vec<RecordBatch> =
                futures::TryStreamExt::try_collect(stream).await?;
            Ok::<_, datafusion::error::DataFusionError>(Arc::new(batches))
        })
        .await?;
    let batches = Arc::clone(batches);
    // Only once everything has been collected are the batches re-emitted as a stream.
    let batches_vec: Vec<RecordBatch> = batches.iter().cloned().collect();
    Ok::<_, datafusion::error::DataFusionError>(stream::iter(
        batches_vec.into_iter().map(Ok),
    ))
});
Ideally, instead of collecting all the batches together and then emitting them, we could broadcast them in a streaming fashion. This would let us send produced batches over the wire early, while new ones are still being produced, which has a good chance of speeding up execution.
I imagine that for this to work we would need to use a tokio::sync::broadcast channel.
This makes sense, and I see the upsides, especially for larger datasets.
I did some research on tokio::sync::broadcast and think the implementation will be non-trivial (you may have some insight that I lack; feel free to add it here). I added a comment noting this as follow-up work.
Hoping to address this and the single-stage collapse in follow-up work, to get this in and avoid more merge conflicts. Unless, of course, you think it is necessary for these changes to be included in this PR 😄
I think using a tokio::sync::broadcast should not be too bad; it should not be much more code than this. There might even be other options using other types of queues that are even simpler.
Overall, if we can avoid shipping intermediate states that will need to be deleted in follow-up PRs, that would be ideal, although I do acknowledge that PR conflicts are a real issue that you especially have suffered from in this PR.
I would expect this change to be simple enough that it doesn't matter much whether it's done now or in a follow-up, so if you think doing it in a follow-up will make your life easier then let's do it, but I would create a GitHub issue for tracking it and leave a TODO comment with a brief explanation and a link to the issue.
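For reference, a rough sketch of what a streaming version could look like with tokio::sync::broadcast, assuming the tokio-stream crate for BroadcastStream; the function name, capacity handling, and error propagation are simplified placeholders rather than a drop-in for this PR:

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

/// Fan one producer stream out to `consumers` receivers without collecting everything first.
/// `capacity` bounds how far a slow consumer may lag before it starts missing batches,
/// so a real implementation needs to think about backpressure and lag errors.
fn broadcast_batches(
    mut producer: impl Stream<Item = Result<RecordBatch, DataFusionError>> + Send + Unpin + 'static,
    consumers: usize,
    capacity: usize,
) -> Vec<BoxStream<'static, Result<RecordBatch, DataFusionError>>> {
    let (tx, _) = broadcast::channel(capacity);

    // Subscribe every consumer before the producer starts so nobody misses early batches.
    let outputs = (0..consumers)
        .map(|_| {
            BroadcastStream::new(tx.subscribe())
                .map(|item| {
                    item.map_err(|e| DataFusionError::Execution(format!("broadcast lag: {e}")))
                })
                .boxed()
        })
        .collect();

    // Drive the input once; each batch is forwarded to all receivers as soon as it is produced.
    tokio::spawn(async move {
        while let Some(Ok(batch)) = producer.next().await {
            if tx.send(batch).is_err() {
                break; // all receivers are gone
            }
        }
        // NOTE: input errors are silently dropped here; a real version must propagate them.
    });

    outputs
}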
if annotated_plan.required_network_boundary == Some(RequiredNetworkBoundary::Broadcast) {
    let mut build = children.remove(0);
    let mut probe = children.remove(0);

    set_task_count_until_boundary(&mut probe, parent_task_count);
Any chance of handling this as a normal if nb_req == RequiredNetworkBoundary::* { ... } statement at the end of this function, like the other network boundaries?
Also, one improvement that comes to mind for consistency with the other network boundaries is to wrap all the NetworkBroadcastExec instantiation logic inside NetworkBroadcastExec::try_new instead of here.
For example, note how in NetworkShuffleExec::try_new we handle the logic of transforming the child nodes in order to adapt them to shuffles, rather than doing it in this function.
Do you think we could reach a point where instantiating one NetworkBroadcastExec is no different from instantiating any of the other network boundaries?
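Purely as an illustration of that pattern (the struct and signature below are invented for this sketch, not the PR's actual API), moving the child adaptation into the constructor could look roughly like this:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;

/// Sketch of a broadcast boundary whose constructor owns the child transformation,
/// mirroring how NetworkShuffleExec::try_new adapts its child for shuffles.
pub struct NetworkBroadcastSketch {
    input: Arc<dyn ExecutionPlan>,
    input_task_count: usize,
}

impl NetworkBroadcastSketch {
    pub fn try_new(child: Arc<dyn ExecutionPlan>, input_task_count: usize) -> Result<Self> {
        // Adapt the child here (e.g. coalesce its partitions so every consumer task
        // receives one identical copy) instead of doing it at the call site.
        let input: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(child));
        Ok(Self { input, input_task_count })
    }
}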
Refactored this as best I could; let me know if you have more ideas. Some logic still lives here since, unlike the other operators, Broadcast relies on the annotated plan.
I could extract the logic into a helper, but that seems like unneeded indirection since the logic only happens once.
gabotechs
left a comment
I think a lot of my suggestions arise from the fact that further changes are needed in the plan_annotator.rs file.
Most of the things I find "weird" in this PR come from the fact that the annotations should look different. The concept of "annotating" a plan with the task count is fairly new and was introduced very recently in #259, so I might be the one to blame for not announcing/documenting it properly. I think if you get familiar with it, most of the changes I'm suggesting should feel very natural.
BTW, this is awesome work. The reason I'm leaving so many comments is that this piece is so fundamental to the project that IMO it should be as close to perfection as possible, but this is definitely one of the best (if not the best) contributions this project has had so far.
let mut build = children.remove(0);
let mut probe = children.remove(0);

set_task_count_until_boundary(&mut probe, parent_task_count);
This should not be happening here. The whole point of separating the planner into "annotation" + "distribute" steps is that all task count assignments happen in the "annotation" step, so the "distribute" step can safely assume that task counts are set in stone. This is mainly for:
- organization and separation of concerns
- avoiding bugs where one node is scaled up assuming N tasks, but a later step suddenly overwrites that with M tasks.
Overall, I find it suspicious that this PR needed so few changes to the plan_annotator.rs file.
// If there's only one consumer task, use Coalesce instead of Broadcast.
let build_child: Arc<dyn ExecutionPlan> = if parent_task_count == 1 {
    set_task_count_until_boundary(&mut build, 1);
    let build_side = distribute_plan(build, cfg, query_id, stage_id)?;
    Arc::new(NetworkCoalesceExec::try_new(
        build_side, query_id, *stage_id, 1, 1,
    )?)
Why do we need to special-case this here instead of just letting the BroadcastExec naturally broadcast to only 1 consumer task? If parent_task_count == 1, I think we should be able to just let it be, without special cases.
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
│ [Stage 3] => NetworkCoalesceExec: output_partitions=1, input_tasks=1
This still looks wrong. The DistributedExec stage and Stage 3 should collapse into one without any repercussions for correctness. We should be able to accomplish that in this PR.
Agree. We do not want to change the existing tests. New tests should be added for new behavior.
// Skip network boundary if both parent and all children have 1 task.
// Broadcast is excluded because it's used for CollectLeft join build sides that need explicit
// materialization. Without the stage boundary nested CollectLeft joins cause re-execution
// (seen in TPC-DS Q99: 29x slow-down) since CoalescePartitionsExec doesn't cache.
🤔 I don't fully understand this comment. If the communication is from 1 producer task to 1 consumer task, no caching mechanism in the broadcast step should come into play.
Serving data from the cache in BroadcastExec should only be relevant if consumer tasks > 1; otherwise BroadcastExec should act as pretty much just a pure pass-through node.
I have the same question. Something like this would be easier to understand if you added a specific plan example to the comment.
let network_broadcast = Arc::new(Self::new_inner(
    broadcast_exec,
    query_id,
    stage_num,
    input_task_count,
)?);
Ok(Arc::new(CoalescePartitionsExec::new(network_broadcast)))
🤔 I find this constructor a bit weird. It's not returning a Self like all the other constructor methods for the other execution plans in this project.
I think this might be one of the artifacts of the network boundary annotations in plan_annotator.rs not being placed correctly.
HashJoinExec: task_count=Desired(1), required_network_boundary=Broadcast
  CoalescePartitionsExec: task_count=Maximum(1), required_network_boundary=Coalesce
This looks wrong, we should not need two consecutive different network boundaries here.
🤔 This might actually be the reason you needed so much special casing and changes in the distribute_plan function.
HashJoinExec: task_count=Desired(3), required_network_boundary=Broadcast
  CoalescePartitionsExec: task_count=Desired(3)
🤔 I don't think this is right; it should look more like:

- HashJoinExec: task_count=Desired(3), required_network_boundary=Broadcast
-   CoalescePartitionsExec: task_count=Desired(3)
+ HashJoinExec: task_count=Desired(3)
+   CoalescePartitionsExec: task_count=Desired(3), required_network_boundary=Broadcast
The CoalescePartitionsExec is still needed right below the HashJoinExec, and the network boundary should only affect the build side, not the HashJoinExec as a whole.
NGA-TRAN
left a comment
What a PR. Nice work @gene-bordegaray 🎉
I have a question for @gabotechs, a few questions for you, and some things for us to chat about.
/// ```
///
/// Notice that the first consumer task, [NetworkBroadcastExec] task 1, triggers the execution of
/// the operator below the [BroadCastExec] and populates each cache index with the repective
- /// the operator below the [BroadCastExec] and populates each cache index with the repective
+ /// the operator below the [BroadCastExec] and populates each cache index with the respective
/// Notice that the first consumer task, [NetworkBroadcastExec] task 1, triggers the execution of
/// the operator below the [BroadCastExec] and populates each cache index with the repective
/// partition. Subsequent consumer tasks, rather than executing the same partitions, read the
/// data from the cache for each partition.
Questions regarding Stage N-1, Stage N and Stage N+1 in the diagram:
- Does N here have anything to do with the input N number of partitions? I assume not, right? If so, you may want to use a different letter for it to avoid confusion.
- Does the word Stage here mean a general stage as we have used it so far, implying there will be data shuffling between stages? I also assume not. I think what you mean here is 3 steps and 2 stages; the BroadcastExec is in the same stage as its child. If so, you may want to modify the diagram for that, too.

I am thinking about the name NetworkBroadcastExec. Can we make it NetworkBroadcastCoalesceExec, or can we simply use the existing NetworkCoalesceExec?
This is a question for @gabotechs: does the BroadcastExec have to produce N * M output partitions? Is it possible to make a network channel that sends one partition to M tasks, just like sending once but with many receivers? That way, we would only need N output partitions here.
/// ┌──────┴─┴─┴─┴─┴─┴──────┐       ┌──────┴─┴─┴─┴─┴─┴──────┐  Stage N-2
/// │Arc<dyn ExecutionPlan> │  ...  │Arc<dyn ExecutionPlan> │      │
/// │       (task 1)        │       │       (task N)        │      │
/// └───────────────────────┘       └───────────────────────┘      ■
If the diagram in Broadcast gets modified, these 2 diagrams need to be changed accordingly.
This beautiful diagram makes me think we definitely need examples (and then documentation) for our broadcast to show best practices, because it should only be used in certain cases. For example, we may want to either merge all partitions or split them into different partitions after the NetworkBroadcastExec.
Let us chat; I will explain this to you better because it is not that simple.
│ CoalesceBatchesExec: target_batch_size=8192
│ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
│ CoalescePartitionsExec
│ [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
This is very cool. I think we will be able to display clearer information here. Let us chat
└──────────────────────────────────────────────────
┌───── Stage 10 ── Tasks: t0:[p0]
│ CoalescePartitionsExec, metrics=[output_rows=1000, elapsed_compute=<metric>]
│ DataSourceExec: file_groups={6 groups: [[/testdata/tpch/explain_analyze_sf0.1/supplier/1.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/10.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/11.parquet], [/testdata/tpch/explain_analyze_sf0.1/supplier/12.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/13.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/14.parquet], [/testdata/tpch/explain_analyze_sf0.1/supplier/15.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/16.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/2.parquet], [/testdata/tpch/explain_analyze_sf0.1/supplier/3.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/4.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/5.parquet], [/testdata/tpch/explain_analyze_sf0.1/supplier/6.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/7.parquet, /testdata/tpch/explain_analyze_sf0.1/supplier/8.parquet], ...]}, projection=[s_suppkey, s_nationkey], file_type=parquet, predicate=DynamicFilter [ s_nationkey@1 >= 0 AND s_nationkey@1 <= 24 ], pruning_predicate=s_nationkey_null_count@1 != row_count@2 AND s_nationkey_max@0 >= 0 AND s_nationkey_null_count@1 != row_count@2 AND s_nationkey_min@3 <= 24, required_guarantees=[], metrics=[output_rows=1000, elapsed_compute=<metric>, batches_split=<metric>, bytes_scanned=<metric>, file_open_errors=<metric>, file_scan_errors=<metric>, num_predicate_creation_errors=<metric>, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=<metric>, pushdown_rows_matched=<metric>, pushdown_rows_pruned=<metric>, bloom_filter_eval_time=<metric>, metadata_load_time=<metric>, page_index_eval_time=<metric>, row_pushdown_eval_time=<metric>, statistics_eval_time=<metric>, time_elapsed_opening=<metric>, time_elapsed_processing=<metric>, time_elapsed_scanning_total=<metric>, time_elapsed_scanning_until_data=<metric>]
We can pick one or a few of the TPC-DS queries for our examples and documentation. In general, we do not want too much network shuffling (aka too many stages). We will identify use cases for this and also list the cases where we do not want to use broadcast, for us to play with and document.
Closes #223
(the last one got closed and I can't reopen it)
Added a separate execution plan node BroadcastExec to handle the partition caching logic that previously lived in the flight service. Still getting cool speed-ups 😄