@@ -118,14 +118,17 @@ def __init__(
118118 self .redis_namespace = redis_namespace
119119 self .redis_settings = redis_settings
120120
121+ # stale_tasks_monitor
121122 self ._task_stale_tasks_monitor : asyncio .Task | None = None
122123 self ._started_event_task_stale_tasks_monitor = asyncio .Event ()
123124
125+ # cancelled_tasks_removal
124126 self ._task_cancelled_tasks_removal : asyncio .Task | None = None
125127 self ._started_event_task_cancelled_tasks_removal = asyncio .Event ()
126128
127- self ._task_status_update_worker : asyncio .Task | None = None
128- self ._started_event_task_status_update_worker = asyncio .Event ()
129+ # status_update
130+ self ._task_status_update : asyncio .Task | None = None
131+ self ._started_event_task_status_update = asyncio .Event ()
129132
130133 self .redis_client_sdk : RedisClientSDK | None = None
131134
@@ -142,25 +145,25 @@ async def setup(self) -> None:
142145 task = exclusive (
143146 self .redis_client_sdk ,
144147 lock_key = f"{ __name__ } _{ self .redis_namespace } _stale_tasks_monitor" ,
145- )(self ._stale_tasks_monitor_worker ),
148+ )(self ._stale_tasks_monitor ),
146149 interval = self .stale_task_check_interval ,
147- task_name = f"{ __name__ } .{ self ._stale_tasks_monitor_worker .__name__ } " ,
150+ task_name = f"{ __name__ } .{ self ._stale_tasks_monitor .__name__ } " ,
148151 )
149152
150153 await self ._started_event_task_stale_tasks_monitor .wait ()
151154 self ._task_cancelled_tasks_removal = create_periodic_task (
152- task = self ._cancelled_tasks_removal_worker ,
155+ task = self ._cancelled_tasks_removal ,
153156 interval = _CANCEL_TASKS_CHECK_INTERVAL ,
154- task_name = f"{ __name__ } .{ self ._cancelled_tasks_removal_worker .__name__ } " ,
157+ task_name = f"{ __name__ } .{ self ._cancelled_tasks_removal .__name__ } " ,
155158 )
156159 await self ._started_event_task_cancelled_tasks_removal .wait ()
157160
158- self ._task_status_update_worker = create_periodic_task (
159- task = self ._status_update_worker ,
161+ self ._task_status_update = create_periodic_task (
162+ task = self ._status_update ,
160163 interval = _STATUS_UPDATE_CHECK_INTERNAL ,
161- task_name = f"{ __name__ } .{ self ._status_update_worker .__name__ } " ,
164+ task_name = f"{ __name__ } .{ self ._status_update .__name__ } " ,
162165 )
163- await self ._started_event_task_status_update_worker .wait ()
166+ await self ._started_event_task_status_update .wait ()
164167
165168 async def teardown (self ) -> None :
166169 for tracked_task in await self ._tasks_data .list_tasks_data ():
@@ -182,15 +185,15 @@ async def teardown(self) -> None:
182185 if self ._task_cancelled_tasks_removal :
183186 await cancel_wait_task (self ._task_cancelled_tasks_removal )
184187
185- if self ._task_status_update_worker :
186- await cancel_wait_task (self ._task_status_update_worker )
188+ if self ._task_status_update :
189+ await cancel_wait_task (self ._task_status_update )
187190
188191 if self .redis_client_sdk is not None :
189192 await self .redis_client_sdk .shutdown ()
190193
191194 await self ._tasks_data .shutdown ()
192195
193- async def _stale_tasks_monitor_worker (self ) -> None :
196+ async def _stale_tasks_monitor (self ) -> None :
194197 """
195198 A task is considered stale, if the task status is not queried
196199 in the last `stale_task_detect_timeout_s` and it is not a fire and forget type of task.
@@ -231,7 +234,7 @@ async def _stale_tasks_monitor_worker(self) -> None:
231234 task_id , with_task_context = task_context , reraise_errors = False
232235 )
233236
234- async def _cancelled_tasks_removal_worker (self ) -> None :
237+ async def _cancelled_tasks_removal (self ) -> None :
235238 """
236239 A task can be cancelled by the client, which implies it does not for sure
237240 run in the same process as the one processing the request.
@@ -244,12 +247,12 @@ async def _cancelled_tasks_removal_worker(self) -> None:
244247 for task_id , task_context in cancelled_tasks .items ():
245248 await self .remove_task (task_id , task_context , reraise_errors = False )
246249
247- async def _status_update_worker (self ) -> None :
250+ async def _status_update (self ) -> None :
248251 """
249- Worker which monitors locally running tasks and updates their status
252+ A task which monitors locally running tasks and updates their status
250253 in the Redis store when they are done.
251254 """
252- self ._started_event_task_status_update_worker .set ()
255+ self ._started_event_task_status_update .set ()
253256 task_id : TaskId
254257 for task_id in set (self ._created_tasks .keys ()):
255258 if task := self ._created_tasks .get (task_id , None ):
0 commit comments