Skip to content

Commit c772dbc

Browse files
committed
Base backend
1 parent fbd7eb0 commit c772dbc

File tree

8 files changed

+48
-27
lines changed

8 files changed

+48
-27
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
from ..job import Job
4+
5+
6+
class BaseBackend(metaclass=ABCMeta):
7+
def startup(self, queue: str) -> None:
8+
pass
9+
10+
@abstractmethod
11+
def enqueue(self, job: Job, queue: str) -> None:
12+
raise NotImplementedError()
13+
14+
@abstractmethod
15+
def dequeue(self, queue: str, worker_num: int, timeout: float) -> Job:
16+
raise NotImplementedError()
17+
18+
@abstractmethod
19+
def length(self, queue: str) -> int:
20+
raise NotImplementedError()
21+
22+
def processed_job(self, queue: str, worker_num: int, job: Job) -> None:
23+
pass

django_lightweight_queue/backends/debug_web.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from django.conf import settings
44
from django.shortcuts import reverse
55

6+
from .base import BaseBackend
67

7-
class DebugWebBackend(object):
8+
9+
class DebugWebBackend(BaseBackend):
810
"""
911
This backend aids debugging in concert with the 'debug-run' view.
1012
@@ -14,8 +16,6 @@ class DebugWebBackend(object):
1416
1517
See the docstring of that view for information (and limitations) about it.
1618
"""
17-
def startup(self, queue):
18-
pass
1919

2020
def enqueue(self, job, queue):
2121
path = reverse('django-lightweight-queue:debug-run')
@@ -28,6 +28,3 @@ def dequeue(self, queue, worker_num, timeout):
2828

2929
def length(self, queue):
3030
return 0
31-
32-
def processed_job(self, queue, worker_num, job):
33-
pass

django_lightweight_queue/backends/redis.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import redis
22

3-
from ..job import Job
43
from .. import app_settings
4+
from ..job import Job
5+
from .base import BaseBackend
6+
57

6-
class RedisBackend(object):
8+
class RedisBackend(BaseBackend):
79
"""
810
This backend has at-most-once semantics.
911
"""
@@ -14,9 +16,6 @@ def __init__(self):
1416
port=app_settings.REDIS_PORT,
1517
)
1618

17-
def startup(self, queue):
18-
pass
19-
2019
def enqueue(self, job, queue):
2120
self.client.lpush(self._key(queue), job.to_json().encode('utf-8'))
2221

@@ -31,9 +30,6 @@ def dequeue(self, queue, worker_num, timeout):
3130
def length(self, queue):
3231
return self.client.llen(self._key(queue))
3332

34-
def processed_job(self, queue, worker_num, job):
35-
pass
36-
3733
def _key(self, queue):
3834
if app_settings.REDIS_PREFIX:
3935
return '{}:django_lightweight_queue:{}'.format(

django_lightweight_queue/backends/reliable_redis.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import redis
22

3-
from ..job import Job
43
from .. import app_settings
4+
from ..job import Job
5+
from .base import BaseBackend
6+
57

6-
class ReliableRedisBackend(object):
8+
class ReliableRedisBackend(BaseBackend):
79
"""
810
This backend manages a per-queue-per-worker 'processing' queue. E.g. if we
911
had a queue called 'django_lightweight_queue:things', and two workers, we
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import time
22

3-
class SynchronousBackend(object):
3+
from .base import BaseBackend
4+
5+
6+
class SynchronousBackend(BaseBackend):
47
"""
58
This backend has at-most-once semantics.
69
"""
7-
def startup(self, queue):
8-
pass
910

1011
def enqueue(self, job, queue):
11-
job.run()
12+
job.run(queue=queue, worker_num=0)
1213

1314
def dequeue(self, queue, worker_num, timeout):
1415
# Cannot dequeue from the synchronous backend but we can emulate by
@@ -19,6 +20,3 @@ def length(self, queue):
1920
# The length is the number of items waiting to be processed, which can
2021
# be defined as always 0 for the synchronous backend
2122
return 0
22-
23-
def processed_job(self, queue, worker_num, job):
24-
pass

django_lightweight_queue/job.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,19 @@ def from_json(cls, val):
5353
def created_time_str(self):
5454
return self.created_time.strftime(TIME_FORMAT)
5555

56-
def run(self):
56+
def run(self, *, queue, worker_num):
57+
"""
58+
`queue` and `worker_num` arguments are required for context only and do
59+
not change the behaviour of job execution.
60+
"""
61+
5762
start = time.time()
5863

5964
middleware = get_middleware()
6065

6166
for instance in middleware:
6267
if hasattr(instance, 'process_job'):
63-
instance.process_job(self)
68+
instance.process_job(self, queue, worker_num)
6469

6570
try:
6671
task = self.get_task_instance()

django_lightweight_queue/views.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def debug_run(request):
5050
try:
5151
logging.root.addHandler(handler)
5252
with transaction.atomic():
53-
result = job.run()
53+
result = job.run(queue='debug', worker_num=0)
5454
transaction.set_rollback(rollback=True)
5555
finally:
5656
logging.root.removeHandler(handler)

django_lightweight_queue/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def process(self, backend):
136136
self.log.debug("Running job %s", job)
137137
self.set_process_title("Running job %s" % job)
138138

139-
if job.run() and self.touch_filename:
139+
if job.run(queue=self.queue, worker_num=self.worker_num) and self.touch_filename:
140140
with open(self.touch_filename, 'a'):
141141
os.utime(self.touch_filename, None)
142142

0 commit comments

Comments
 (0)