1+ use super :: ExecutionStage ;
2+ use crate :: plan:: PartitionIsolatorExec ;
3+ use crate :: {
4+ task:: { format_pg, ExecutionTask } ,
5+ ArrowFlightReadExec ,
6+ } ;
7+ use datafusion:: physical_plan:: { displayable, ExecutionPlan , ExecutionPlanProperties } ;
8+ use datafusion:: {
9+ error:: Result ,
10+ physical_plan:: { DisplayAs , DisplayFormatType } ,
11+ } ;
12+ use itertools:: Itertools ;
113/// Be able to display a nice tree for stages.
214///
315/// The challenge to doing this at the moment is that `TreeRenderVistor`
1224/// the Stage tree.
1325use std:: fmt:: Write ;
1426
15- use datafusion:: {
16- error:: Result ,
17- physical_plan:: { DisplayAs , DisplayFormatType } ,
18- } ;
19-
20- use crate :: {
21- common:: util:: display_plan_with_partition_in_out,
22- task:: { format_pg, ExecutionTask } ,
23- } ;
24-
25- use super :: ExecutionStage ;
26-
2727// Unicode box-drawing characters for creating borders and connections.
2828const LTCORNER : & str = "┌" ; // Left top corner
2929const LDCORNER : & str = "└" ; // Left bottom corner
3030const VERTICAL : & str = "│" ; // Vertical line
3131const HORIZONTAL : & str = "─" ; // Horizontal line
3232
33+ impl ExecutionStage {
34+ fn format ( & self , plan : & dyn ExecutionPlan , indent : usize , f : & mut String ) -> std:: fmt:: Result {
35+ let mut node_str = displayable ( plan) . one_line ( ) . to_string ( ) ;
36+ node_str. pop ( ) ;
37+ write ! ( f, "{} {node_str}" , " " . repeat( indent) ) ?;
38+
39+ if let Some ( ArrowFlightReadExec :: Ready ( ready) ) =
40+ plan. as_any ( ) . downcast_ref :: < ArrowFlightReadExec > ( )
41+ {
42+ let Some ( input_stage) = & self . child_stages_iter ( ) . find ( |v| v. num == ready. stage_num )
43+ else {
44+ writeln ! ( f, "Wrong partition number {}" , ready. stage_num) ?;
45+ return Ok ( ( ) ) ;
46+ } ;
47+ let tasks = input_stage. tasks . len ( ) ;
48+ let partitions = plan. output_partitioning ( ) . partition_count ( ) ;
49+ let stage = ready. stage_num ;
50+ write ! (
51+ f,
52+ " input_stage={stage}, input_partitions={partitions}, input_tasks={tasks}" ,
53+ ) ?;
54+ }
55+
56+ if plan. as_any ( ) . is :: < PartitionIsolatorExec > ( ) {
57+ write ! ( f, " {}" , format_tasks_for_partition_isolator( & self . tasks) ) ?;
58+ }
59+ writeln ! ( f) ?;
60+
61+ for child in plan. children ( ) {
62+ self . format ( child. as_ref ( ) , indent + 2 , f) ?;
63+ }
64+ Ok ( ( ) )
65+ }
66+ }
67+
3368impl DisplayAs for ExecutionStage {
3469 fn fmt_as ( & self , t : DisplayFormatType , f : & mut std:: fmt:: Formatter ) -> std:: fmt:: Result {
3570 #[ allow( clippy:: format_in_format_args) ]
@@ -44,10 +79,11 @@ impl DisplayAs for ExecutionStage {
4479 LTCORNER ,
4580 HORIZONTAL . repeat( 5 ) ,
4681 format!( " {} " , self . name) ,
47- format_tasks ( & self . tasks) ,
82+ format_tasks_for_stage ( & self . tasks) ,
4883 ) ?;
49- let plan_str = display_plan_with_partition_in_out ( self . plan . as_ref ( ) )
50- . map_err ( |_| std:: fmt:: Error { } ) ?;
84+
85+ let mut plan_str = String :: new ( ) ;
86+ self . format ( self . plan . as_ref ( ) , 0 , & mut plan_str) ?;
5187 let plan_str = plan_str
5288 . split ( '\n' )
5389 . filter ( |v| !v. is_empty ( ) )
@@ -186,10 +222,28 @@ pub fn display_stage_graphviz(stage: &ExecutionStage) -> Result<String> {
186222 Ok ( f)
187223}
188224
189- fn format_tasks ( tasks : & [ ExecutionTask ] ) -> String {
190- tasks
191- . iter ( )
192- . map ( |task| format ! ( "{task}" ) )
193- . collect :: < Vec < String > > ( )
194- . join ( "," )
225+ fn format_tasks_for_stage ( tasks : & [ ExecutionTask ] ) -> String {
226+ let mut result = "Tasks: " . to_string ( ) ;
227+ for ( i, t) in tasks. iter ( ) . enumerate ( ) {
228+ result += & format ! ( "t{i}:[" ) ;
229+ result += & t. partition_group . iter ( ) . map ( |v| format ! ( "p{v}" ) ) . join ( "," ) ;
230+ result += "] "
231+ }
232+ result
233+ }
234+
235+ fn format_tasks_for_partition_isolator ( tasks : & [ ExecutionTask ] ) -> String {
236+ let mut result = "Tasks: " . to_string ( ) ;
237+ let mut partitions = vec ! [ ] ;
238+ for t in tasks. iter ( ) {
239+ partitions. extend ( vec ! [ "__" . to_string( ) ; t. partition_group. len( ) ] )
240+ }
241+ for ( i, t) in tasks. iter ( ) . enumerate ( ) {
242+ let mut partitions = partitions. clone ( ) ;
243+ for ( i, p) in t. partition_group . iter ( ) . enumerate ( ) {
244+ partitions[ * p as usize ] = format ! ( "p{i}" )
245+ }
246+ result += & format ! ( "t{i}:[{}] " , partitions. join( "," ) ) ;
247+ }
248+ result
195249}
0 commit comments