@@ -79,8 +79,12 @@ class MultiprocessResult:
7979 err_msg : str | None = None
8080
8181
82+ class WorkerThreadExited :
83+ """Indicates that a worker thread has exited"""
84+
8285ExcStr = str
8386QueueOutput = tuple [Literal [False ], MultiprocessResult ] | tuple [Literal [True ], ExcStr ]
87+ QueueContent = QueueOutput | WorkerThreadExited
8488
8589
8690class ExitThread (Exception ):
@@ -376,8 +380,8 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
376380 def run (self ) -> None :
377381 fail_fast = self .runtests .fail_fast
378382 fail_env_changed = self .runtests .fail_env_changed
379- while not self . _stopped :
380- try :
383+ try :
384+ while not self . _stopped :
381385 try :
382386 test_name = next (self .pending )
383387 except StopIteration :
@@ -396,11 +400,12 @@ def run(self) -> None:
396400
397401 if mp_result .result .must_stop (fail_fast , fail_env_changed ):
398402 break
399- except ExitThread :
400- break
401- except BaseException :
402- self .output .put ((True , traceback .format_exc ()))
403- break
403+ except ExitThread :
404+ pass
405+ except BaseException :
406+ self .output .put ((True , traceback .format_exc ()))
407+ finally :
408+ self .output .put (WorkerThreadExited ())
404409
405410 def _wait_completed (self ) -> None :
406411 popen = self ._popen
@@ -458,8 +463,9 @@ def __init__(self, num_workers: int, runtests: RunTests,
458463 self .log = logger .log
459464 self .display_progress = logger .display_progress
460465 self .results : TestResults = results
466+ self .live_worker_count = 0
461467
462- self .output : queue .Queue [QueueOutput ] = queue .Queue ()
468+ self .output : queue .Queue [QueueContent ] = queue .Queue ()
463469 tests_iter = runtests .iter_tests ()
464470 self .pending = MultiprocessIterator (tests_iter )
465471 self .timeout = runtests .timeout
@@ -497,6 +503,7 @@ def start_workers(self) -> None:
497503 self .log (msg )
498504 for worker in self .workers :
499505 worker .start ()
506+ self .live_worker_count += 1
500507
501508 def stop_workers (self ) -> None :
502509 start_time = time .monotonic ()
@@ -511,14 +518,18 @@ def _get_result(self) -> QueueOutput | None:
511518
512519 # bpo-46205: check the status of workers every iteration to avoid
513520 # waiting forever on an empty queue.
514- while any ( worker . is_alive () for worker in self . workers ) :
521+ while self . live_worker_count > 0 :
515522 if use_faulthandler :
516523 faulthandler .dump_traceback_later (MAIN_PROCESS_TIMEOUT ,
517524 exit = True )
518525
519526 # wait for a thread
520527 try :
521- return self .output .get (timeout = PROGRESS_UPDATE )
528+ result = self .output .get (timeout = PROGRESS_UPDATE )
529+ if isinstance (result , WorkerThreadExited ):
530+ self .live_worker_count -= 1
531+ continue
532+ return result
522533 except queue .Empty :
523534 pass
524535
@@ -528,12 +539,6 @@ def _get_result(self) -> QueueOutput | None:
528539 if running :
529540 self .log (running )
530541
531- # all worker threads are done: consume pending results
532- try :
533- return self .output .get (timeout = 0 )
534- except queue .Empty :
535- return None
536-
537542 def display_result (self , mp_result : MultiprocessResult ) -> None :
538543 result = mp_result .result
539544 pgo = self .runtests .pgo
0 commit comments