@@ -110,6 +110,74 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
110110/// wether the cardinality has increased or not.
111111///
112112/// 5. This is repeated until all the [NetworkBoundary]s are placed.
113+ ///
114+ /// ## Example:
115+ ///
116+ /// Given a plan with 3 stages:
117+ ///
118+ /// ```text
119+ /// ┌─────────────────┐
120+ /// │ Stage 3 │? tasks
121+ /// └────────▲────────┘
122+ /// ┌────────┴────────┐
123+ /// │ Stage 2 │? tasks
124+ /// └────────▲────────┘
125+ /// ┌────────┴────────┐
126+ /// │ Stage 1 │? tasks
127+ /// └─────────────────┘
128+ /// ```
129+ ///
130+ /// 1. Calculate the number of tasks for a bottom stage based on how much data the leaf nodes
131+ /// (e.g. `DataSourceExec`s) are expected to pull.
132+ ///
133+ /// ```text
134+ /// ┌─────────────────┐
135+ /// │ Stage 3 │? tasks
136+ /// └────────▲────────┘
137+ /// ┌────────┴────────┐
138+ /// │ Stage 2 │? tasks
139+ /// └────────▲────────┘
140+ /// ┌────────┴────────┐
141+ /// │ Stage 1 │3 tasks
142+ /// └─────────────────┘
143+ /// ```
144+ ///
145+ /// 2. Based on the calculated tasks in the leaf stage (e.g. 3 tasks), calculate the amount of
146+ /// tasks in the next stage.
147+ /// This is done by multiplying the task count by a scale factor every time a node that
148+ /// increments or reduces the cardinality of the data appears, which is information present in
149+ /// the `fn cardinality_effect(&self) -> CardinalityEffect` method. For example, if "Stage 1"
150+ /// has a partial aggregation step, and the scale factor is 1.5, it will look like this:
151+ ///
152+ /// ```text
153+ /// ┌─────────────────┐
154+ /// │ Stage 3 │? tasks
155+ /// └────────▲────────┘
156+ /// ┌────────┴────────┐
157+ /// │ Stage 2 │3/1.5 = 2 tasks
158+ /// └────────▲────────┘
159+ /// ┌────────┴────────┐
160+ /// │ Stage 1 │3 tasks (cardinality effect factor of 1.5)
161+ /// └─────────────────┘
162+ /// ```
163+ ///
164+ ///
165+ /// 3. This is repeated recursively until all tasks have been assigned to all stages, keeping into
166+ /// account the cardinality effect different nodes in subplans have. If there is no
167+ /// cardinality effect (e.g. `ProjectExec` nodes), then the task count is kept across stages:
168+ ///
169+ /// ```text
170+ /// ┌─────────────────┐
171+ /// │ Stage 3 │2 tasks
172+ /// └────────▲────────┘
173+ /// ┌────────┴────────┐
174+ /// │ Stage 2 │2 tasks
175+ /// └────────▲────────┘
176+ /// ┌────────┴────────┐
177+ /// │ Stage 1 │3 tasks
178+ /// └─────────────────┘
179+ /// ```
180+ ///
113181pub fn apply_network_boundaries (
114182 mut plan : Arc < dyn ExecutionPlan > ,
115183 cfg : & ConfigOptions ,
@@ -130,6 +198,8 @@ pub fn apply_network_boundaries(
130198 Ok ( ctx. plan )
131199}
132200
201+ /// [ApplyNetworkBoundariesCtx] helps keeping track of the stage of the task count calculations
202+ /// while recursing through [ExecutionPlan]s.
133203struct ApplyNetworkBoundariesCtx {
134204 task_count : usize ,
135205 this_stage_sf : f64 ,
0 commit comments