1- import os , typing , shutil , time , itertools , signal
1+ import os , typing , shutil , time , itertools , threading
22from random import sample , seed
33
44import rich , rich .table
3030# Per-test timeout (1 hour)
3131TEST_TIMEOUT_SECONDS = 3600
3232
33+ # Global abort flag for thread-safe early termination.
34+ # It is set once the failure rate reaches the threshold. Each worker checks it
35+ # before starting a case and returns early instead of raising, because an exception
36+ # raised from a worker thread could leave the scheduler in an undefined state.
37+ abort_tests = threading .Event ()
38+
class TestTimeoutError(MFCException):
    """Raised when a test case is found to have exceeded the per-test timeout."""
3541
36- def timeout_handler (signum , frame ):
37- raise TestTimeoutError ("Test case exceeded 1 hour timeout" )
38-
3942# pylint: disable=too-many-branches, trailing-whitespace
4043def __filter (cases_ ) -> typing .List [TestCase ]:
4144 cases = cases_ [:]
@@ -173,6 +176,15 @@ def test():
173176 [ sched .Task (ppn = case .ppn , func = handle_case , args = [case ], load = case .get_cell_count ()) for case in cases ],
174177 ARG ("jobs" ), ARG ("gpus" ))
175178
179+ # Check if we aborted due to high failure rate
180+ if abort_tests .is_set ():
181+ total_completed = nFAIL + nPASS
182+ cons .print ()
183+ cons .unindent ()
184+ raise MFCException (
185+ f"Excessive test failures: { nFAIL } /{ total_completed } failed ({ nFAIL / total_completed * 100 :.1f} %)"
186+ )
187+
176188 nSKIP = len (skipped_cases )
177189 cons .print ()
178190 cons .unindent ()
@@ -199,22 +211,32 @@ def _handle_case(case: TestCase, devices: typing.Set[int]):
199211 global current_test_number
200212 start_time = time .time ()
201213
202- # Set timeout alarm
203- signal .signal (signal .SIGALRM , timeout_handler )
204- signal .alarm (TEST_TIMEOUT_SECONDS )
214+ # Arm a per-test timeout flag with threading.Timer. signal.alarm() only works
215+ # in the main thread, and this code runs in worker threads spawned by sched.sched.
216+ # The timer only sets timeout_flag; it cannot interrupt a running case, so the flag is polled at explicit checkpoints below.
217+ timeout_flag = threading .Event ()
218+ timeout_timer = threading .Timer (TEST_TIMEOUT_SECONDS , timeout_flag .set )
219+ timeout_timer .start ()
205220
206221 tol = case .compute_tolerance ()
207222 case .delete_output ()
208223 case .create_directory ()
209224
210225 if ARG ("dry_run" ):
211226 cons .print (f" [bold magenta]{ case .get_uuid ()} [/bold magenta] SKIP { case .trace } " )
212- signal . alarm ( 0 ) # Cancel alarm
227+ timeout_timer . cancel ()
213228 return
214229
215230 try :
231+ # Check timeout before starting
232+ if timeout_flag .is_set ():
233+ raise TestTimeoutError ("Test case exceeded 1 hour timeout" )
216234 cmd = case .run ([PRE_PROCESS , SIMULATION ], gpus = devices )
217235
236+ # Check timeout after simulation
237+ if timeout_flag .is_set ():
238+ raise TestTimeoutError ("Test case exceeded 1 hour timeout" )
239+
218240 out_filepath = os .path .join (case .get_dirpath (), "out_pre_sim.txt" )
219241
220242 common .file_write (out_filepath , cmd .stdout )
@@ -261,26 +283,28 @@ def _handle_case(case: TestCase, devices: typing.Set[int]):
261283 out_filepath = os .path .join (case .get_dirpath (), "out_post.txt" )
262284 common .file_write (out_filepath , cmd .stdout )
263285
264- for silo_filepath in os .listdir (os .path .join (case .get_dirpath (), 'silo_hdf5' , 'p0' )):
265- silo_filepath = os .path .join (case .get_dirpath (), 'silo_hdf5' , 'p0' , silo_filepath )
266- h5dump = f"{ HDF5 .get_install_dirpath (case .to_input_file ())} /bin/h5dump"
286+ silo_dir = os .path .join (case .get_dirpath (), 'silo_hdf5' , 'p0' )
287+ if os .path .isdir (silo_dir ):
288+ for silo_filename in os .listdir (silo_dir ):
289+ silo_filepath = os .path .join (silo_dir , silo_filename )
290+ h5dump = f"{ HDF5 .get_install_dirpath (case .to_input_file ())} /bin/h5dump"
267291
268- if not os .path .exists (h5dump or "" ):
269- if not does_command_exist ("h5dump" ):
270- raise MFCException ("h5dump couldn't be found." )
292+ if not os .path .exists (h5dump or "" ):
293+ if not does_command_exist ("h5dump" ):
294+ raise MFCException ("h5dump couldn't be found." )
271295
272- h5dump = shutil .which ("h5dump" )
296+ h5dump = shutil .which ("h5dump" )
273297
274- output , err = get_program_output ([h5dump , silo_filepath ])
298+ output , err = get_program_output ([h5dump , silo_filepath ])
275299
276- if err != 0 :
277- raise MFCException (f"Test { case } : Failed to run h5dump. You can find the run's output in { out_filepath } , and the case dictionary in { case .get_filepath ()} ." )
300+ if err != 0 :
301+ raise MFCException (f"Test { case } : Failed to run h5dump. You can find the run's output in { out_filepath } , and the case dictionary in { case .get_filepath ()} ." )
278302
279- if "nan," in output :
280- raise MFCException (f"Test { case } : Post Process has detected a NaN. You can find the run's output in { out_filepath } , and the case dictionary in { case .get_filepath ()} ." )
303+ if "nan," in output :
304+ raise MFCException (f"Test { case } : Post Process has detected a NaN. You can find the run's output in { out_filepath } , and the case dictionary in { case .get_filepath ()} ." )
281305
282- if "inf," in output :
283- raise MFCException (f"Test { case } : Post Process has detected an Infinity. You can find the run's output in { out_filepath } , and the case dictionary in { case .get_filepath ()} ." )
306+ if "inf," in output :
307+ raise MFCException (f"Test { case } : Post Process has detected an Infinity. You can find the run's output in { out_filepath } , and the case dictionary in { case .get_filepath ()} ." )
284308
285309 case .delete_output ()
286310
@@ -298,14 +322,18 @@ def _handle_case(case: TestCase, devices: typing.Set[int]):
298322 f"Check the log at: { os .path .join (case .get_dirpath (), 'out_pre_sim.txt' )} "
299323 ) from exc
300324 finally :
301- signal . alarm ( 0 ) # Cancel alarm
325+ timeout_timer . cancel ( ) # Cancel timeout timer
302326
303327
304328def handle_case (case : TestCase , devices : typing .Set [int ]):
305329 # pylint: disable=global-statement, global-variable-not-assigned
306330 global nFAIL , nPASS , nSKIP
307331 global errors
308332
333+ # Check if we should abort before processing this case
334+ if abort_tests .is_set ():
335+ return # Exit gracefully if abort was requested
336+
309337 nAttempts = 0
310338 if ARG ('single' ):
311339 max_attempts = max (ARG ('max_attempts' ), 3 )
@@ -337,10 +365,10 @@ def handle_case(case: TestCase, devices: typing.Set[int]):
337365 failure_rate = nFAIL / total_completed
338366 if failure_rate >= FAILURE_RATE_THRESHOLD :
339367 cons .print (f"\n [bold red]CRITICAL: { failure_rate * 100 :.1f} % failure rate detected after { total_completed } tests.[/bold red]" )
340- cons .print (f "[bold red]This suggests a systemic issue (bad build, broken environment, etc.)[/bold red]" )
341- cons .print (f "[bold red]Aborting remaining tests to fail fast.[/bold red]\n " )
342- raise MFCException (
343- f"Excessive test failures: { nFAIL } / { total_completed } failed ( { failure_rate * 100 :.1f } %)"
344- )
368+ cons .print ("[bold red]This suggests a systemic issue (bad build, broken environment, etc.)[/bold red]" )
369+ cons .print ("[bold red]Aborting remaining tests to fail fast.[/bold red]\n " )
370+ # Set abort flag instead of raising exception from worker thread
371+ abort_tests . set ()
372+ return # Exit gracefully
345373
346374 return
0 commit comments