99import traceback
1010from concurrent .futures import Future , ThreadPoolExecutor
1111from dataclasses import dataclass
12+ from string import Template
1213from types import ModuleType
13- from typing import List , Dict , Optional , Tuple , Type , Callable , cast
14+ from typing import List , Optional , Tuple , Type , Callable , cast
1415
1516from pyre_extensions import none_throws
1617from torchx .components .component_test_base import ComponentUtils
1718from torchx .components .integration_tests .component_provider import ComponentProvider
18- from torchx .specs import RunConfig , SchedulerBackend
19+ from torchx .specs import RunConfig , SchedulerBackend , AppDef
1920
2021
2122@dataclass
@@ -27,13 +28,34 @@ class SchedulerInfo:
2728
2829@dataclass
2930class AppDefRun :
30- provider_cls : Type [ ComponentProvider ]
31+ provider : ComponentProvider
3132 scheduler_info : SchedulerInfo
33+ app_def : AppDef
3234 future : Future
3335
34- def __str__ (self ) -> str :
35- msg = f"`{ self .scheduler_info .name } `:`{ self .scheduler_info .image } `"
36- return msg
36+
37+ _SUCCESS_APP_FORMAT_TEMPLATE = """
38+ Name: ${name}
39+ Scheduler: ${scheduler}"""
40+
41+ _FAIL_APP_FORMAT_TEMPLATE = """
42+ Name: ${name}
43+ Provider: ${provider}
44+ Scheduler: ${scheduler}
45+ Image: ${image}
46+ Error: ${error}"""
47+
48+ _REPORT_FORMAT_TEMPLATE = """
49+ ${boarder}
50+ Status: Success
51+ ${boarder}
52+ ${success_report}
53+ \n
54+ ${boarder}
55+ Status: Failed
56+ ${boarder}
57+ ${fail_report}
58+ """
3759
3860
3961class IntegComponentTest :
@@ -61,14 +83,14 @@ def run_components(
6183 dryrun : bool = False ,
6284 ) -> None :
6385 component_providers_cls = self ._get_component_providers (module )
64- futures : List [AppDefRun ] = []
86+ app_runs : List [AppDefRun ] = []
6587 executor = ThreadPoolExecutor ()
6688 for scheduler_info in scheduler_infos :
6789 sched_futures = self ._run_component_providers (
6890 executor , component_providers_cls , scheduler_info , dryrun
6991 )
70- futures += sched_futures
71- self ._wait_and_print_report (futures )
92+ app_runs += sched_futures
93+ self ._wait_and_print_report (app_runs )
7294
7395 def _get_component_providers (
7496 self , module : ModuleType
@@ -91,70 +113,94 @@ def _run_component_providers(
91113 scheduler_info : SchedulerInfo ,
92114 dryrun : bool = False ,
93115 ) -> List [AppDefRun ]:
94- futures : List [AppDefRun ] = []
116+ app_runs : List [AppDefRun ] = []
95117 for provider_cls in component_providers_cls :
118+ provider = self ._get_app_def_provider (provider_cls , scheduler_info )
96119 future = executor .submit (
97- self ._run_component_provider , provider_cls , scheduler_info , dryrun
120+ self ._run_component_provider , provider , scheduler_info , dryrun
98121 )
99- futures .append (
122+ app_runs .append (
100123 AppDefRun (
101- provider_cls = provider_cls ,
124+ provider = provider ,
102125 scheduler_info = scheduler_info ,
103126 future = future ,
127+ app_def = provider .get_app_def (),
104128 )
105129 )
106- return futures
130+ return app_runs
107131
108- def _run_component_provider (
132+ def _get_app_def_provider (
109133 self ,
110134 component_provider_cls : Type [ComponentProvider ],
111135 scheduler_info : SchedulerInfo ,
136+ ) -> ComponentProvider :
137+ provider_cls = cast (Callable [..., ComponentProvider ], component_provider_cls )
138+ return none_throws (provider_cls (scheduler_info .name , scheduler_info .image ))
139+
140+ def _run_component_provider (
141+ self ,
142+ provider : ComponentProvider ,
143+ scheduler_info : SchedulerInfo ,
112144 dryrun : bool = False ,
113- ) -> str :
114- provider : Optional [ComponentProvider ] = None
145+ ) -> None :
115146 try :
116- provider_cls = cast (
117- Callable [..., ComponentProvider ], component_provider_cls
118- )
119- provider = none_throws (
120- provider_cls (scheduler_info .name , scheduler_info .image )
121- )
122147 provider .setUp ()
123- app_def = provider .get_app_def ()
124148 ComponentUtils .run_appdef_on_scheduler (
125- app_def , scheduler_info .name , scheduler_info .runconfig , dryrun
149+ provider .get_app_def (),
150+ scheduler_info .name ,
151+ scheduler_info .runconfig ,
152+ dryrun ,
126153 )
127- return app_def .name
128154 finally :
129- if provider :
130- provider .tearDown ()
155+ provider .tearDown ()
131156
132- def _wait_and_print_report (self , futures : List [AppDefRun ]) -> None :
133- app_runs : Dict [str , List [str ]] = {}
157+ def _wait_and_print_report (self , app_runs : List [AppDefRun ]) -> None :
158+ succeeded_apps : List [AppDefRun ] = []
159+ failed_apps : List [Tuple [AppDefRun , str ]] = []
134160 deadline = time .monotonic () + self ._timeout
135- for future in futures :
161+ for app_run in app_runs :
136162 task_timeout = max (0 , int (deadline - time .monotonic ()))
137- status , msg = self ._get_app_def_run_status (future , task_timeout )
138- if status not in app_runs :
139- app_runs [status ] = []
140- app_runs [status ].append (msg )
141- for status , msgs in app_runs .items ():
142- print (f"\n Status: `{ status } `\n " )
143- print ("\n " .join (msgs ))
144- print ("\n " )
163+ error_msg = self ._get_app_def_run_status (app_run , task_timeout )
164+ if not error_msg :
165+ succeeded_apps .append (app_run )
166+ else :
167+ failed_apps .append ((app_run , error_msg ))
168+ success_report = ""
169+ for app_run in succeeded_apps :
170+ success_report_run = Template (_SUCCESS_APP_FORMAT_TEMPLATE ).substitute (
171+ name = app_run .app_def .name , scheduler = app_run .scheduler_info .name
172+ )
173+ success_report += f"{ success_report_run } \n "
174+ fail_report = ""
175+ for app_run , error_msg in failed_apps :
176+ fail_report_run = Template (_FAIL_APP_FORMAT_TEMPLATE ).substitute (
177+ name = app_run .app_def .name ,
178+ provider = app_run .provider ,
179+ scheduler = app_run .scheduler_info .name ,
180+ image = app_run .scheduler_info .image ,
181+ error = error_msg ,
182+ )
183+ fail_report += f"{ fail_report_run } \n "
184+ delim = "*"
185+ width = 80
186+ msg = Template (_REPORT_FORMAT_TEMPLATE ).substitute (
187+ boarder = delim * width ,
188+ success_report = success_report or "<NONE>" ,
189+ fail_report = fail_report or "<NONE>" ,
190+ )
191+ print (msg )
192+ if len (failed_apps ) > 0 :
193+ raise RuntimeError (
194+ "Component test failed, see report above for detailed issue"
195+ )
145196
146197 def _get_app_def_run_status (
147198 self , app_def_run : AppDefRun , timeout : int
148- ) -> Tuple [ str , str ]:
199+ ) -> Optional [ str ]:
149200 try :
150- print (f"Retrieving results from { app_def_run } : { app_def_run .provider_cls } " )
151- app_name = app_def_run .future .result (timeout = timeout )
152- return "succeeded" , f"` { app_name } `:` { app_def_run } `"
153- except Exception as e :
201+ print (f"Retrieving { app_def_run . app_def . name } : { app_def_run .provider } " )
202+ app_def_run .future .result (timeout = timeout )
203+ return None
204+ except Exception :
154205 stack_trace_msg = traceback .format_exc ().replace ("\n " , "\n " )
155-
156- msg = (
157- f"Failure while running: { app_def_run } , provider: `{ app_def_run .provider_cls } `: { e } \n "
158- f"Stacktrace: { stack_trace_msg } \n "
159- )
160- return "failed" , msg
206+ return stack_trace_msg
0 commit comments