33import inspect
44import logging
55import signal
6+ from collections .abc import Sequence
67from dataclasses import dataclass
78from datetime import datetime , timedelta , timezone
89from functools import partial
910from signal import Signals
1011from time import time
11- from typing import TYPE_CHECKING , Any , Callable , Dict , List , Optional , Sequence , Set , Tuple , Union , cast
12+ from typing import TYPE_CHECKING , Any , Callable , Optional , Union , cast
1213
1314from redis .exceptions import ResponseError , WatchError
1415
@@ -118,7 +119,7 @@ def __eq__(self, other: Any) -> bool:
118119
119120
120121class FailedJobs (RuntimeError ):
121- def __init__ (self , count : int , job_results : List [JobResult ]):
122+ def __init__ (self , count : int , job_results : list [JobResult ]):
122123 self .count = count
123124 self .job_results = job_results
124125
@@ -208,7 +209,7 @@ def __init__(
208209 max_tries : int = 5 ,
209210 health_check_interval : 'SecondsTimedelta' = 3600 ,
210211 health_check_key : Optional [str ] = None ,
211- ctx : Optional [Dict [Any , Any ]] = None ,
212+ ctx : Optional [dict [Any , Any ]] = None ,
212213 retry_jobs : bool = True ,
213214 allow_abort_jobs : bool = False ,
214215 max_burst_jobs : int = - 1 ,
@@ -218,14 +219,14 @@ def __init__(
218219 timezone : Optional [timezone ] = None ,
219220 log_results : bool = True ,
220221 ):
221- self .functions : Dict [str , Union [Function , CronJob ]] = {f .name : f for f in map (func , functions )}
222+ self .functions : dict [str , Union [Function , CronJob ]] = {f .name : f for f in map (func , functions )}
222223 if queue_name is None :
223224 if redis_pool is not None :
224225 queue_name = redis_pool .default_queue_name
225226 else :
226227 raise ValueError ('If queue_name is absent, redis_pool must be present.' )
227228 self .queue_name = queue_name
228- self .cron_jobs : List [CronJob ] = []
229+ self .cron_jobs : list [CronJob ] = []
229230 if cron_jobs is not None :
230231 if not all (isinstance (cj , CronJob ) for cj in cron_jobs ):
231232 raise RuntimeError ('cron_jobs, must be instances of CronJob' )
@@ -262,9 +263,9 @@ def __init__(
262263 else :
263264 self .redis_settings = None
264265 # self.tasks holds references to run_job coroutines currently running
265- self .tasks : Dict [str , asyncio .Task [Any ]] = {}
266+ self .tasks : dict [str , asyncio .Task [Any ]] = {}
266267 # self.job_tasks holds references the actual jobs running
267- self .job_tasks : Dict [str , asyncio .Task [Any ]] = {}
268+ self .job_tasks : dict [str , asyncio .Task [Any ]] = {}
268269 self .main_task : Optional [asyncio .Task [None ]] = None
269270 self .loop = asyncio .get_event_loop ()
270271 self .ctx = ctx or {}
@@ -289,7 +290,7 @@ def __init__(
289290 self .retry_jobs = retry_jobs
290291 self .allow_abort_jobs = allow_abort_jobs
291292 self .allow_pick_jobs : bool = True
292- self .aborting_tasks : Set [str ] = set ()
293+ self .aborting_tasks : set [str ] = set ()
293294 self .max_burst_jobs = max_burst_jobs
294295 self .job_serializer = job_serializer
295296 self .job_deserializer = job_deserializer
@@ -409,7 +410,7 @@ async def _cancel_aborted_jobs(self) -> None:
409410 pipe .zremrangebyscore (abort_jobs_ss , min = timestamp_ms () + abort_job_max_age , max = float ('inf' ))
410411 abort_job_ids , _ = await pipe .execute ()
411412
412- aborted : Set [str ] = set ()
413+ aborted : set [str ] = set ()
413414 for job_id_bytes in abort_job_ids :
414415 job_id = job_id_bytes .decode ()
415416 try :
@@ -428,7 +429,7 @@ def _release_sem_dec_counter_on_complete(self) -> None:
428429 self .job_counter = self .job_counter - 1
429430 self .sem .release ()
430431
431- async def start_jobs (self , job_ids : List [bytes ]) -> None :
432+ async def start_jobs (self , job_ids : list [bytes ]) -> None :
432433 """
433434 For each job id, get the job definition, check it's not running and start it in a task
434435 """
@@ -484,8 +485,8 @@ async def run_job(self, job_id: str, score: int) -> None: # noqa: C901
484485 abort_job = False
485486
486487 function_name , enqueue_time_ms = '<unknown>' , 0
487- args : Tuple [Any , ...] = ()
488- kwargs : Dict [Any , Any ] = {}
488+ args : tuple [Any , ...] = ()
489+ kwargs : dict [Any , Any ] = {}
489490
490491 async def job_failed (exc : BaseException ) -> None :
491492 self .jobs_failed += 1
@@ -879,7 +880,7 @@ def __repr__(self) -> str:
879880 )
880881
881882
882- def get_kwargs (settings_cls : 'WorkerSettingsType' ) -> Dict [str , NameError ]:
883+ def get_kwargs (settings_cls : 'WorkerSettingsType' ) -> dict [str , NameError ]:
883884 worker_args = set (inspect .signature (Worker ).parameters .keys ())
884885 d = settings_cls if isinstance (settings_cls , dict ) else settings_cls .__dict__
885886 return {k : v for k , v in d .items () if k in worker_args }
0 commit comments