Skip to content

Commit 3debe11

Browse files
committed
test:suspend-worker
1 parent e15ab3e commit 3debe11

File tree

5 files changed

+137
-91
lines changed

5 files changed

+137
-91
lines changed

docs/changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## v4.0.0 🌈
4+
5+
See breaking changes in 4.0.0 beta versions.
6+
37
## v4.0.0b3 🌈
48

59
Refactor the code to make it more organized and easier to maintain. This includes:
Lines changed: 26 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,42 @@
1-
from time import sleep
2-
3-
from django.test import tag
4-
51
from scheduler.helpers.queues import get_queue
6-
from scheduler.redis_models import JobStatus, JobModel, WorkerModel
7-
from scheduler.tests.jobs import long_job, two_seconds_job
8-
from .. import testtools
2+
from scheduler.tests.jobs import test_job
93
from ..test_views.base import BaseTestCase
10-
from ...worker.commands import KillWorkerCommand, send_command, StopJobCommand
4+
from ...redis_models import JobModel
5+
from ...worker import create_worker
6+
from ...worker.commands import send_command
7+
from ...worker.commands.suspend_worker import SuspendWorkCommand
118

129

13-
@tag("multiprocess")
1410
class WorkerCommandsTest(BaseTestCase):
15-
def test_kill_job_command__current_job(self):
11+
def test_stop_worker_command__green(self):
1612
# Arrange
17-
queue = get_queue("django_tasks_scheduler_test")
18-
job = queue.create_and_enqueue_job(long_job)
13+
worker_name = "test"
14+
queue = get_queue("default")
15+
job = queue.create_and_enqueue_job(test_job)
1916
self.assertTrue(job.is_queued)
20-
process, worker_name = testtools.run_worker_in_process("django_tasks_scheduler_test")
21-
sleep(0.1)
22-
job = JobModel.get(job.name, connection=queue.connection)
23-
self.assertEqual(JobStatus.STARTED, job.status)
24-
17+
worker = create_worker("default", name=worker_name, burst=True, with_scheduler=False)
18+
worker.worker_start()
2519
# Act
26-
send_command(queue.connection, StopJobCommand(worker_name=worker_name, job_name=job.name))
20+
send_command(queue.connection, SuspendWorkCommand(worker_name=worker_name))
21+
worker.work()
2722

2823
# Assert
24+
self.assertTrue(job.is_queued)
25+
self.assertTrue(worker._model.is_suspended)
2926

30-
process.terminate()
31-
process.join(2)
32-
process.kill()
33-
34-
job = JobModel.get(job.name, connection=queue.connection)
35-
worker_model = WorkerModel.get(worker_name, connection=queue.connection)
36-
self.assertEqual(job.name, worker_model.stopped_job_name)
37-
self.assertEqual(job.name, worker_model.current_job_name)
38-
self.assertEqual(0, worker_model.completed_jobs)
39-
self.assertEqual(0, worker_model.failed_job_count)
40-
self.assertEqual(0, worker_model.successful_job_count)
41-
self.assertEqual(JobStatus.STOPPED, job.status)
42-
self.assertNotIn(job.name, queue.queued_job_registry.all())
43-
44-
def test_kill_job_command__different_job(self):
27+
def test_stop_worker_command__bad_worker_name(self):
4528
# Arrange
46-
queue = get_queue("django_tasks_scheduler_test")
47-
job = queue.create_and_enqueue_job(two_seconds_job)
29+
worker_name = "test"
30+
queue = get_queue("default")
31+
job = queue.create_and_enqueue_job(test_job)
4832
self.assertTrue(job.is_queued)
49-
process, worker_name = testtools.run_worker_in_process("django_tasks_scheduler_test")
50-
sleep(0.2)
51-
job = JobModel.get(job.name, connection=queue.connection)
52-
self.assertEqual(JobStatus.STARTED, job.status)
53-
33+
worker = create_worker("default", name=worker_name, burst=True, with_scheduler=False)
34+
worker.bootstrap()
5435
# Act
55-
send_command(queue.connection, StopJobCommand(worker_name=worker_name, job_name=job.name + "1"))
56-
sleep(0.1)
57-
process.kill()
58-
process.join()
36+
send_command(queue.connection, SuspendWorkCommand(worker_name=worker_name + "1"))
37+
worker.work()
38+
5939
# Assert
40+
self.assertFalse(worker._model.is_suspended)
6041
job = JobModel.get(job.name, connection=queue.connection)
61-
self.assertEqual(JobStatus.STARTED, job.status)
62-
self.assertNotIn(job.name, queue.queued_job_registry.all())
63-
worker_model = WorkerModel.get(worker_name, connection=queue.connection)
64-
self.assertEqual(0, worker_model.completed_jobs)
65-
self.assertEqual(0, worker_model.failed_job_count)
66-
self.assertEqual(0, worker_model.successful_job_count)
67-
self.assertIsNone(worker_model.stopped_job_name)
68-
self.assertEqual(job.name, worker_model.current_job_name)
69-
70-
def test_kill_worker_command(self):
71-
queue = get_queue("django_tasks_scheduler_test")
72-
process, worker_name = testtools.run_worker_in_process("django_tasks_scheduler_test")
73-
sleep(0.1)
74-
# act
75-
send_command(queue.connection, KillWorkerCommand(worker_name=worker_name))
76-
# assert
77-
sleep(0.2)
78-
process.kill()
79-
process.join()
80-
worker_model = WorkerModel.get(worker_name, connection=queue.connection)
81-
self.assertEqual(0, worker_model.completed_jobs)
82-
self.assertEqual(0, worker_model.failed_job_count)
83-
self.assertEqual(0, worker_model.successful_job_count)
84-
self.assertIsNotNone(worker_model.shutdown_requested_date)
42+
self.assertFalse(job.is_queued)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from time import sleep
2+
3+
from django.test import tag
4+
5+
from scheduler.helpers.queues import get_queue
6+
from scheduler.redis_models import JobStatus, JobModel, WorkerModel
7+
from scheduler.tests.jobs import long_job, two_seconds_job
8+
from .. import testtools
9+
from ..test_views.base import BaseTestCase
10+
from ...worker.commands import KillWorkerCommand, send_command, StopJobCommand
11+
12+
13+
@tag("multiprocess")
14+
class WorkerCommandsTest(BaseTestCase):
15+
def test_kill_job_command__current_job(self):
16+
# Arrange
17+
queue = get_queue("django_tasks_scheduler_test")
18+
job = queue.create_and_enqueue_job(long_job)
19+
self.assertTrue(job.is_queued)
20+
process, worker_name = testtools.run_worker_in_process("django_tasks_scheduler_test")
21+
sleep(0.1)
22+
job = JobModel.get(job.name, connection=queue.connection)
23+
self.assertEqual(JobStatus.STARTED, job.status)
24+
25+
# Act
26+
send_command(queue.connection, StopJobCommand(worker_name=worker_name, job_name=job.name))
27+
28+
# Assert
29+
30+
process.terminate()
31+
process.join(2)
32+
process.kill()
33+
34+
job = JobModel.get(job.name, connection=queue.connection)
35+
worker_model = WorkerModel.get(worker_name, connection=queue.connection)
36+
self.assertEqual(job.name, worker_model.stopped_job_name)
37+
self.assertEqual(job.name, worker_model.current_job_name)
38+
self.assertEqual(0, worker_model.completed_jobs)
39+
self.assertEqual(0, worker_model.failed_job_count)
40+
self.assertEqual(0, worker_model.successful_job_count)
41+
self.assertEqual(JobStatus.STOPPED, job.status)
42+
self.assertNotIn(job.name, queue.queued_job_registry.all())
43+
44+
def test_kill_job_command__different_job(self):
45+
# Arrange
46+
queue = get_queue("django_tasks_scheduler_test")
47+
job = queue.create_and_enqueue_job(two_seconds_job)
48+
self.assertTrue(job.is_queued)
49+
process, worker_name = testtools.run_worker_in_process("django_tasks_scheduler_test")
50+
sleep(0.2)
51+
job = JobModel.get(job.name, connection=queue.connection)
52+
self.assertEqual(JobStatus.STARTED, job.status)
53+
54+
# Act
55+
send_command(queue.connection, StopJobCommand(worker_name=worker_name, job_name=job.name + "1"))
56+
sleep(0.1)
57+
process.kill()
58+
process.join()
59+
# Assert
60+
job = JobModel.get(job.name, connection=queue.connection)
61+
self.assertEqual(JobStatus.STARTED, job.status)
62+
self.assertNotIn(job.name, queue.queued_job_registry.all())
63+
worker_model = WorkerModel.get(worker_name, connection=queue.connection)
64+
self.assertEqual(0, worker_model.completed_jobs)
65+
self.assertEqual(0, worker_model.failed_job_count)
66+
self.assertEqual(0, worker_model.successful_job_count)
67+
self.assertIsNone(worker_model.stopped_job_name)
68+
self.assertEqual(job.name, worker_model.current_job_name)
69+
70+
def test_kill_worker_command(self):
71+
queue = get_queue("django_tasks_scheduler_test")
72+
process, worker_name = testtools.run_worker_in_process("django_tasks_scheduler_test")
73+
sleep(0.1)
74+
# act
75+
send_command(queue.connection, KillWorkerCommand(worker_name=worker_name))
76+
# assert
77+
sleep(0.2)
78+
process.kill()
79+
process.join()
80+
worker_model = WorkerModel.get(worker_name, connection=queue.connection)
81+
self.assertEqual(0, worker_model.completed_jobs)
82+
self.assertEqual(0, worker_model.failed_job_count)
83+
self.assertEqual(0, worker_model.successful_job_count)
84+
self.assertIsNotNone(worker_model.shutdown_requested_date)

scheduler/worker/commands/suspend_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class SuspendWorkCommand(WorkerCommand):
1010
command_name = "suspend"
1111

1212
def process_command(self, connection: ConnectionType) -> None:
13-
logger.debug(f"Received command to suspend worker {self.job_name}")
13+
logger.debug(f"Received command to suspend worker {self.worker_name}")
1414
worker_model = WorkerModel.get(self.worker_name, connection)
1515
if worker_model is None:
1616
logger.warning(f"Worker {self.worker_name} not found")

scheduler/worker/worker.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,18 +109,18 @@ def from_model(cls, model: WorkerModel) -> Self:
109109
return res
110110

111111
def __init__(
112-
self,
113-
queues,
114-
name: str,
115-
connection: Optional[ConnectionType] = None,
116-
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
117-
job_monitoring_interval=SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
118-
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
119-
disable_default_exception_handler: bool = False,
120-
fork_job_execution: bool = True,
121-
with_scheduler: bool = True,
122-
burst: bool = False,
123-
model: Optional[WorkerModel] = None,
112+
self,
113+
queues,
114+
name: str,
115+
connection: Optional[ConnectionType] = None,
116+
maintenance_interval: int = SCHEDULER_CONFIG.DEFAULT_MAINTENANCE_TASK_INTERVAL,
117+
job_monitoring_interval=SCHEDULER_CONFIG.DEFAULT_JOB_MONITORING_INTERVAL,
118+
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT,
119+
disable_default_exception_handler: bool = False,
120+
fork_job_execution: bool = True,
121+
with_scheduler: bool = True,
122+
burst: bool = False,
123+
model: Optional[WorkerModel] = None,
124124
): # noqa
125125
self.fork_job_execution = fork_job_execution
126126
self.job_monitoring_interval = job_monitoring_interval
@@ -212,9 +212,9 @@ def _install_signal_handlers(self) -> None:
212212
signal.signal(signal.SIGTERM, self.request_stop)
213213

214214
def work(
215-
self,
216-
max_jobs: Optional[int] = None,
217-
max_idle_time: Optional[int] = None,
215+
self,
216+
max_jobs: Optional[int] = None,
217+
max_idle_time: Optional[int] = None,
218218
) -> bool:
219219
"""Starts the work loop.
220220
@@ -351,13 +351,13 @@ def _check_for_suspension(self, burst: bool) -> None:
351351
"""Check to see if workers have been suspended by `rq suspend`"""
352352
before_state = None
353353
notified = False
354-
while self._model.shutdown_requested_date is not None and self._model.is_suspended:
354+
while self._model.is_suspended:
355355
if burst:
356356
logger.info(
357357
f"[Worker {self.name}/{self._pid}]: Suspended in burst mode, exiting, "
358358
f"Note: There could still be unfinished jobs on the queue"
359359
)
360-
raise StopRequested
360+
raise StopRequested()
361361

362362
if not notified:
363363
logger.info(f"[Worker {self.name}/{self._pid}]: Worker suspended, trigger ResumeCommand")
@@ -389,7 +389,7 @@ def run_maintenance_tasks(self):
389389
self._model.save(connection=self.connection)
390390

391391
def dequeue_job_and_maintain_ttl(
392-
self, timeout: Optional[int], max_idle_time: Optional[int] = None
392+
self, timeout: Optional[int], max_idle_time: Optional[int] = None
393393
) -> Tuple[JobModel, Queue]:
394394
"""Dequeues a job while maintaining the TTL.
395395
:param timeout: The timeout for the dequeue operation.
@@ -564,7 +564,7 @@ def reorder_queues(self, reference_queue: Queue):
564564
return
565565
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
566566
pos = self._ordered_queues.index(reference_queue)
567-
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
567+
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
568568
return
569569
if self._dequeue_strategy == DequeueStrategy.RANDOM:
570570
shuffle(self._ordered_queues)
@@ -595,7 +595,7 @@ def refresh(self, update_queues: bool = False):
595595
"""
596596
self._model = WorkerModel.get(self.name, connection=self.connection)
597597
if self._model is None:
598-
msg = f"[Worker {self.name}/{self._pid}]: Worker {self.name} not found, quitting..."
598+
msg = f"[Worker {self.name}/{self._pid}]: Worker broker record for {self.name} not found, quitting..."
599599
logger.error(msg)
600600
raise WorkerNotFound(msg)
601601
if update_queues:
@@ -648,7 +648,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
648648
while True:
649649
try:
650650
with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(
651-
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
651+
self.job_monitoring_interval, JobExecutionMonitorTimeoutException
652652
):
653653
retpid, ret_val, rusage = self.wait_for_job_execution_process()
654654
break
@@ -884,7 +884,7 @@ class RoundRobinWorker(Worker):
884884

885885
def reorder_queues(self, reference_queue):
886886
pos = self._ordered_queues.index(reference_queue)
887-
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1]
887+
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
888888

889889

890890
class RandomWorker(Worker):

0 commit comments

Comments
 (0)