9
9
import traceback
10
10
from concurrent .futures import Future , ThreadPoolExecutor
11
11
from dataclasses import dataclass
12
+ from string import Template
12
13
from types import ModuleType
13
- from typing import List , Dict , Optional , Tuple , Type , Callable , cast
14
+ from typing import List , Optional , Tuple , Type , Callable , cast
14
15
15
16
from pyre_extensions import none_throws
16
17
from torchx .components .component_test_base import ComponentUtils
17
18
from torchx .components .integration_tests .component_provider import ComponentProvider
18
- from torchx .specs import RunConfig , SchedulerBackend
19
+ from torchx .specs import RunConfig , SchedulerBackend , AppDef
19
20
20
21
21
22
@dataclass
@@ -27,13 +28,34 @@ class SchedulerInfo:
27
28
28
29
@dataclass
29
30
class AppDefRun :
30
- provider_cls : Type [ ComponentProvider ]
31
+ provider : ComponentProvider
31
32
scheduler_info : SchedulerInfo
33
+ app_def : AppDef
32
34
future : Future
33
35
34
- def __str__ (self ) -> str :
35
- msg = f"`{ self .scheduler_info .name } `:`{ self .scheduler_info .image } `"
36
- return msg
36
+
37
+ _SUCCESS_APP_FORMAT_TEMPLATE = """
38
+ Name: ${name}
39
+ Scheduler: ${scheduler}"""
40
+
41
+ _FAIL_APP_FORMAT_TEMPLATE = """
42
+ Name: ${name}
43
+ Provider: ${provider}
44
+ Scheduler: ${scheduler}
45
+ Image: ${image}
46
+ Error: ${error}"""
47
+
48
+ _REPORT_FORMAT_TEMPLATE = """
49
+ ${boarder}
50
+ Status: Success
51
+ ${boarder}
52
+ ${success_report}
53
+ \n
54
+ ${boarder}
55
+ Status: Failed
56
+ ${boarder}
57
+ ${fail_report}
58
+ """
37
59
38
60
39
61
class IntegComponentTest :
@@ -61,14 +83,14 @@ def run_components(
61
83
dryrun : bool = False ,
62
84
) -> None :
63
85
component_providers_cls = self ._get_component_providers (module )
64
- futures : List [AppDefRun ] = []
86
+ app_runs : List [AppDefRun ] = []
65
87
executor = ThreadPoolExecutor ()
66
88
for scheduler_info in scheduler_infos :
67
89
sched_futures = self ._run_component_providers (
68
90
executor , component_providers_cls , scheduler_info , dryrun
69
91
)
70
- futures += sched_futures
71
- self ._wait_and_print_report (futures )
92
+ app_runs += sched_futures
93
+ self ._wait_and_print_report (app_runs )
72
94
73
95
def _get_component_providers (
74
96
self , module : ModuleType
@@ -91,70 +113,94 @@ def _run_component_providers(
91
113
scheduler_info : SchedulerInfo ,
92
114
dryrun : bool = False ,
93
115
) -> List [AppDefRun ]:
94
- futures : List [AppDefRun ] = []
116
+ app_runs : List [AppDefRun ] = []
95
117
for provider_cls in component_providers_cls :
118
+ provider = self ._get_app_def_provider (provider_cls , scheduler_info )
96
119
future = executor .submit (
97
- self ._run_component_provider , provider_cls , scheduler_info , dryrun
120
+ self ._run_component_provider , provider , scheduler_info , dryrun
98
121
)
99
- futures .append (
122
+ app_runs .append (
100
123
AppDefRun (
101
- provider_cls = provider_cls ,
124
+ provider = provider ,
102
125
scheduler_info = scheduler_info ,
103
126
future = future ,
127
+ app_def = provider .get_app_def (),
104
128
)
105
129
)
106
- return futures
130
+ return app_runs
107
131
108
- def _run_component_provider (
132
+ def _get_app_def_provider (
109
133
self ,
110
134
component_provider_cls : Type [ComponentProvider ],
111
135
scheduler_info : SchedulerInfo ,
136
+ ) -> ComponentProvider :
137
+ provider_cls = cast (Callable [..., ComponentProvider ], component_provider_cls )
138
+ return none_throws (provider_cls (scheduler_info .name , scheduler_info .image ))
139
+
140
+ def _run_component_provider (
141
+ self ,
142
+ provider : ComponentProvider ,
143
+ scheduler_info : SchedulerInfo ,
112
144
dryrun : bool = False ,
113
- ) -> str :
114
- provider : Optional [ComponentProvider ] = None
145
+ ) -> None :
115
146
try :
116
- provider_cls = cast (
117
- Callable [..., ComponentProvider ], component_provider_cls
118
- )
119
- provider = none_throws (
120
- provider_cls (scheduler_info .name , scheduler_info .image )
121
- )
122
147
provider .setUp ()
123
- app_def = provider .get_app_def ()
124
148
ComponentUtils .run_appdef_on_scheduler (
125
- app_def , scheduler_info .name , scheduler_info .runconfig , dryrun
149
+ provider .get_app_def (),
150
+ scheduler_info .name ,
151
+ scheduler_info .runconfig ,
152
+ dryrun ,
126
153
)
127
- return app_def .name
128
154
finally :
129
- if provider :
130
- provider .tearDown ()
155
+ provider .tearDown ()
131
156
132
- def _wait_and_print_report (self , futures : List [AppDefRun ]) -> None :
133
- app_runs : Dict [str , List [str ]] = {}
157
+ def _wait_and_print_report (self , app_runs : List [AppDefRun ]) -> None :
158
+ succeeded_apps : List [AppDefRun ] = []
159
+ failed_apps : List [Tuple [AppDefRun , str ]] = []
134
160
deadline = time .monotonic () + self ._timeout
135
- for future in futures :
161
+ for app_run in app_runs :
136
162
task_timeout = max (0 , int (deadline - time .monotonic ()))
137
- status , msg = self ._get_app_def_run_status (future , task_timeout )
138
- if status not in app_runs :
139
- app_runs [status ] = []
140
- app_runs [status ].append (msg )
141
- for status , msgs in app_runs .items ():
142
- print (f"\n Status: `{ status } `\n " )
143
- print ("\n " .join (msgs ))
144
- print ("\n " )
163
+ error_msg = self ._get_app_def_run_status (app_run , task_timeout )
164
+ if not error_msg :
165
+ succeeded_apps .append (app_run )
166
+ else :
167
+ failed_apps .append ((app_run , error_msg ))
168
+ success_report = ""
169
+ for app_run in succeeded_apps :
170
+ success_report_run = Template (_SUCCESS_APP_FORMAT_TEMPLATE ).substitute (
171
+ name = app_run .app_def .name , scheduler = app_run .scheduler_info .name
172
+ )
173
+ success_report += f"{ success_report_run } \n "
174
+ fail_report = ""
175
+ for app_run , error_msg in failed_apps :
176
+ fail_report_run = Template (_FAIL_APP_FORMAT_TEMPLATE ).substitute (
177
+ name = app_run .app_def .name ,
178
+ provider = app_run .provider ,
179
+ scheduler = app_run .scheduler_info .name ,
180
+ image = app_run .scheduler_info .image ,
181
+ error = error_msg ,
182
+ )
183
+ fail_report += f"{ fail_report_run } \n "
184
+ delim = "*"
185
+ width = 80
186
+ msg = Template (_REPORT_FORMAT_TEMPLATE ).substitute (
187
+ boarder = delim * width ,
188
+ success_report = success_report or "<NONE>" ,
189
+ fail_report = fail_report or "<NONE>" ,
190
+ )
191
+ print (msg )
192
+ if len (failed_apps ) > 0 :
193
+ raise RuntimeError (
194
+ "Component test failed, see report above for detailed issue"
195
+ )
145
196
146
197
def _get_app_def_run_status (
147
198
self , app_def_run : AppDefRun , timeout : int
148
- ) -> Tuple [ str , str ]:
199
+ ) -> Optional [ str ]:
149
200
try :
150
- print (f"Retrieving results from { app_def_run } : { app_def_run .provider_cls } " )
151
- app_name = app_def_run .future .result (timeout = timeout )
152
- return "succeeded" , f"` { app_name } `:` { app_def_run } `"
153
- except Exception as e :
201
+ print (f"Retrieving { app_def_run . app_def . name } : { app_def_run .provider } " )
202
+ app_def_run .future .result (timeout = timeout )
203
+ return None
204
+ except Exception :
154
205
stack_trace_msg = traceback .format_exc ().replace ("\n " , "\n " )
155
-
156
- msg = (
157
- f"Failure while running: { app_def_run } , provider: `{ app_def_run .provider_cls } `: { e } \n "
158
- f"Stacktrace: { stack_trace_msg } \n "
159
- )
160
- return "failed" , msg
206
+ return stack_trace_msg
0 commit comments