11use crate :: channel_resolver_ext:: get_distributed_channel_resolver;
2+ use crate :: execution_plans:: MetricsWrapperExec ;
23use crate :: execution_plans:: NetworkCoalesceExec ;
4+ use crate :: metrics:: TaskMetricsRewriter ;
35use crate :: { ChannelResolver , NetworkShuffleExec , PartitionIsolatorExec } ;
46use datafusion:: common:: { exec_err, internal_datafusion_err, internal_err, plan_err} ;
57use datafusion:: error:: { DataFusionError , Result } ;
68use datafusion:: execution:: TaskContext ;
9+ use datafusion:: physical_plan:: display:: DisplayableExecutionPlan ;
710use datafusion:: physical_plan:: {
811 DisplayAs , DisplayFormatType , ExecutionPlan , ExecutionPlanProperties , displayable,
912} ;
@@ -91,6 +94,8 @@ pub struct StageExec {
9194 pub tasks : Vec < ExecutionTask > ,
9295 /// tree depth of our location in the stage tree, used for display only
9396 pub depth : usize ,
97+
98+ pub display_ctx : Option < DisplayCtx > ,
9499}
95100
96101/// A [StageExec] that is the input of another [StageExec].
@@ -192,6 +197,7 @@ impl StageExec {
192197 . collect ( ) ,
193198 tasks : vec ! [ ExecutionTask { url: None } ; n_tasks] ,
194199 depth : 0 ,
200+ display_ctx : None ,
195201 }
196202 }
197203
@@ -239,6 +245,7 @@ impl StageExec {
239245 inputs : assigned_input_stages,
240246 tasks : assigned_tasks,
241247 depth : self . depth ,
248+ display_ctx : self . display_ctx . clone ( ) ,
242249 } ;
243250
244251 Ok ( assigned_stage)
@@ -292,9 +299,31 @@ impl ExecutionPlan for StageExec {
292299
293300 fn with_new_children (
294301 self : Arc < Self > ,
295- _children : Vec < Arc < dyn ExecutionPlan > > ,
302+ children : Vec < Arc < dyn ExecutionPlan > > ,
296303 ) -> Result < Arc < dyn ExecutionPlan > > {
297- plan_err ! ( "with_new_children() not supported for StageExec" )
304+ let num_children = children. len ( ) ;
305+ let child_stage_execs = children
306+ . into_iter ( )
307+ . filter ( |child| child. as_any ( ) . downcast_ref :: < StageExec > ( ) . is_some ( ) )
308+ . map ( |child| child. as_any ( ) . downcast_ref :: < StageExec > ( ) . unwrap ( ) . clone ( ) )
309+ . collect :: < Vec < _ > > ( ) ;
310+ if child_stage_execs. len ( ) != num_children {
311+ return plan_err ! ( "not all children are StageExec" ) ;
312+ }
313+ let stage = StageExec {
314+ query_id : self . query_id ,
315+ num : self . num ,
316+ name : self . name . clone ( ) ,
317+ plan : self . plan . clone ( ) ,
318+ inputs : child_stage_execs
319+ . into_iter ( )
320+ . map ( |s| InputStage :: Decoded ( Arc :: new ( s) ) )
321+ . collect ( ) ,
322+ tasks : self . tasks . clone ( ) ,
323+ depth : self . depth ,
324+ display_ctx : self . display_ctx . clone ( ) ,
325+ } ;
326+ Ok ( Arc :: new ( stage) )
298327 }
299328
300329 fn properties ( & self ) -> & datafusion:: physical_plan:: PlanProperties {
@@ -344,9 +373,13 @@ impl ExecutionPlan for StageExec {
344373 }
345374}
346375
376+ use crate :: metrics:: proto:: MetricsSetProto ;
377+ use crate :: protobuf:: StageKey ;
347378use bytes:: Bytes ;
379+ use datafusion:: common:: HashMap ;
348380use datafusion:: common:: tree_node:: { TreeNode , TreeNodeRecursion } ;
349381use datafusion:: physical_expr:: Partitioning ;
382+
350383/// Be able to display a nice tree for stages.
351384///
352385/// The challenge to doing this at the moment is that `TreeRenderVistor`
@@ -367,9 +400,40 @@ const LDCORNER: &str = "└"; // Left bottom corner
367400const VERTICAL : & str = "│" ; // Vertical line
368401const HORIZONTAL : & str = "─" ; // Horizontal line
369402
403+ // Context used to display a StageExec, tasks, and plans.
404+ #[ derive( Debug , Clone ) ]
405+ pub struct DisplayCtx {
406+ metrics : Arc < HashMap < StageKey , Vec < MetricsSetProto > > > ,
407+ }
408+
409+ impl DisplayCtx {
410+ pub fn new ( metrics : HashMap < StageKey , Vec < MetricsSetProto > > ) -> Self {
411+ Self {
412+ metrics : Arc :: new ( metrics) ,
413+ }
414+ }
415+ }
416+
417+ #[ derive( Clone , Copy ) ]
418+ enum TaskFmt {
419+ All ,
420+ TaskID { task_id : usize } ,
421+ }
422+
370423impl StageExec {
371- fn format ( & self , plan : & dyn ExecutionPlan , indent : usize , f : & mut String ) -> std:: fmt:: Result {
372- let mut node_str = displayable ( plan) . one_line ( ) . to_string ( ) ;
424+ fn format (
425+ & self ,
426+ plan : & dyn ExecutionPlan ,
427+ indent : usize ,
428+ task_fmt : TaskFmt ,
429+ f : & mut String ,
430+ ) -> std:: fmt:: Result {
431+ let mut node_str = match & self . display_ctx {
432+ None => displayable ( plan) . one_line ( ) . to_string ( ) ,
433+ Some ( _) => DisplayableExecutionPlan :: with_metrics ( plan)
434+ . one_line ( )
435+ . to_string ( ) ,
436+ } ;
373437 node_str. pop ( ) ;
374438 write ! ( f, "{} {node_str}" , " " . repeat( indent) ) ?;
375439
@@ -406,17 +470,29 @@ impl StageExec {
406470 ) ?;
407471 }
408472
409- if let Some ( isolator) = plan. as_any ( ) . downcast_ref :: < PartitionIsolatorExec > ( ) {
410- write ! (
411- f,
412- " {}" ,
413- format_tasks_for_partition_isolator( isolator, & self . tasks)
414- ) ?;
473+ let mut maybe_partition_isolator = plan;
474+ if self . display_ctx . is_some ( ) {
475+ if let Some ( wrapper) = plan. as_any ( ) . downcast_ref :: < MetricsWrapperExec > ( ) {
476+ maybe_partition_isolator = wrapper. get_inner ( ) . as_ref ( ) ;
477+ }
478+ }
479+
480+ if let Some ( isolator) = maybe_partition_isolator
481+ . as_any ( )
482+ . downcast_ref :: < PartitionIsolatorExec > ( )
483+ {
484+ let task_info = match task_fmt {
485+ TaskFmt :: All => format_tasks_for_partition_isolator ( isolator, & self . tasks ) ,
486+ TaskFmt :: TaskID { task_id } => {
487+ format_task_for_partition_isolator ( isolator, task_id, self . tasks . len ( ) )
488+ }
489+ } ;
490+ write ! ( f, " {}" , task_info) ?;
415491 }
416492 writeln ! ( f) ?;
417493
418494 for child in plan. children ( ) {
419- self . format ( child. as_ref ( ) , indent + 2 , f) ?;
495+ self . format ( child. as_ref ( ) , indent + 2 , task_fmt , f) ?;
420496 }
421497 Ok ( ( ) )
422498 }
@@ -430,32 +506,96 @@ impl DisplayAs for StageExec {
430506 write ! ( f, "{}" , self . name)
431507 }
432508 DisplayFormatType :: Verbose => {
433- writeln ! (
434- f,
435- "{}{} {} {}" ,
436- LTCORNER ,
437- HORIZONTAL . repeat( 5 ) ,
438- self . name,
439- format_tasks_for_stage( self . tasks. len( ) , & self . plan)
440- ) ?;
509+ match & self . display_ctx {
510+ None => {
511+ writeln ! (
512+ f,
513+ "{}{} {} {}" ,
514+ LTCORNER ,
515+ HORIZONTAL . repeat( 5 ) ,
516+ self . name,
517+ format_tasks_for_stage( self . tasks. len( ) , & self . plan)
518+ ) ?;
441519
442- let mut plan_str = String :: new ( ) ;
443- self . format ( self . plan . as_ref ( ) , 0 , & mut plan_str) ?;
444- let plan_str = plan_str
445- . split ( '\n' )
446- . filter ( |v| !v. is_empty ( ) )
447- . collect :: < Vec < _ > > ( )
448- . join ( & format ! ( "\n {}{}" , " " . repeat( self . depth) , VERTICAL ) ) ;
449- writeln ! ( f, "{}{}{}" , " " . repeat( self . depth) , VERTICAL , plan_str) ?;
450- write ! (
451- f,
452- "{}{}{}" ,
453- " " . repeat( self . depth) ,
454- LDCORNER ,
455- HORIZONTAL . repeat( 50 )
456- ) ?;
520+ let mut plan_str = String :: new ( ) ;
521+ self . format ( self . plan . as_ref ( ) , 0 , TaskFmt :: All , & mut plan_str) ?;
522+ let plan_str = plan_str
523+ . split ( '\n' )
524+ . filter ( |v| !v. is_empty ( ) )
525+ . collect :: < Vec < _ > > ( )
526+ . join ( & format ! ( "\n {}{}" , " " . repeat( self . depth) , VERTICAL ) ) ;
527+ writeln ! ( f, "{}{}{}" , " " . repeat( self . depth) , VERTICAL , plan_str) ?;
528+ // Add bottom border
529+ write ! (
530+ f,
531+ "{}{}{}" ,
532+ " " . repeat( self . depth) ,
533+ LDCORNER ,
534+ HORIZONTAL . repeat( 50 )
535+ ) ?;
457536
458- Ok ( ( ) )
537+ Ok ( ( ) )
538+ }
539+ Some ( display_ctx) => {
540+ for ( i, _) in self . tasks . iter ( ) . enumerate ( ) {
541+ let mut extra_spacing = "" . to_string ( ) ;
542+ if i > 0 {
543+ writeln ! ( f) ?; // Add newline for each task
544+ extra_spacing = " " . repeat ( self . depth ) ; // with_indent() in DisplayableExectutionPlan will not add indentation for tasks, so we add it manually.
545+ }
546+ writeln ! (
547+ f,
548+ "{}{}{}{} {}" ,
549+ extra_spacing,
550+ LTCORNER ,
551+ HORIZONTAL . repeat( 5 ) ,
552+ format!( " {} " , self . name) ,
553+ format_task_for_stage( i, & self . plan) ,
554+ ) ?;
555+ // Uniquely identify the task.
556+ let key = StageKey {
557+ query_id : self . query_id . to_string ( ) ,
558+ stage_id : self . num as u64 ,
559+ task_number : i as u64 ,
560+ } ;
561+
562+ let mut plan_str = String :: new ( ) ;
563+ let plan = match display_ctx. metrics . get ( & key) {
564+ Some ( metrics) => {
565+ let result = TaskMetricsRewriter :: new ( metrics. to_owned ( ) )
566+ . enrich_task_with_metrics ( self . plan . clone ( ) ) ;
567+ if let Err ( e) = result {
568+ write ! ( f, "Error enriching task with metrics: {}" , e) ?;
569+ return Err ( std:: fmt:: Error ) ;
570+ }
571+ result. unwrap ( )
572+ }
573+ None => self . plan . clone ( ) ,
574+ } ;
575+ self . format (
576+ plan. as_ref ( ) ,
577+ 0 ,
578+ TaskFmt :: TaskID { task_id : i } ,
579+ & mut plan_str,
580+ ) ?;
581+ let plan_str = plan_str
582+ . split ( '\n' )
583+ . filter ( |v| !v. is_empty ( ) )
584+ . collect :: < Vec < _ > > ( )
585+ . join ( & format ! ( "\n {}{}" , " " . repeat( self . depth) , VERTICAL ) ) ;
586+ writeln ! ( f, "{}{}{}" , " " . repeat( self . depth) , VERTICAL , plan_str) ?;
587+ // Add bottom border
588+ write ! (
589+ f,
590+ "{}{}{}" ,
591+ " " . repeat( self . depth) ,
592+ LDCORNER ,
593+ HORIZONTAL . repeat( 50 )
594+ ) ?;
595+ }
596+ Ok ( ( ) )
597+ }
598+ }
459599 }
460600 DisplayFormatType :: TreeRender => write ! (
461601 f,
@@ -483,6 +623,22 @@ fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> Stri
483623 result
484624}
485625
626+ fn format_task_for_stage ( task_number : usize , head : & Arc < dyn ExecutionPlan > ) -> String {
627+ let partitioning = head. properties ( ) . output_partitioning ( ) ;
628+ let input_partitions = partitioning. partition_count ( ) ;
629+ let hash_shuffle = matches ! ( partitioning, Partitioning :: Hash ( _, _) ) ;
630+ let off = task_number * if hash_shuffle { 0 } else { input_partitions } ;
631+
632+ let mut result = "Task " . to_string ( ) ;
633+ result += & format ! ( "t{task_number}:[" ) ;
634+ result += & ( off..( off + input_partitions) )
635+ . map ( |v| format ! ( "p{v}" ) )
636+ . join ( "," ) ;
637+ result += "] " ;
638+
639+ result
640+ }
641+
486642fn format_tasks_for_partition_isolator (
487643 isolator : & PartitionIsolatorExec ,
488644 tasks : & [ ExecutionTask ] ,
@@ -506,6 +662,28 @@ fn format_tasks_for_partition_isolator(
506662 result
507663}
508664
665+ fn format_task_for_partition_isolator (
666+ isolator : & PartitionIsolatorExec ,
667+ task_number : usize ,
668+ num_tasks : usize ,
669+ ) -> String {
670+ let input_partitions = isolator. input ( ) . output_partitioning ( ) . partition_count ( ) ;
671+ let partition_groups = PartitionIsolatorExec :: partition_groups ( input_partitions, num_tasks) ;
672+
673+ let n: usize = partition_groups. iter ( ) . map ( |v| v. len ( ) ) . sum ( ) ;
674+ let mut partitions = vec ! [ "__" . to_string( ) ; n] ;
675+
676+ let mut result = "Task " . to_string ( ) ;
677+ partition_groups
678+ . get ( task_number)
679+ . unwrap ( )
680+ . iter ( )
681+ . enumerate ( )
682+ . for_each ( |( j, p) | partitions[ * p] = format ! ( "p{j}" ) ) ;
683+ result += & format ! ( "t{task_number}:[{}] " , partitions. join( "," ) ) ;
684+ result
685+ }
686+
509687// num_colors must agree with the colorscheme selected from
510688// https://graphviz.org/doc/info/colors.html
511689const NUM_COLORS : usize = 6 ;
0 commit comments