55from queue import Empty
66from queue import Queue
77import sys
8+ import traceback
89from typing import Any
10+ from typing import Callable
911from typing import Sequence
1012import warnings
11- import traceback
1213
1314import execnet
1415import pytest
1516
1617from xdist .remote import Producer
1718from xdist .remote import WorkerInfo
19+ from xdist .scheduler import CustomGroup
1820from xdist .scheduler import EachScheduling
1921from xdist .scheduler import LoadFileScheduling
2022from xdist .scheduler import LoadGroupScheduling
2123from xdist .scheduler import LoadScheduling
2224from xdist .scheduler import LoadScopeScheduling
2325from xdist .scheduler import Scheduling
2426from xdist .scheduler import WorkStealingScheduling
25- from xdist .scheduler import CustomGroup
2627from xdist .workermanage import NodeManager
2728from xdist .workermanage import WorkerController
2829
@@ -60,14 +61,14 @@ def __init__(self, config: pytest.Config) -> None:
6061 self ._failed_collection_errors : dict [object , bool ] = {}
6162 self ._active_nodes : set [WorkerController ] = set ()
6263 self ._failed_nodes_count = 0
63- self .saved_put = None
64+ self .saved_put : Callable [[ tuple [ str , dict [ str , Any ]]], None ]
6465 self .remake_nodes = False
6566 self .ready_to_run_tests = False
6667 self ._max_worker_restart = get_default_max_worker_restart (self .config )
6768 # summary message to print at the end of the session
6869 self ._summary_report : str | None = None
6970 self .terminal = config .pluginmanager .getplugin ("terminalreporter" )
70- self .worker_status : dict [WorkerController , str ] = {}
71+ self .worker_status : dict [str , str ] = {}
7172 if self .terminal :
7273 self .trdist = TerminalDistReporter (config )
7374 config .pluginmanager .register (self .trdist , "terminaldistreporter" )
@@ -180,68 +181,75 @@ def loop_once(self) -> None:
180181 self .triggershutdown ()
181182
182183
183- def is_node_finishing (self , node : WorkerController ):
184+ def is_node_finishing (self , node : WorkerController ) -> bool :
184185 """Check if a test worker is considered to be finishing.
185186
186187 Evaluate whether it's on its last test, or if no tests are pending.
187188 """
189+ assert self .sched is not None
190+ assert type (self .sched ) is CustomGroup
188191 pending = self .sched .node2pending .get (node )
189192 return pending is not None and len (pending ) < 2
190193
191194
192- def is_node_clear (self , node : WorkerController ):
193- """Check if a test worker has no pending tests."""
194- pending = self .sched .node2pending .get (node )
195- return pending is None or len (pending ) == 0
196-
197-
198- def are_all_nodes_finishing (self ):
195+ def are_all_nodes_finishing (self ) -> bool :
199196 """Check if all workers are finishing (See 'is_node_finishing' above)."""
197+ assert self .sched is not None
200198 return all (self .is_node_finishing (node ) for node in self .sched .nodes )
201199
202200
203- def are_all_nodes_done (self ):
201+ def are_all_nodes_done (self ) -> bool :
204202 """Check if all nodes have reported to finish."""
205203 return all (s == "finished" for s in self .worker_status .values ())
206204
207205
208- def are_all_active_nodes_collected (self ):
206+ def are_all_active_nodes_collected (self ) -> bool :
209207 """Check if all nodes have reported collection to be complete."""
210208 if not all (n .gateway .id in self .worker_status for n in self ._active_nodes ):
211209 return False
212210 return all (self .worker_status [n .gateway .id ] == "collected" for n in self ._active_nodes )
213211
214212
215- def reset_nodes_if_needed (self ):
213+ def reset_nodes_if_needed (self ) -> None :
214+ assert self .sched is not None
215+ assert type (self .sched ) is CustomGroup
216216 if self .are_all_nodes_finishing () and self .ready_to_run_tests and not self .sched .do_resched :
217217 self .reset_nodes ()
218218
219219
220- def reset_nodes (self ):
220+ def reset_nodes (self ) -> None :
221221 """Issue shutdown notices to workers for rescheduling purposes."""
222+ assert self .sched is not None
223+ assert type (self .sched ) is CustomGroup
222224 if len (self .sched .pending ) != 0 :
223225 self .remake_nodes = True
224226 for node in self .sched .nodes :
225227 if self .is_node_finishing (node ):
226228 node .shutdown ()
227229
228230
229- def reschedule (self ):
231+ def reschedule (self ) -> None :
230232 """Reschedule tests."""
233+ assert self .sched is not None
234+ assert type (self .sched ) is CustomGroup
231235 self .sched .do_resched = False
232236 self .sched .check_schedule (self .sched .nodes [0 ], 1.0 , True )
233237
234238
235- def prepare_for_reschedule (self ):
239+ def prepare_for_reschedule (self ) -> None :
236240 """Update test workers and their status tracking so rescheduling is ready."""
241+ assert type (self .sched ) is CustomGroup
242+ assert self .sched is not None
237243 self .remake_nodes = False
238244 num_workers = self .sched .dist_groups [self .sched .pending_groups [0 ]]['group_workers' ]
239245 self .trdist ._status = {}
246+ assert self .nodemanager is not None
240247 new_nodes = self .nodemanager .setup_nodes (self .saved_put , num_workers )
241248 self .worker_status = {}
242249 self ._active_nodes = set ()
243250 self ._active_nodes .update (new_nodes )
244251 self .sched .node2pending = {}
252+ assert type (self .sched ) is CustomGroup
245253 self .sched .do_resched = True
246254
247255 #
@@ -287,7 +295,9 @@ def worker_workerfinished(self, node: WorkerController) -> None:
287295 try :
288296 self .prepare_for_reschedule ()
289297 except Exception as e :
290- self .shouldstop = f"Exception caught during preparation for rescheduling. Giving up.\n { '' .join (traceback .format_exception (e ))} "
298+ msg = ("Exception caught during preparation for rescheduling. Giving up."
299+ f"\n { '' .join (traceback .format_exception (e ))} " )
300+ self .shouldstop = msg
291301 return
292302 self .config .hook .pytest_testnodedown (node = node , error = None )
293303 if node .workeroutput ["exitstatus" ] == 2 : # keyboard-interrupt
@@ -308,10 +318,11 @@ def worker_workerfinished(self, node: WorkerController) -> None:
308318 assert not crashitem , (crashitem , node )
309319 self ._active_nodes .remove (node )
310320
311- def update_worker_status (self , node , status ) :
321+ def update_worker_status (self , node : WorkerController , status : str ) -> None :
312322 """Track the worker status.
313323
314- Can be used at callbacks like 'worker_workerfinished' so we remember wchic event was reported last by each worker.
324+ Can be used at callbacks like 'worker_workerfinished' so we remember wchic event
325+ was reported last by each worker.
315326 """
316327 self .worker_status [node .workerinfo ["id" ]] = status
317328
0 commit comments