@@ -34,10 +34,6 @@ def __init__(self, worker_id: int, process: ProcessProxy) -> None:
3434 self .status : Literal ['initialized' , 'spawned' , 'starting' , 'ready' , 'stopping' , 'exited' , 'error' , 'retired' ] = 'initialized'
3535 self .exit_msg_event = asyncio .Event ()
3636
37- def is_ready (self ) -> bool :
38- """Worker is ready to receive task requests"""
39- return bool (self .status == 'ready' )
40-
4137 async def start (self ) -> None :
4238 if self .status != 'initialized' :
4339 logger .error (f'Worker { self .worker_id } status is not initialized, can not start, status={ self .status } ' )
@@ -52,10 +48,26 @@ async def start(self) -> None:
5248 logger .debug (f'Worker { self .worker_id } pid={ self .process .pid } subprocess has spawned' )
5349 self .status = 'starting' # Not ready until it sends callback message
5450
51+ @property
52+ def is_ready (self ) -> bool :
53+ """Worker is ready to receive task requests"""
54+ return bool (self .status == 'ready' )
55+
5556 @property
5657 def counts_for_capacity (self ) -> bool :
58+ """Worker is ready to accept work or may become ready very soon, relevant for scale-up decisions"""
5759 return bool (self .status in ('initialized' , 'spawned' , 'starting' , 'ready' ))
5860
61+ @property
62+ def expected_alive (self ) -> bool :
63+ """Worker is expected to have an active process"""
64+ return bool (self .status in ('starting' , 'ready' ))
65+
66+ @property
67+ def inactive (self ) -> bool :
68+ """No further shutdown or callback messages are expected from this worker"""
69+ return bool (self .status in ('exited' , 'error' , 'initialized' ))
70+
5971 async def start_task (self , message : dict ) -> None :
6072 self .current_task = message # NOTE: this marks this worker as busy
6173 self .process .message_queue .put (message )
@@ -136,11 +148,6 @@ def mark_finished_task(self) -> None:
136148 self .started_at = None
137149 self .finished_count += 1
138150
139- @property
140- def inactive (self ) -> bool :
141- "Return True if no further shutdown or callback messages are expected from this worker"
142- return self .status in ['exited' , 'error' , 'initialized' ]
143-
144151 def next_wakeup (self ) -> Optional [float ]:
145152 """Used by next-run-runner for setting wakeups for task timeouts"""
146153 if self .is_active_cancel :
@@ -348,7 +355,7 @@ async def manage_old_workers(self) -> None:
348355 remove_ids = []
349356 for worker in current_workers :
350357 # Check if the worker has died unexpectedly.
351- if worker .status not in [ 'retired' , 'error' , 'exited' , 'initialized' , 'spawned' ] and not worker .process .is_alive ():
358+ if worker .expected_alive and not worker .process .is_alive ():
352359 logger .error (f'Worker { worker .worker_id } pid={ worker .process .pid } has died unexpectedly, status was { worker .status } ' )
353360
354361 if worker .current_task :
0 commit comments