Skip to content

Commit 30dbd7b

Browse files
committed
wip
1 parent ebbce68 commit 30dbd7b

File tree

5 files changed

+20
-20
lines changed

5 files changed

+20
-20
lines changed
Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import errno
22
import os
33
import signal
4-
from typing import Optional
4+
from typing import Any
55

6-
from scheduler.types import ConnectionType
76
from scheduler.redis_models import WorkerModel
87
from scheduler.settings import logger
8+
from scheduler.types import ConnectionType
99
from scheduler.worker.commands.worker_commands import WorkerCommand
1010

1111

@@ -14,27 +14,26 @@ class KillWorkerCommand(WorkerCommand):
1414

1515
command_name = "kill-worker"
1616

17-
def __init__(self, *args, **kwargs) -> None:
17+
def __init__(self, *args: Any, **kwargs: Any) -> None:
1818
super().__init__(*args, **kwargs)
19-
self.worker_pid: Optional[int] = None
2019

2120
def process_command(self, connection: ConnectionType) -> None:
2221
from scheduler.worker import Worker
23-
24-
logger.info("Received kill-worker command.")
22+
if self.worker_name is None:
23+
raise ValueError("Worker name must be provided")
24+
logger.info(f"Received kill-worker command for {self.worker_name}")
2525
worker_model = WorkerModel.get(self.worker_name, connection)
26-
self.worker_pid = worker_model.pid
27-
if self.worker_pid is None:
26+
if worker_model is None or worker_model.pid is None:
2827
raise ValueError("Worker PID is not set")
29-
logger.info(f"Killing worker main process {self.worker_pid}...")
28+
logger.info(f"Killing worker main process {worker_model.pid}...")
3029
try:
3130
Worker.from_model(worker_model).request_stop(signal.SIGTERM, None)
32-
os.killpg(os.getpgid(self.worker_pid), signal.SIGTERM)
33-
logger.info(f"Killed worker main process pid {self.worker_pid}")
31+
os.killpg(os.getpgid(worker_model.pid), signal.SIGTERM)
32+
logger.info(f"Killed worker main process pid {worker_model.pid}")
3433
except OSError as e:
3534
if e.errno == errno.ESRCH:
3635
logger.debug(
37-
f"Worker main process for {self.worker_name}:{self.worker_pid} already dead"
36+
f"Worker main process for {self.worker_name}:{worker_model.pid} already dead"
3837
) # "No such process" is fine with us
3938
else:
4039
raise

scheduler/worker/commands/stop_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def __init__(self, *args, job_name: str, worker_name: Optional[str], **kwargs) -
1919
if self.job_name is None:
2020
raise WorkerCommandError("job_name for kill-job command is required")
2121

22-
def command_payload(self) -> Dict[str, Any]:
22+
def command_payload(self, **kwargs) -> Dict[str, Any]:
2323
return super().command_payload(job_name=self.job_name)
2424

2525
def process_command(self, connection: ConnectionType) -> None:

scheduler/worker/commands/suspend_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def process_command(self, connection: ConnectionType) -> None:
1414
worker_model = WorkerModel.get(self.worker_name, connection)
1515
if worker_model is None:
1616
logger.warning(f"Worker {self.worker_name} not found")
17+
return
1718
if worker_model.is_suspended:
1819
logger.warning(f"Worker {self.worker_name} already suspended")
1920
return

scheduler/worker/scheduler.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class SchedulerStatus(str, Enum):
2323
STOPPED = "stopped"
2424

2525

26-
def _reschedule_tasks():
26+
def _reschedule_tasks() -> None:
2727
enabled_jobs = list(Task.objects.filter(enabled=True))
2828
for item in enabled_jobs:
2929
logger.debug(f"Rescheduling {str(item)}")
@@ -32,11 +32,11 @@ def _reschedule_tasks():
3232

3333
class WorkerScheduler:
3434
def __init__(
35-
self,
36-
queues: Sequence[Queue],
37-
connection: ConnectionType,
38-
worker_name: str,
39-
interval: Optional[int] = None,
35+
self,
36+
queues: Sequence[Queue],
37+
connection: ConnectionType,
38+
worker_name: str,
39+
interval: Optional[int] = None,
4040
) -> None:
4141
interval = interval or SCHEDULER_CONFIG.SCHEDULER_INTERVAL
4242
self._queues = queues

scheduler/worker/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ def handle_job_failure(self, job: JobModel, queue: Queue, exc_string="") -> None
312312
# Ensure that custom exception handlers are called even if the Broker is down
313313
pass
314314

315-
def bootstrap(self):
315+
def bootstrap(self)-> None:
316316
"""Bootstraps the worker.
317317
Runs the basic tasks that should run when the worker actually starts working.
318318
Used so that new workers can focus on the work loop implementation rather

0 commit comments

Comments
 (0)