11use crate :: channel_resolver_ext:: get_distributed_channel_resolver;
2+ use crate :: execution_plans:: MetricsWrapperExec ;
23use crate :: execution_plans:: NetworkCoalesceExec ;
4+ use crate :: metrics:: TaskMetricsRewriter ;
35use crate :: { ChannelResolver , NetworkShuffleExec , PartitionIsolatorExec } ;
46use datafusion:: common:: { exec_err, internal_datafusion_err, internal_err, plan_err} ;
57use datafusion:: error:: { DataFusionError , Result } ;
68use datafusion:: execution:: TaskContext ;
9+ use datafusion:: physical_plan:: display:: DisplayableExecutionPlan ;
710use datafusion:: physical_plan:: {
811 DisplayAs , DisplayFormatType , ExecutionPlan , ExecutionPlanProperties , displayable,
912} ;
@@ -91,6 +94,8 @@ pub struct StageExec {
9194 pub tasks : Vec < ExecutionTask > ,
9295 /// tree depth of our location in the stage tree, used for display only
9396 pub depth : usize ,
97+ /// Optional extra information used at display time.
98+ pub display_ctx : Option < DisplayCtx > ,
9499}
95100
96101/// A [StageExec] that is the input of another [StageExec].
@@ -192,6 +197,7 @@ impl StageExec {
192197 . collect ( ) ,
193198 tasks : vec ! [ ExecutionTask { url: None } ; n_tasks] ,
194199 depth : 0 ,
200+ display_ctx : None ,
195201 }
196202 }
197203
@@ -239,6 +245,7 @@ impl StageExec {
239245 inputs : assigned_input_stages,
240246 tasks : assigned_tasks,
241247 depth : self . depth ,
248+ display_ctx : self . display_ctx . clone ( ) ,
242249 } ;
243250
244251 Ok ( assigned_stage)
@@ -292,9 +299,31 @@ impl ExecutionPlan for StageExec {
292299
293300 fn with_new_children (
294301 self : Arc < Self > ,
295- _children : Vec < Arc < dyn ExecutionPlan > > ,
302+ children : Vec < Arc < dyn ExecutionPlan > > ,
296303 ) -> Result < Arc < dyn ExecutionPlan > > {
297- plan_err ! ( "with_new_children() not supported for StageExec" )
304+ let num_children = children. len ( ) ;
305+ let child_stage_execs = children
306+ . into_iter ( )
307+ . filter ( |child| child. as_any ( ) . downcast_ref :: < StageExec > ( ) . is_some ( ) )
308+ . map ( |child| child. as_any ( ) . downcast_ref :: < StageExec > ( ) . unwrap ( ) . clone ( ) )
309+ . collect :: < Vec < _ > > ( ) ;
310+ if child_stage_execs. len ( ) != num_children {
311+ return plan_err ! ( "The children of a StageExec must all be StageExec" ) ;
312+ }
313+ let stage = StageExec {
314+ query_id : self . query_id ,
315+ num : self . num ,
316+ name : self . name . clone ( ) ,
317+ plan : self . plan . clone ( ) ,
318+ inputs : child_stage_execs
319+ . into_iter ( )
320+ . map ( |s| InputStage :: Decoded ( Arc :: new ( s) ) )
321+ . collect ( ) ,
322+ tasks : self . tasks . clone ( ) ,
323+ depth : self . depth ,
324+ display_ctx : self . display_ctx . clone ( ) ,
325+ } ;
326+ Ok ( Arc :: new ( stage) )
298327 }
299328
300329 fn properties ( & self ) -> & datafusion:: physical_plan:: PlanProperties {
@@ -344,9 +373,13 @@ impl ExecutionPlan for StageExec {
344373 }
345374}
346375
376+ use crate :: metrics:: proto:: MetricsSetProto ;
377+ use crate :: protobuf:: StageKey ;
347378use bytes:: Bytes ;
379+ use datafusion:: common:: HashMap ;
348380use datafusion:: common:: tree_node:: { TreeNode , TreeNodeRecursion } ;
349381use datafusion:: physical_expr:: Partitioning ;
382+
350383/// Be able to display a nice tree for stages.
351384///
352385/// The challenge to doing this at the moment is that `TreeRenderVistor`
@@ -367,9 +400,44 @@ const LDCORNER: &str = "└"; // Left bottom corner
367400const VERTICAL : & str = "│" ; // Vertical line
368401const HORIZONTAL : & str = "─" ; // Horizontal line
369402
403+ // Context used to display a StageExec, tasks, and plans.
404+ #[ derive( Debug , Clone ) ]
405+ pub struct DisplayCtx {
406+ metrics : Arc < HashMap < StageKey , Vec < MetricsSetProto > > > ,
407+ }
408+
409+ impl DisplayCtx {
410+ pub fn new ( metrics : HashMap < StageKey , Vec < MetricsSetProto > > ) -> Self {
411+ Self {
412+ metrics : Arc :: new ( metrics) ,
413+ }
414+ }
415+ }
416+
417+ /// TaskFmt is used to control how tasks are displayed when displaying a [StageExec]'s inner plan.
418+ #[ derive( Clone , Copy ) ]
419+ enum TaskFmt {
420+ // Display all tasks
421+ All ,
422+ // Display only the task with the given task_id
423+ TaskID { task_id : usize } ,
424+ }
425+
370426impl StageExec {
371- fn format ( & self , plan : & dyn ExecutionPlan , indent : usize , f : & mut String ) -> std:: fmt:: Result {
372- let mut node_str = displayable ( plan) . one_line ( ) . to_string ( ) ;
427+ fn format (
428+ & self ,
429+ plan : & dyn ExecutionPlan ,
430+ indent : usize ,
431+ task_fmt : TaskFmt ,
432+ f : & mut String ,
433+ ) -> std:: fmt:: Result {
434+ // If metrics are available, then display the plan with metrics.
435+ let mut node_str = match & self . display_ctx {
436+ None => displayable ( plan) . one_line ( ) . to_string ( ) ,
437+ Some ( _) => DisplayableExecutionPlan :: with_metrics ( plan)
438+ . one_line ( )
439+ . to_string ( ) ,
440+ } ;
373441 node_str. pop ( ) ;
374442 write ! ( f, "{} {node_str}" , " " . repeat( indent) ) ?;
375443
@@ -406,17 +474,29 @@ impl StageExec {
406474 ) ?;
407475 }
408476
409- if let Some ( isolator) = plan. as_any ( ) . downcast_ref :: < PartitionIsolatorExec > ( ) {
410- write ! (
411- f,
412- " {}" ,
413- format_tasks_for_partition_isolator( isolator, & self . tasks)
414- ) ?;
477+ let mut maybe_partition_isolator = plan;
478+ // It's possible that the plan node is wrapped in a MetricsWrapperExec for displaying purposes.
479+ // Check if that's the case.
480+ if let Some ( wrapper) = plan. as_any ( ) . downcast_ref :: < MetricsWrapperExec > ( ) {
481+ maybe_partition_isolator = wrapper. get_inner ( ) . as_ref ( ) ;
482+ }
483+
484+ if let Some ( isolator) = maybe_partition_isolator
485+ . as_any ( )
486+ . downcast_ref :: < PartitionIsolatorExec > ( )
487+ {
488+ let task_info = match task_fmt {
489+ TaskFmt :: All => format_tasks_for_partition_isolator ( isolator, & self . tasks ) ,
490+ TaskFmt :: TaskID { task_id } => {
491+ format_task_for_partition_isolator ( isolator, task_id, self . tasks . len ( ) )
492+ }
493+ } ;
494+ write ! ( f, " {}" , task_info) ?;
415495 }
416496 writeln ! ( f) ?;
417497
418498 for child in plan. children ( ) {
419- self . format ( child. as_ref ( ) , indent + 2 , f) ?;
499+ self . format ( child. as_ref ( ) , indent + 2 , task_fmt , f) ?;
420500 }
421501 Ok ( ( ) )
422502 }
@@ -430,32 +510,99 @@ impl DisplayAs for StageExec {
430510 write ! ( f, "{}" , self . name)
431511 }
432512 DisplayFormatType :: Verbose => {
433- writeln ! (
434- f,
435- "{}{} {} {}" ,
436- LTCORNER ,
437- HORIZONTAL . repeat( 5 ) ,
438- self . name,
439- format_tasks_for_stage( self . tasks. len( ) , & self . plan)
440- ) ?;
513+ match & self . display_ctx {
514+ None => {
515+ writeln ! (
516+ f,
517+ "{}{} {} {}" ,
518+ LTCORNER ,
519+ HORIZONTAL . repeat( 5 ) ,
520+ self . name,
521+ format_tasks_for_stage( self . tasks. len( ) , & self . plan)
522+ ) ?;
441523
442- let mut plan_str = String :: new ( ) ;
443- self . format ( self . plan . as_ref ( ) , 0 , & mut plan_str) ?;
444- let plan_str = plan_str
445- . split ( '\n' )
446- . filter ( |v| !v. is_empty ( ) )
447- . collect :: < Vec < _ > > ( )
448- . join ( & format ! ( "\n {}{}" , " " . repeat( self . depth) , VERTICAL ) ) ;
449- writeln ! ( f, "{}{}{}" , " " . repeat( self . depth) , VERTICAL , plan_str) ?;
450- write ! (
451- f,
452- "{}{}{}" ,
453- " " . repeat( self . depth) ,
454- LDCORNER ,
455- HORIZONTAL . repeat( 50 )
456- ) ?;
524+ let mut plan_str = String :: new ( ) ;
525+ self . format ( self . plan . as_ref ( ) , 0 , TaskFmt :: All , & mut plan_str) ?;
526+ let plan_str = plan_str
527+ . split ( '\n' )
528+ . filter ( |v| !v. is_empty ( ) )
529+ . collect :: < Vec < _ > > ( )
530+ . join ( & format ! ( "\n {}{}" , " " . repeat( self . depth) , VERTICAL ) ) ;
531+ writeln ! ( f, "{}{}{}" , " " . repeat( self . depth) , VERTICAL , plan_str) ?;
532+ // Add bottom border
533+ write ! (
534+ f,
535+ "{}{}{}" ,
536+ " " . repeat( self . depth) ,
537+ LDCORNER ,
538+ HORIZONTAL . repeat( 50 )
539+ ) ?;
457540
458- Ok ( ( ) )
541+ Ok ( ( ) )
542+ }
543+ Some ( display_ctx) => {
544+ // If metrics are available, always display each task separately.
545+ for ( i, _) in self . tasks . iter ( ) . enumerate ( ) {
546+ let mut extra_spacing = "" . to_string ( ) ;
547+ if i > 0 {
548+ // Add newline for each task
549+ writeln ! ( f) ?;
550+ // with_indent() in DisplayableExectutionPlan will not add indentation for tasks, so we add it manually.
551+ extra_spacing = " " . repeat ( self . depth ) ;
552+ }
553+ writeln ! (
554+ f,
555+ "{}{}{}{} {}" ,
556+ extra_spacing,
557+ LTCORNER ,
558+ HORIZONTAL . repeat( 5 ) ,
559+ format!( " {} " , self . name) ,
560+ format_task_for_stage( i, & self . plan) ,
561+ ) ?;
562+ // Uniquely identify the task.
563+ let key = StageKey {
564+ query_id : self . query_id . to_string ( ) ,
565+ stage_id : self . num as u64 ,
566+ task_number : i as u64 ,
567+ } ;
568+
569+ let mut plan_str = String :: new ( ) ;
570+ let plan = match display_ctx. metrics . get ( & key) {
571+ Some ( metrics) => {
572+ let result = TaskMetricsRewriter :: new ( metrics. to_owned ( ) )
573+ . enrich_task_with_metrics ( self . plan . clone ( ) ) ;
574+ if let Err ( e) = result {
575+ write ! ( f, "Error enriching task with metrics: {}" , e) ?;
576+ return Err ( std:: fmt:: Error ) ;
577+ }
578+ result. unwrap ( )
579+ }
580+ None => self . plan . clone ( ) ,
581+ } ;
582+ self . format (
583+ plan. as_ref ( ) ,
584+ 0 ,
585+ TaskFmt :: TaskID { task_id : i } ,
586+ & mut plan_str,
587+ ) ?;
588+ let plan_str = plan_str
589+ . split ( '\n' )
590+ . filter ( |v| !v. is_empty ( ) )
591+ . collect :: < Vec < _ > > ( )
592+ . join ( & format ! ( "\n {}{}" , " " . repeat( self . depth) , VERTICAL ) ) ;
593+ writeln ! ( f, "{}{}{}" , " " . repeat( self . depth) , VERTICAL , plan_str) ?;
594+ // Add bottom border
595+ write ! (
596+ f,
597+ "{}{}{}" ,
598+ " " . repeat( self . depth) ,
599+ LDCORNER ,
600+ HORIZONTAL . repeat( 50 )
601+ ) ?;
602+ }
603+ Ok ( ( ) )
604+ }
605+ }
459606 }
460607 DisplayFormatType :: TreeRender => write ! (
461608 f,
@@ -483,6 +630,22 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
483630 result
484631}
485632
633+ fn format_task_for_stage ( task_number : usize , head : & Arc < dyn ExecutionPlan > ) -> String {
634+ let partitioning = head. properties ( ) . output_partitioning ( ) ;
635+ let input_partitions = partitioning. partition_count ( ) ;
636+ let hash_shuffle = matches ! ( partitioning, Partitioning :: Hash ( _, _) ) ;
637+ let off = task_number * if hash_shuffle { 0 } else { input_partitions } ;
638+
639+ let mut result = "Task " . to_string ( ) ;
640+ result += & format ! ( "t{task_number}:[" ) ;
641+ result += & ( off..( off + input_partitions) )
642+ . map ( |v| format ! ( "p{v}" ) )
643+ . join ( "," ) ;
644+ result += "] " ;
645+
646+ result
647+ }
648+
486649fn format_tasks_for_partition_isolator (
487650 isolator : & PartitionIsolatorExec ,
488651 tasks : & [ ExecutionTask ] ,
@@ -506,6 +669,28 @@ fn format_tasks_for_partition_isolator(
506669 result
507670}
508671
672+ fn format_task_for_partition_isolator (
673+ isolator : & PartitionIsolatorExec ,
674+ task_number : usize ,
675+ num_tasks : usize ,
676+ ) -> String {
677+ let input_partitions = isolator. input ( ) . output_partitioning ( ) . partition_count ( ) ;
678+ let partition_groups = PartitionIsolatorExec :: partition_groups ( input_partitions, num_tasks) ;
679+
680+ let n: usize = partition_groups. iter ( ) . map ( |v| v. len ( ) ) . sum ( ) ;
681+ let mut partitions = vec ! [ "__" . to_string( ) ; n] ;
682+
683+ let mut result = "Task " . to_string ( ) ;
684+ partition_groups
685+ . get ( task_number)
686+ . unwrap ( )
687+ . iter ( )
688+ . enumerate ( )
689+ . for_each ( |( j, p) | partitions[ * p] = format ! ( "p{j}" ) ) ;
690+ result += & format ! ( "t{task_number}:[{}] " , partitions. join( "," ) ) ;
691+ result
692+ }
693+
509694// num_colors must agree with the colorscheme selected from
510695// https://graphviz.org/doc/info/colors.html
511696const NUM_COLORS : usize = 6 ;
0 commit comments