77
88from redis import WatchError
99
10- from scheduler .broker_types import ConnectionType , FunctionReferenceType
1110from scheduler .helpers .callback import Callback
1211from scheduler .helpers .utils import utcnow , current_timestamp
1312from scheduler .redis_models import (
2221)
2322from scheduler .redis_models import JobStatus , SchedulerLock , Result , ResultType , JobModel
2423from scheduler .settings import logger , SCHEDULER_CONFIG
24+ from scheduler .types import ConnectionType , FunctionReferenceType
2525
2626
2727class InvalidJobOperation (Exception ):
@@ -100,6 +100,7 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
100100 Removed jobs are added to the global failed job queue.
101101 """
102102 before_score = timestamp or current_timestamp ()
103+ self .queued_job_registry .compact ()
103104 started_jobs : List [Tuple [str , float ]] = self .active_job_registry .get_job_names_before (
104105 self .connection , before_score
105106 )
@@ -119,15 +120,12 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
119120
120121 else :
121122 logger .warning (
122- f"{ self .__class__ .__name__ } cleanup: Moving job to { self .failed_job_registry .key } "
123- f"(due to AbandonedJobError)"
123+ f"Queue cleanup: Moving job to { self .failed_job_registry .key } (due to AbandonedJobError)"
124124 )
125- job .set_status (JobStatus .FAILED , connection = pipeline )
126125 exc_string = (
127126 f"Moved to { self .failed_job_registry .key } , due to AbandonedJobError, at { datetime .now ()} "
128127 )
129- job .save (connection = pipeline )
130- job .expire (ttl = - 1 , connection = pipeline )
128+ job .status = JobStatus .FAILED
131129 score = current_timestamp () + SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL
132130 Result .create (
133131 connection = pipeline ,
@@ -138,8 +136,8 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
138136 exc_string = exc_string ,
139137 )
140138 self .failed_job_registry .add (pipeline , job .name , score )
141- job .save (connection = pipeline )
142139 job .expire (connection = pipeline , ttl = SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL )
140+ job .save (connection = pipeline )
143141
144142 for registry in self .REGISTRIES .values ():
145143 getattr (self , registry ).cleanup (connection = self .connection , timestamp = before_score )
@@ -188,24 +186,24 @@ def get_all_jobs(self) -> List[JobModel]:
188186 return JobModel .get_many (job_names , connection = self .connection )
189187
190188 def create_and_enqueue_job (
191- self ,
192- func : FunctionReferenceType ,
193- args : Union [Tuple , List , None ] = None ,
194- kwargs : Optional [Dict ] = None ,
195- timeout : Optional [int ] = None ,
196- result_ttl : Optional [int ] = None ,
197- job_info_ttl : Optional [int ] = None ,
198- description : Optional [str ] = None ,
199- name : Optional [str ] = None ,
200- at_front : bool = False ,
201- meta : Optional [Dict ] = None ,
202- on_success : Optional [Callback ] = None ,
203- on_failure : Optional [Callback ] = None ,
204- on_stopped : Optional [Callback ] = None ,
205- task_type : Optional [str ] = None ,
206- scheduled_task_id : Optional [int ] = None ,
207- when : Optional [datetime ] = None ,
208- pipeline : Optional [ConnectionType ] = None ,
189+ self ,
190+ func : FunctionReferenceType ,
191+ args : Union [Tuple , List , None ] = None ,
192+ kwargs : Optional [Dict ] = None ,
193+ timeout : Optional [int ] = None ,
194+ result_ttl : Optional [int ] = None ,
195+ job_info_ttl : Optional [int ] = None ,
196+ description : Optional [str ] = None ,
197+ name : Optional [str ] = None ,
198+ at_front : bool = False ,
199+ meta : Optional [Dict ] = None ,
200+ on_success : Optional [Callback ] = None ,
201+ on_failure : Optional [Callback ] = None ,
202+ on_stopped : Optional [Callback ] = None ,
203+ task_type : Optional [str ] = None ,
204+ scheduled_task_id : Optional [int ] = None ,
205+ when : Optional [datetime ] = None ,
206+ pipeline : Optional [ConnectionType ] = None ,
209207 ) -> JobModel :
210208 """Creates a job to represent the delayed function call and enqueues it.
211209 :param when: When to schedule the job (None to enqueue immediately)
@@ -312,7 +310,7 @@ def run_job(self, job: JobModel) -> JobModel:
312310 return job
313311
314312 def enqueue_job (
315- self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
313+ self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
316314 ) -> JobModel :
317315 """Enqueues a job for delayed execution without checking dependencies.
318316
@@ -363,10 +361,10 @@ def run_sync(self, job: JobModel) -> JobModel:
363361
364362 @classmethod
365363 def dequeue_any (
366- cls ,
367- queues : List [Self ],
368- timeout : Optional [int ],
369- connection : Optional [ConnectionType ] = None ,
364+ cls ,
365+ queues : List [Self ],
366+ timeout : Optional [int ],
367+ connection : Optional [ConnectionType ] = None ,
370368 ) -> Tuple [Optional [JobModel ], Optional [Self ]]:
371369 """Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
372370 is important.
@@ -382,6 +380,8 @@ def dequeue_any(
382380
383381 while True :
384382 registries = [q .queued_job_registry for q in queues ]
383+ for registry in registries :
384+ registry .compact ()
385385
386386 registry_key , job_name = QueuedJobRegistry .pop (connection , registries , timeout )
387387 if job_name is None :
0 commit comments