@@ -26,19 +26,98 @@ use std::sync::Arc;
2626use tonic:: Request ;
2727use tonic:: metadata:: MetadataMap ;
2828
29+ /// [ExecutionPlan] that broadcasts data from a single task to multiple tasks across the network.
30+ ///
31+ /// This operator is used when a small dataset needs to be replicated to all workers in the next
32+ /// stage. The most common use case is hash joins with `CollectLeft` partition mode, where the
33+ /// small build side (left table) is collected into a single partition and then broadcast to all
34+ /// workers processing the large probe side (right table).
35+ ///
36+ /// Unlike [NetworkShuffleExec] which redistributes data across tasks, [NetworkBroadcastExec]
37+ /// replicates the entire input to each task in the next stage. This allows parallel execution
38+ /// of operations that would otherwise be forced to run single-threaded.
39+ ///
40+ /// 1 to many (broadcast)
41+ ///
42+ /// ┌───────────────────────────┐ ┌───────────────────────────┐ ┌───────────────────────────┐ ■
43+ /// │ NetworkBroadcastExec │ │ NetworkBroadcastExec │ │ NetworkBroadcastExec │ │
44+ /// │ (task 1) │ │ (task 2) │ │ (task 3) │ │
45+ /// │ (full copy) │ │ (full copy) │ │ (full copy) │ Stage N+1
46+ /// └───────────────────────────┘ └───────────────────────────┘ └───────────────────────────┘ │
47+ /// ▲ ▲ ▲ │
48+ /// │ │ │ ■
49+ /// └──────────────────────────────┴──────────────────────────────┘
50+ /// │ ■
51+ /// ┌───────────────────────────┐ │
52+ /// │ CoalesceExec or │ │
53+ /// │ HashJoinExec build │ Stage N
54+ /// │ (task 1) │ │
55+ /// └───────────────────────────┘ │
56+ /// ■
57+ ///
58+ /// Broadcast join example (CollectLeft hash join)
59+ ///
60+ /// Stage N+1: Hash Join (3 tasks running in parallel)
61+ /// ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐
62+ /// │ HashJoinExec t1 │ │ HashJoinExec t2 │ │ HashJoinExec t3 │
63+ /// │ left: small (bcast) │ │ left: small (bcast) │ │ left: small (bcast) │
64+ /// │ right: large (p1) │ │ right: large (p2) │ │ right: large (p3) │
65+ /// └──┬─────────────────┬─┘ └──┬─────────────────┬─┘ └──┬─────────────────┬─┘
66+ /// │ │ │ │ │ │
67+ /// ▼ │ ▼ │ ▼ │
68+ /// NetworkBroadcast │ NetworkBroadcast │ NetworkBroadcast │
69+ /// (full copy) │ (full copy) │ (full copy) │
70+ /// │ │ │ │ │ │
71+ /// │ │ │ │ │ │
72+ /// │ ▼ │ ▼ │ ▼
73+ /// │ Large table │ Large table │ Large table
74+ /// │ partition 1 │ partition 2 │ partition 3
75+ /// │ │ │ │ │ │
76+ /// └─────────────────┼────────┴─────────────────┼────────┴─────────────────┘
77+ /// │ │
78+ /// Stage N: Small table collected + Large table partitioned
79+ /// │ │
80+ /// ┌───────▼──────┐ ┌────────▼────────┐
81+ /// │ Small table │ │ Large table │
82+ /// │ (1 task, │ │ (3 partitions) │
83+ /// │ collected) │ └─────────────────┘
84+ /// └──────────────┘
85+ ///
86+ /// The communication between two stages across a [NetworkBroadcastExec] has these characteristics:
87+ ///
88+ /// - The input stage typically has 1 task containing the collected/small dataset
89+ /// - Each task in Stage N+1 receives a complete copy of the data from Stage N
90+ /// - This enables parallel execution while ensuring all tasks have access to the full dataset
91+ /// - Commonly used for broadcast joins where the build side is small enough to replicate
92+ ///
93+ /// This node has two variants:
94+ /// 1. Pending: acts as a placeholder for the distributed optimization step to mark it as ready.
95+ /// 2. Ready: runs within a distributed stage and queries the input stage over the network
96+ /// using Arrow Flight, broadcasting the data to all tasks.
97+ ///
98+ /// [NetworkShuffleExec]: crate::execution_plans::NetworkShuffleExec
2999#[ derive( Debug , Clone ) ]
30100pub enum NetworkBroadcastExec {
31101 Pending ( NetworkBroadcastPending ) ,
32102 Ready ( NetworkBroadcastReady ) ,
33103}
34104
105+ /// Placeholder version of the [NetworkBroadcastExec] node. It acts as a marker for the
106+ /// distributed optimization step, which will replace it with the appropriate
107+ /// [NetworkBroadcastReady] node.
35108#[ derive( Debug , Clone ) ]
36109pub struct NetworkBroadcastPending {
37110 properties : PlanProperties ,
38111 input_tasks : usize ,
39112 input : Arc < dyn ExecutionPlan > ,
40113}
41114
115+ /// Ready version of the [NetworkBroadcastExec] node. This node is created by:
116+ /// - the distributed optimization step based on an original [NetworkBroadcastPending]
117+ /// - deserialized from a protobuf plan sent over the network.
118+ ///
119+ /// This variant contains the input [Stage] information and executes by broadcasting
120+ /// data from the input stage to all tasks in the current stage over Arrow Flight.
42121#[ derive( Debug , Clone ) ]
43122pub struct NetworkBroadcastReady {
44123 pub ( crate ) properties : PlanProperties ,
0 commit comments