1+ //! Distributed query analysis and performance profiling execution plans
2+ //!
3+ //! This module provides execution plan nodes for analyzing distributed query performance
4+ //! in a similar way to DataFusion's `EXPLAIN ANALYZE` functionality, but adapted for
5+ //! distributed execution across multiple worker nodes.
6+ //!
7+ //! # Purpose
8+ //!
9+ //! When executing `EXPLAIN ANALYZE` queries in a distributed environment, we need to:
10+ //! - Collect execution metrics from all worker nodes
11+ //! - Aggregate performance statistics across the distributed execution
12+ //! - Present a unified view of the distributed query execution plan with metrics
13+ //! - Track resource usage and timing information per stage and partition
14+ //!
15+ //! # Architecture
16+ //!
17+ //! The analysis system uses a two-tier approach:
18+ //! - **`DistributedAnalyzeExec`**: Wraps individual stages for metric collection
19+ //! - **`DistributedAnalyzeRootExec`**: Aggregates results from all workers at the proxy
20+ //!
21+ //! # Execution Flow
22+ //!
23+ //! 1. Query planning inserts `DistributedAnalyzeExec` nodes around distributed stages
24+ //! 2. Workers execute their stages while collecting performance metrics
25+ //! 3. Workers send annotated execution plans back to the proxy
26+ //! 4. `DistributedAnalyzeRootExec` at the proxy collects and formats all results
27+ //! 5. Final output shows the complete distributed execution plan with metrics
28+ //!
29+ //! # Integration with Standard DataFusion
30+ //!
31+ //! These execution plans integrate with DataFusion's metric collection system,
32+ //! ensuring that distributed analysis provides the same level of detail as
33+ //! single-node `EXPLAIN ANALYZE` queries.
34+
135use std:: { fmt:: Formatter , sync:: Arc } ;
236
337use arrow:: {
@@ -24,6 +58,22 @@ use crate::{
2458 vocab:: { CtxAnnotatedOutputs , CtxHost , CtxPartitionGroup , CtxStageId } ,
2559} ;
2660
61+ /// Distributed analysis execution plan for individual stage metric collection
62+ ///
63+ /// This execution plan wraps another execution plan to enable metric collection
64+ /// for distributed `EXPLAIN ANALYZE` queries. It acts as a transparent wrapper
65+ /// that delegates execution to its child while enabling metric tracking that
66+ /// can be collected by the distributed analysis system.
67+ ///
68+ /// # Purpose
69+ /// - Enables metric collection for individual distributed stages
70+ /// - Maintains compatibility with DataFusion's analysis infrastructure
71+ /// - Provides transparent execution with metric annotation capabilities
72+ /// - Supports both verbose and statistics display modes
73+ ///
74+ /// # Usage
75+ /// Inserted automatically during distributed query planning when `EXPLAIN ANALYZE`
76+ /// is used, wrapping stages that will execute on worker nodes.
2777#[ derive( Debug ) ]
2878pub struct DistributedAnalyzeExec {
2979 /// Control how much extra to print
@@ -35,6 +85,7 @@ pub struct DistributedAnalyzeExec {
3585}
3686
3787impl DistributedAnalyzeExec {
88+ /// Creates a new distributed analysis wrapper
3889 pub fn new ( input : Arc < dyn ExecutionPlan > , verbose : bool , show_statistics : bool ) -> Self {
3990 Self {
4091 input,
@@ -43,6 +94,7 @@ impl DistributedAnalyzeExec {
4394 }
4495 }
4596
97+ /// Generates an annotated plan string with metrics for this stage
4698 pub fn annotated_plan ( & self ) -> String {
4799 DisplayableExecutionPlan :: with_metrics ( self . input . as_ref ( ) )
48100 . set_show_statistics ( self . show_statistics )
@@ -100,6 +152,30 @@ impl ExecutionPlan for DistributedAnalyzeExec {
100152 }
101153}
102154
155+ /// Distributed analysis root execution plan for result aggregation and formatting
156+ ///
157+ /// This execution plan serves as the root node for distributed `EXPLAIN ANALYZE` queries,
158+ /// collecting annotated execution plans from all worker nodes and formatting them into
159+ /// a unified result. It runs on the proxy node and aggregates performance data from
160+ /// the entire distributed execution.
161+ ///
162+ /// # Purpose
163+ /// - Collects annotated execution plans from all distributed workers
164+ /// - Aggregates and sorts results by stage and partition for readable output
165+ /// - Formats the final `EXPLAIN ANALYZE` output with distributed metrics
166+ /// - Provides a unified view of distributed query performance
167+ ///
168+ /// # Output Schema
169+ /// Produces a two-column result:
170+ /// - **Task**: Stage and partition information with worker host details
171+ /// - **Plan with Metrics**: Annotated execution plan with performance metrics
172+ ///
173+ /// # Context Dependencies
174+ /// Requires session context extensions for distributed execution metadata:
175+ /// - `CtxAnnotatedOutputs`: Collection of worker analysis results
176+ /// - `CtxHost`: Current worker host information
177+ /// - `CtxStageId`: Current stage identifier
178+ /// - `CtxPartitionGroup`: Partition assignment information
103179#[ derive( Debug ) ]
104180pub struct DistributedAnalyzeRootExec {
105181 /// Control how much extra to print
@@ -113,7 +189,12 @@ pub struct DistributedAnalyzeRootExec {
113189}
114190
115191impl DistributedAnalyzeRootExec {
192+ /// Creates a new distributed analysis root execution plan
193+ ///
194+ /// Sets up the output schema for the analysis results and configures
195+ /// plan properties for single-partition bounded execution.
116196 pub fn new ( input : Arc < dyn ExecutionPlan > , verbose : bool , show_statistics : bool ) -> Self {
197+ // Define output schema for analysis results
117198 let field_a = Field :: new ( "Task" , DataType :: Utf8 , false ) ;
118199 let field_b = Field :: new ( "Plan with Metrics" , DataType :: Utf8 , false ) ;
119200 let schema = Arc :: new ( Schema :: new ( vec ! [ field_a, field_b] ) ) ;
@@ -179,6 +260,7 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
179260 partition : usize ,
180261 context : std:: sync:: Arc < datafusion:: execution:: TaskContext > ,
181262 ) -> Result < SendableRecordBatchStream > {
263+ // Extract annotated outputs from all workers
182264 let task_outputs = context
183265 . session_config ( )
184266 . get_extension :: < CtxAnnotatedOutputs > ( )
@@ -191,6 +273,7 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
191273 "DistributedAnalyzeRootExec expects only partition 0"
192274 ) ;
193275
276+ // Extract distributed execution context information
194277 let host = context
195278 . session_config ( )
196279 . get_extension :: < CtxHost > ( )
@@ -225,7 +308,7 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
225308 . 0
226309 . clone ( ) ;
227310
228- // we want to gather all partitions
311+ // Coalesce all partitions to single stream for processing
229312 let coalesce = Arc :: new ( CoalescePartitionsExec :: new ( self . input . clone ( ) ) ) ;
230313
231314 let mut input_stream = coalesce. execute ( partition, context) ?;
@@ -242,15 +325,19 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
242325 . to_string ( )
243326 } ;
244327
328+ // Aggregates analysis results from all distributed workers into formatted output
329+ //
330+ // This async block handles the core logic of distributed analysis:
331+ // 1. Consumes input stream (discarding data like standard AnalyzeExec)
332+ // 2. Collects annotated execution plans from all workers
333+ // 3. Sorts results by stage and partition for readable output
334+ // 4. Formats final RecordBatch with task information and metrics
245335 let output = async move {
246- // consume input, and we do not have to send it downstream as we are the
247- // root of the distributed analyze so we can discard the results just like
248- // regular AnalyzeExec
336+ // Consume input stream without forwarding data
249337 let mut done = false ;
250338 while !done {
251339 match input_stream. next ( ) . await . transpose ( ) {
252340 Ok ( Some ( batch) ) => {
253- // we consume the batch, yum.
254341 trace ! ( "consumed {} " , batch. num_rows( ) ) ;
255342 }
256343 Ok ( None ) => done = true ,
@@ -259,6 +346,8 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
259346 }
260347 }
261348 }
349+
350+ // Generate annotated plan for current stage
262351 let annotated_plan = fmt_plan ( ) ;
263352 let toutput = AnnotatedTaskOutput {
264353 plan : annotated_plan,
@@ -267,13 +356,15 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
267356 partition_group,
268357 } ;
269358
359+ // Collect and sort all worker analysis results
270360 let mut tasks = task_outputs. lock ( ) ;
271361 tasks. push ( toutput) ;
272362
273363 tasks. sort_by_key ( |t| ( t. stage_id , t. partition_group . clone ( ) ) ) ;
274364
275365 trace ! ( "sorted tasks: {:?}" , tasks) ;
276366
367+ // Build output columns with task info and annotated plans
277368 let mut task_builder = StringBuilder :: with_capacity ( 1 , 1024 ) ;
278369 let mut plan_builder = StringBuilder :: with_capacity ( 1 , 1024 ) ;
279370
@@ -291,6 +382,7 @@ impl ExecutionPlan for DistributedAnalyzeRootExec {
291382 plan_builder. append_value ( & task_output. plan ) ;
292383 }
293384
385+ // Create final RecordBatch with formatted analysis results
294386 RecordBatch :: try_new (
295387 schema_capture,
296388 vec ! [
0 commit comments