Skip to content

Commit 305d9f4

Browse files
committed
Refactor to move process_job function onto Worker class
1 parent 0254ac1 commit 305d9f4

File tree

2 files changed

+77
-84
lines changed

2 files changed

+77
-84
lines changed

django_dbq/management/commands/worker.py

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -14,66 +14,6 @@
1414
DEFAULT_QUEUE_NAME = "default"
1515

1616

17-
def process_job(queue_name):
18-
"""This function grabs the next available job for a given queue, and runs its next task."""
19-
20-
with transaction.atomic():
21-
job = Job.objects.get_ready_or_none(queue_name)
22-
if not job:
23-
return
24-
25-
logger.info(
26-
'Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s',
27-
job.name,
28-
queue_name,
29-
job.pk,
30-
job.state,
31-
job.next_task,
32-
)
33-
job.state = Job.STATES.PROCESSING
34-
job.save()
35-
36-
try:
37-
task_function = import_string(job.next_task)
38-
task_function(job)
39-
job.update_next_task()
40-
if not job.next_task:
41-
job.state = Job.STATES.COMPLETE
42-
else:
43-
job.state = Job.STATES.READY
44-
except Exception as exception:
45-
logger.exception("Job id=%s failed", job.pk)
46-
job.state = Job.STATES.FAILED
47-
48-
failure_hook_name = job.get_failure_hook_name()
49-
if failure_hook_name:
50-
logger.info(
51-
"Running failure hook %s for job id=%s", failure_hook_name, job.pk
52-
)
53-
failure_hook_function = import_string(failure_hook_name)
54-
failure_hook_function(job, exception)
55-
else:
56-
logger.info("No failure hook for job id=%s", job.pk)
57-
58-
logger.info(
59-
'Updating job: name="%s" id=%s state=%s next_task=%s',
60-
job.name,
61-
job.pk,
62-
job.state,
63-
job.next_task or "none",
64-
)
65-
66-
try:
67-
job.save()
68-
except:
69-
logger.error(
70-
"Failed to save job: id=%s org=%s",
71-
job.pk,
72-
job.workspace.get("organisation_id"),
73-
)
74-
raise
75-
76-
7717
class Worker:
7818
def __init__(self, name, rate_limit_in_seconds):
7919
self.queue_name = name
@@ -103,9 +43,63 @@ def process_job(self):
10343
):
10444
return
10545

106-
process_job(self.queue_name)
46+
self._process_job()
47+
10748
self.last_job_finished = timezone.now()
10849

50+
def _process_job(self):
51+
with transaction.atomic():
52+
job = Job.objects.get_ready_or_none(self.queue_name)
53+
if not job:
54+
return
55+
56+
logger.info(
57+
'Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s',
58+
job.name,
59+
self.queue_name,
60+
job.pk,
61+
job.state,
62+
job.next_task,
63+
)
64+
job.state = Job.STATES.PROCESSING
65+
job.save()
66+
67+
try:
68+
task_function = import_string(job.next_task)
69+
task_function(job)
70+
job.update_next_task()
71+
if not job.next_task:
72+
job.state = Job.STATES.COMPLETE
73+
else:
74+
job.state = Job.STATES.READY
75+
except Exception as exception:
76+
logger.exception("Job id=%s failed", job.pk)
77+
job.state = Job.STATES.FAILED
78+
79+
failure_hook_name = job.get_failure_hook_name()
80+
if failure_hook_name:
81+
logger.info(
82+
"Running failure hook %s for job id=%s", failure_hook_name, job.pk
83+
)
84+
failure_hook_function = import_string(failure_hook_name)
85+
failure_hook_function(job, exception)
86+
else:
87+
logger.info("No failure hook for job id=%s", job.pk)
88+
89+
logger.info(
90+
'Updating job: name="%s" id=%s state=%s next_task=%s',
91+
job.name,
92+
job.pk,
93+
job.state,
94+
job.next_task or "none",
95+
)
96+
97+
try:
98+
job.save()
99+
except:
100+
logger.exception("Failed to save job: id=%s", job.pk)
101+
raise
102+
109103

110104
class Command(BaseCommand):
111105

django_dbq/tests.py

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from django.test.utils import override_settings
88
from django.utils import timezone
99

10-
from django_dbq.management.commands.worker import process_job, Worker
10+
from django_dbq.management.commands.worker import Worker
1111
from django_dbq.models import Job
1212

1313
from io import StringIO
@@ -116,41 +116,40 @@ def test_queue_depth_for_queue_with_zero_jobs(self):
116116

117117
@freezegun.freeze_time()
118118
@mock.patch("django_dbq.management.commands.worker.sleep")
119-
@mock.patch("django_dbq.management.commands.worker.process_job")
120119
class WorkerProcessProcessJobTestCase(TestCase):
121120
def setUp(self):
122121
super().setUp()
123-
self.MockWorker = mock.MagicMock()
124-
self.MockWorker.queue_name = "default"
125-
self.MockWorker.rate_limit_in_seconds = 5
126-
self.MockWorker.last_job_finished = None
122+
self.mock_worker = mock.MagicMock()
123+
self.mock_worker.queue_name = "default"
124+
self.mock_worker.rate_limit_in_seconds = 5
125+
self.mock_worker.last_job_finished = None
127126

128-
def test_process_job_no_previous_job_run(self, mock_process_job, mock_sleep):
129-
Worker.process_job(self.MockWorker)
127+
def test_process_job_no_previous_job_run(self, mock_sleep):
128+
Worker.process_job(self.mock_worker)
130129
self.assertEqual(mock_sleep.call_count, 1)
131-
self.assertEqual(mock_process_job.call_count, 1)
132-
self.assertEqual(self.MockWorker.last_job_finished, timezone.now())
130+
self.assertEqual(self.mock_worker._process_job.call_count, 1)
131+
self.assertEqual(self.mock_worker.last_job_finished, timezone.now())
133132

134-
def test_process_job_previous_job_too_soon(self, mock_process_job, mock_sleep):
135-
self.MockWorker.last_job_finished = timezone.now() - timezone.timedelta(
133+
def test_process_job_previous_job_too_soon(self, mock_sleep):
134+
self.mock_worker.last_job_finished = timezone.now() - timezone.timedelta(
136135
seconds=2
137136
)
138-
Worker.process_job(self.MockWorker)
137+
Worker.process_job(self.mock_worker)
139138
self.assertEqual(mock_sleep.call_count, 1)
140-
self.assertEqual(mock_process_job.call_count, 0)
139+
self.assertEqual(self.mock_worker._process_job.call_count, 0)
141140
self.assertEqual(
142-
self.MockWorker.last_job_finished,
141+
self.mock_worker.last_job_finished,
143142
timezone.now() - timezone.timedelta(seconds=2),
144143
)
145144

146-
def test_process_job_previous_job_long_time_ago(self, mock_process_job, mock_sleep):
147-
self.MockWorker.last_job_finished = timezone.now() - timezone.timedelta(
145+
def test_process_job_previous_job_long_time_ago(self, mock_sleep):
146+
self.mock_worker.last_job_finished = timezone.now() - timezone.timedelta(
148147
seconds=7
149148
)
150-
Worker.process_job(self.MockWorker)
149+
Worker.process_job(self.mock_worker)
151150
self.assertEqual(mock_sleep.call_count, 1)
152-
self.assertEqual(mock_process_job.call_count, 1)
153-
self.assertEqual(self.MockWorker.last_job_finished, timezone.now())
151+
self.assertEqual(self.mock_worker._process_job.call_count, 1)
152+
self.assertEqual(self.mock_worker.last_job_finished, timezone.now())
154153

155154

156155
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
@@ -246,7 +245,7 @@ def test_task_sequence(self):
246245
class ProcessJobTestCase(TestCase):
247246
def test_process_job(self):
248247
job = Job.objects.create(name="testjob")
249-
process_job("default")
248+
Worker("default", 1)._process_job()
250249
job = Job.objects.get()
251250
self.assertEqual(job.state, Job.STATES.COMPLETE)
252251

@@ -255,7 +254,7 @@ def test_process_job_wrong_queue(self):
255254
Processing a different queue shouldn't touch our other job
256255
"""
257256
job = Job.objects.create(name="testjob", queue_name="lol")
258-
process_job("default")
257+
Worker("default", 1)._process_job()
259258
job = Job.objects.get()
260259
self.assertEqual(job.state, Job.STATES.NEW)
261260

@@ -294,7 +293,7 @@ def test_creation_hook_only_runs_on_create(self):
294293
class JobFailureHookTestCase(TestCase):
295294
def test_failure_hook(self):
296295
job = Job.objects.create(name="testjob")
297-
process_job("default")
296+
Worker("default", 1)._process_job()
298297
job = Job.objects.get()
299298
self.assertEqual(job.state, Job.STATES.FAILED)
300299
self.assertEqual(job.workspace["output"], "failure hook ran")

0 commit comments

Comments
 (0)