@@ -109,18 +109,18 @@ def from_model(cls, model: WorkerModel) -> Self:
109
109
return res
110
110
111
111
def __init__ (
112
- self ,
113
- queues ,
114
- name : str ,
115
- connection : Optional [ConnectionType ] = None ,
116
- maintenance_interval : int = SCHEDULER_CONFIG .DEFAULT_MAINTENANCE_TASK_INTERVAL ,
117
- job_monitoring_interval = SCHEDULER_CONFIG .DEFAULT_JOB_MONITORING_INTERVAL ,
118
- dequeue_strategy : DequeueStrategy = DequeueStrategy .DEFAULT ,
119
- disable_default_exception_handler : bool = False ,
120
- fork_job_execution : bool = True ,
121
- with_scheduler : bool = True ,
122
- burst : bool = False ,
123
- model : Optional [WorkerModel ] = None ,
112
+ self ,
113
+ queues ,
114
+ name : str ,
115
+ connection : Optional [ConnectionType ] = None ,
116
+ maintenance_interval : int = SCHEDULER_CONFIG .DEFAULT_MAINTENANCE_TASK_INTERVAL ,
117
+ job_monitoring_interval = SCHEDULER_CONFIG .DEFAULT_JOB_MONITORING_INTERVAL ,
118
+ dequeue_strategy : DequeueStrategy = DequeueStrategy .DEFAULT ,
119
+ disable_default_exception_handler : bool = False ,
120
+ fork_job_execution : bool = True ,
121
+ with_scheduler : bool = True ,
122
+ burst : bool = False ,
123
+ model : Optional [WorkerModel ] = None ,
124
124
): # noqa
125
125
self .fork_job_execution = fork_job_execution
126
126
self .job_monitoring_interval = job_monitoring_interval
@@ -212,9 +212,9 @@ def _install_signal_handlers(self) -> None:
212
212
signal .signal (signal .SIGTERM , self .request_stop )
213
213
214
214
def work (
215
- self ,
216
- max_jobs : Optional [int ] = None ,
217
- max_idle_time : Optional [int ] = None ,
215
+ self ,
216
+ max_jobs : Optional [int ] = None ,
217
+ max_idle_time : Optional [int ] = None ,
218
218
) -> bool :
219
219
"""Starts the work loop.
220
220
@@ -389,7 +389,7 @@ def run_maintenance_tasks(self):
389
389
self ._model .save (connection = self .connection )
390
390
391
391
def dequeue_job_and_maintain_ttl (
392
- self , timeout : Optional [int ], max_idle_time : Optional [int ] = None
392
+ self , timeout : Optional [int ], max_idle_time : Optional [int ] = None
393
393
) -> Tuple [JobModel , Queue ]:
394
394
"""Dequeues a job while maintaining the TTL.
395
395
:param timeout: The timeout for the dequeue operation.
@@ -564,7 +564,7 @@ def reorder_queues(self, reference_queue: Queue):
564
564
return
565
565
if self ._dequeue_strategy == DequeueStrategy .ROUND_ROBIN :
566
566
pos = self ._ordered_queues .index (reference_queue )
567
- self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
567
+ self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
568
568
return
569
569
if self ._dequeue_strategy == DequeueStrategy .RANDOM :
570
570
shuffle (self ._ordered_queues )
@@ -648,7 +648,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
648
648
while True :
649
649
try :
650
650
with SCHEDULER_CONFIG .DEATH_PENALTY_CLASS (
651
- self .job_monitoring_interval , JobExecutionMonitorTimeoutException
651
+ self .job_monitoring_interval , JobExecutionMonitorTimeoutException
652
652
):
653
653
retpid , ret_val , rusage = self .wait_for_job_execution_process ()
654
654
break
@@ -884,7 +884,7 @@ class RoundRobinWorker(Worker):
884
884
885
885
def reorder_queues (self , reference_queue ):
886
886
pos = self ._ordered_queues .index (reference_queue )
887
- self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
887
+ self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
888
888
889
889
890
890
class RandomWorker (Worker ):
0 commit comments