Skip to content

Commit 00472d7

Browse files
committed
wip
1 parent 7bff43b commit 00472d7

File tree

3 files changed

+22
-21
lines changed

3 files changed

+22
-21
lines changed

scheduler/worker/commands/kill_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
1919

2020
def process_command(self, connection: ConnectionType) -> None:
2121
from scheduler.worker import Worker
22+
2223
logger.info(f"Received kill-worker command for {self.worker_name}")
2324
worker_model = WorkerModel.get(self.worker_name, connection)
2425
if worker_model is None or worker_model.pid is None:

scheduler/worker/scheduler.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ def _reschedule_tasks() -> None:
3232

3333
class WorkerScheduler:
3434
def __init__(
35-
self,
36-
queues: Sequence[Queue],
37-
connection: ConnectionType,
38-
worker_name: str,
39-
interval: Optional[int] = None,
35+
self,
36+
queues: Sequence[Queue],
37+
connection: ConnectionType,
38+
worker_name: str,
39+
interval: Optional[int] = None,
4040
) -> None:
4141
interval = interval or SCHEDULER_CONFIG.SCHEDULER_INTERVAL
4242
self._queues = queues

scheduler/worker/worker.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,18 @@ def from_model(cls, model: WorkerModel) -> Self:
9999
return res
100100

101101
def __init__(
102-
self,
103-
queues: Iterable[Union[str, Queue]],
104-
name: str,
105-
connection: ConnectionType,
106-
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
107-
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
108-
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
109-
disable_default_exception_handler: bool = False,
110-
fork_job_execution: bool = True,
111-
with_scheduler: bool = True,
112-
burst: bool = False,
113-
model: Optional[WorkerModel] = None,
102+
self,
103+
queues: Iterable[Union[str, Queue]],
104+
name: str,
105+
connection: ConnectionType,
106+
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
107+
job_monitoring_interval: int = SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
108+
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
109+
disable_default_exception_handler: bool = False,
110+
fork_job_execution: bool = True,
111+
with_scheduler: bool = True,
112+
burst: bool = False,
113+
model: Optional[WorkerModel] = None,
114114
) -> None:
115115
self.fork_job_execution = fork_job_execution
116116
self.job_monitoring_interval: int = job_monitoring_interval
@@ -376,7 +376,7 @@ def run_maintenance_tasks(self) -> None:
376376
self._model.save(connection=self.connection)
377377

378378
def dequeue_job_and_maintain_ttl(
379-
self, timeout: Optional[int], max_idle_time: Optional[int] = None
379+
self, timeout: Optional[int], max_idle_time: Optional[int] = None
380380
) -> Tuple[Optional[JobModel], Optional[Queue]]:
381381
"""Dequeues a job while maintaining the TTL.
382382
:param timeout: The timeout for the dequeue operation.
@@ -551,7 +551,7 @@ def reorder_queues(self, reference_queue: Queue) -> None:
551551
return
552552
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
553553
pos = self._ordered_queues.index(reference_queue)
554-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
554+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
555555
return
556556
if self._dequeue_strategy == DequeueStrategy.RANDOM:
557557
shuffle(self._ordered_queues)
@@ -635,7 +635,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
635635
while True:
636636
try:
637637
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
638-
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
638+
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
639639
):
640640
retpid, ret_val = self.wait_for_job_execution_process()
641641
break
@@ -878,7 +878,7 @@ class RoundRobinWorker(Worker):
878878

879879
def reorder_queues(self, reference_queue: Queue) -> None:
880880
pos = self._ordered_queues.index(reference_queue)
881-
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
881+
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
882882

883883

884884
class RandomWorker(Worker):

0 commit comments

Comments
 (0)