Skip to content

Commit d33d80d

Browse files
committed
stop job test
1 parent 505220f commit d33d80d

File tree

3 files changed

+37
-8
lines changed

3 files changed

+37
-8
lines changed

scheduler/tests/test_worker/test_worker_commands.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
import json
2+
from threading import Thread
3+
from time import sleep
4+
15
from scheduler.helpers.queues import get_queue
2-
from scheduler.tests.jobs import test_job
6+
from scheduler.tests.jobs import test_job, two_seconds_job
7+
from ..test_job_decorator import long_running_func
38
from ..test_views.base import BaseTestCase
4-
from ...redis_models import JobModel
9+
from ...redis_models import JobModel, JobStatus, WorkerModel
510
from ...worker import create_worker
6-
from ...worker.commands import send_command
11+
from ...worker.commands import send_command, StopJobCommand
712
from ...worker.commands.suspend_worker import SuspendWorkCommand
813

914

@@ -40,3 +45,29 @@ def test_stop_worker_command__bad_worker_name(self):
4045
self.assertFalse(worker._model.is_suspended)
4146
job = JobModel.get(job.name, connection=queue.connection)
4247
self.assertFalse(job.is_queued)
48+
49+
def test_stop_job_command__success(self):
50+
# Arrange
51+
worker_name = "test"
52+
queue = get_queue("default")
53+
job = queue.create_and_enqueue_job(two_seconds_job)
54+
self.assertTrue(job.is_queued)
55+
worker = create_worker("default", name=worker_name, burst=True, with_scheduler=False)
56+
worker.bootstrap()
57+
58+
# Act
59+
t = Thread(target=worker.work, args=(0,), name="worker-thread")
60+
t.start()
61+
sleep(0.1)
62+
command = StopJobCommand(worker_name=worker_name, job_name=job.name)
63+
command_payload = json.dumps(command.command_payload())
64+
worker._command_listener.handle_payload(dict(data=command_payload))
65+
worker.monitor_job_execution_process(job,queue)
66+
67+
# Assert
68+
job = JobModel.get(job.name, connection=queue.connection)
69+
worker = WorkerModel.get(worker.name, connection=queue.connection)
70+
self.assertEqual(worker.stopped_job_name, job.name)
71+
self.assertIsNone(worker.current_job_name)
72+
self.assertEqual(job.status, JobStatus.STOPPED)
73+
t.join()

scheduler/worker/commands/stop_job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ def process_command(self, connection: ConnectionType) -> None:
3232
if job_model is None:
3333
logger.error(f"Job {self.job_name} not found")
3434
return
35-
if worker_model.pid == worker_model.job_execution_process_pid:
36-
logger.warning(f"Job execution process ID and worker process id {worker_model.pid} are equal, skipping")
37-
return
3835
if not worker_model.job_execution_process_pid:
3936
logger.error(f"Worker {self.worker_name} has no job execution process")
4037
return
38+
if worker_model.pid == worker_model.job_execution_process_pid:
39+
logger.warning(f"Job execution process ID and worker process id {worker_model.pid} are equal, skipping")
40+
return
4141
if worker_model.current_job_name != self.job_name:
4242
logger.info(
4343
f"{self.worker_name} working on job {worker_model.current_job_name}, "

scheduler/worker/worker.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,6 @@ def fork_job_execution_process(self, job: JobModel, queue: Queue) -> None:
581581
logger.debug(
582582
f"[Worker {self.name}/{self._pid}]: Forking job execution process, job_execution_process_pid={child_pid}"
583583
)
584-
refresh_queue_connection(queue)
585584
self._model.job_execution_process_pid = child_pid
586585
self._model.save(connection=queue.connection)
587586
self.procline(f"Forked {child_pid} at {time.time()}")
@@ -605,7 +604,6 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
605604
:param queue: The Queue
606605
"""
607606
retpid = ret_val = None
608-
job.started_at = utcnow()
609607
while True:
610608
try:
611609
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(

0 commit comments

Comments
 (0)