Skip to content

Commit 681d377

Browse files
committed
ruff
1 parent fc699a5 commit 681d377

File tree

6 files changed

+51
-47
lines changed

6 files changed

+51
-47
lines changed

scheduler/helpers/queues/queue_logic.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -186,24 +186,24 @@ def get_all_jobs(self) -> List[JobModel]:
186186
return JobModel.get_many(job_names, connection=self.connection)
187187

188188
def create_and_enqueue_job(
189-
self,
190-
func: FunctionReferenceType,
191-
args: Union[Tuple, List, None] = None,
192-
kwargs: Optional[Dict] = None,
193-
timeout: Optional[int] = None,
194-
result_ttl: Optional[int] = None,
195-
job_info_ttl: Optional[int] = None,
196-
description: Optional[str] = None,
197-
name: Optional[str] = None,
198-
at_front: bool = False,
199-
meta: Optional[Dict] = None,
200-
on_success: Optional[Callback] = None,
201-
on_failure: Optional[Callback] = None,
202-
on_stopped: Optional[Callback] = None,
203-
task_type: Optional[str] = None,
204-
scheduled_task_id: Optional[int] = None,
205-
when: Optional[datetime] = None,
206-
pipeline: Optional[ConnectionType] = None,
189+
self,
190+
func: FunctionReferenceType,
191+
args: Union[Tuple, List, None] = None,
192+
kwargs: Optional[Dict] = None,
193+
timeout: Optional[int] = None,
194+
result_ttl: Optional[int] = None,
195+
job_info_ttl: Optional[int] = None,
196+
description: Optional[str] = None,
197+
name: Optional[str] = None,
198+
at_front: bool = False,
199+
meta: Optional[Dict] = None,
200+
on_success: Optional[Callback] = None,
201+
on_failure: Optional[Callback] = None,
202+
on_stopped: Optional[Callback] = None,
203+
task_type: Optional[str] = None,
204+
scheduled_task_id: Optional[int] = None,
205+
when: Optional[datetime] = None,
206+
pipeline: Optional[ConnectionType] = None,
207207
) -> JobModel:
208208
"""Creates a job to represent the delayed function call and enqueues it.
209209
:param when: When to schedule the job (None to enqueue immediately)
@@ -310,7 +310,7 @@ def run_job(self, job: JobModel) -> JobModel:
310310
return job
311311

312312
def enqueue_job(
313-
self, job_model: JobModel, connection: Optional[ConnectionType] = None, at_front: bool = False
313+
self, job_model: JobModel, connection: Optional[ConnectionType] = None, at_front: bool = False
314314
) -> JobModel:
315315
"""Enqueues a job for delayed execution without checking dependencies.
316316
@@ -361,10 +361,10 @@ def run_sync(self, job: JobModel) -> JobModel:
361361

362362
@classmethod
363363
def dequeue_any(
364-
cls,
365-
queues: List[Self],
366-
timeout: Optional[int],
367-
connection: Optional[ConnectionType] = None,
364+
cls,
365+
queues: List[Self],
366+
timeout: Optional[int],
367+
connection: Optional[ConnectionType] = None,
368368
) -> Tuple[Optional[JobModel], Optional[Self]]:
369369
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
370370
is important.

scheduler/models/task.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,9 @@ def is_scheduled(self) -> bool:
186186
return False
187187
# check whether job_id is in scheduled/queued/active jobs
188188
res = (
189-
(self.job_name in self.rqueue.scheduled_job_registry.all())
190-
or (self.job_name in self.rqueue.queued_job_registry.all())
191-
or (self.job_name in self.rqueue.active_job_registry.all())
189+
(self.job_name in self.rqueue.scheduled_job_registry.all())
190+
or (self.job_name in self.rqueue.queued_job_registry.all())
191+
or (self.job_name in self.rqueue.active_job_registry.all())
192192
)
193193
# If the job_id is not scheduled/queued/started,
194194
# update the job_id to None. (The job_id belongs to a previous run which is completed)

scheduler/redis_models/registry/base_registry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def add(self, connection: ConnectionType, job_name: str, score: float, update_ex
3030
def delete(self, connection: ConnectionType, job_name: str) -> None:
3131
connection.zrem(self._key, job_name)
3232

33+
3334
class JobNamesRegistry(ZSetModel):
3435
_element_key_template: ClassVar[str] = ":registry:{}"
3536

scheduler/tests/test_mgmt_commands/test_scheduler_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from scheduler.helpers.queues import get_queue
55
from scheduler.redis_models import JobModel
6+
from scheduler.settings import SCHEDULER_CONFIG
67
from scheduler.tests import test_settings # noqa
78
from scheduler.tests.jobs import failing_job
89

@@ -26,6 +27,7 @@ def test_scheduler_worker__no_queues_params(self):
2627
self.assertTrue(job.is_failed)
2728

2829
def test_scheduler_worker__run_jobs(self):
30+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 1
2931
queue = get_queue("default")
3032

3133
# enqueue some jobs that will fail

scheduler/types/settings_types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from dataclasses import dataclass
22
from enum import Enum
3-
from typing import Callable, Dict, Optional, List, Tuple, Any, Self, Type, Iterator
3+
from typing import Callable, Dict, Optional, List, Tuple, Any, Self, Type
44

55
from scheduler.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
66

@@ -36,6 +36,7 @@ class SchedulerConfiguration:
3636
SCHEDULER_FALLBACK_PERIOD_SECS: int = 120 # Period (secs) to wait before requiring to reacquire locks
3737
DEATH_PENALTY_CLASS: Type["BaseDeathPenalty"] = UnixSignalDeathPenalty
3838

39+
3940
@dataclass(slots=True, frozen=True, kw_only=True)
4041
class QueueConfiguration:
4142
__CONNECTION_FIELDS__ = {

scheduler/worker/worker.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import scheduler
2121
from scheduler.helpers.queues import get_queue
2222
from scheduler.helpers.queues import get_queues
23-
from scheduler.redis_models import WorkerModel, JobModel, JobStatus, KvLock, DequeueTimeout
23+
from scheduler.redis_models import WorkerModel, JobModel, JobStatus, DequeueTimeout
2424
from scheduler.settings import SCHEDULER_CONFIG, logger
2525
from scheduler.types import Broker
2626
from scheduler.types import (
@@ -106,18 +106,18 @@ def from_model(cls, model: WorkerModel) -> Self:
106106
return res
107107

108108
def __init__(
109-
self,
110-
queues,
111-
name: str,
112-
connection: Optional[ConnectionType] = None,
113-
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
114-
job_monitoring_interval=SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
115-
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
116-
disable_default_exception_handler: bool = False,
117-
fork_job_execution: bool = True,
118-
with_scheduler: bool = True,
119-
burst: bool = False,
120-
model: Optional[WorkerModel] = None,
109+
self,
110+
queues,
111+
name: str,
112+
connection: Optional[ConnectionType] = None,
113+
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
114+
job_monitoring_interval=SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
115+
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
116+
disable_default_exception_handler: bool = False,
117+
fork_job_execution: bool = True,
118+
with_scheduler: bool = True,
119+
burst: bool = False,
120+
model: Optional[WorkerModel] = None,
121121
): # noqa
122122
self.fork_job_execution = fork_job_execution
123123
self.job_monitoring_interval = job_monitoring_interval
@@ -209,9 +209,9 @@ def _install_signal_handlers(self) -> None:
209209
signal.signal(signal.SIGTERM, self.request_stop)
210210

211211
def work(
212-
self,
213-
max_jobs: Optional[int] = None,
214-
max_idle_time: Optional[int] = None,
212+
self,
213+
max_jobs: Optional[int] = None,
214+
max_idle_time: Optional[int] = None,
215215
) -> bool:
216216
"""Starts the work loop.
217217
@@ -383,7 +383,7 @@ def run_maintenance_tasks(self):
383383
self.clean_registries()
384384

385385
def dequeue_job_and_maintain_ttl(
386-
self, timeout: Optional[int], max_idle_time: Optional[int] = None
386+
self, timeout: Optional[int], max_idle_time: Optional[int] = None
387387
) -> Tuple[JobModel, Queue]:
388388
"""Dequeues a job while maintaining the TTL.
389389
:param timeout: The timeout for the dequeue operation.
@@ -558,7 +558,7 @@ def reorder_queues(self, reference_queue: Queue):
558558
return
559559
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
560560
pos = self._ordered_queues.index(reference_queue)
561-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
561+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
562562
return
563563
if self._dequeue_strategy == DequeueStrategy.RANDOM:
564564
shuffle(self._ordered_queues)
@@ -642,7 +642,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
642642
while True:
643643
try:
644644
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
645-
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
645+
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
646646
):
647647
retpid, ret_val, rusage = self.wait_for_job_execution_process()
648648
break
@@ -878,7 +878,7 @@ class RoundRobinWorker(Worker):
878878

879879
def reorder_queues(self, reference_queue):
880880
pos = self._ordered_queues.index(reference_queue)
881-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
881+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
882882

883883

884884
class RandomWorker(Worker):

0 commit comments

Comments
 (0)