Skip to content

Commit f3fdd39

Browse files
committed
Merge branch 'pause-queue'
2 parents d22ee9b + 34428e2 commit f3fdd39

File tree

11 files changed

+433
-6
lines changed

11 files changed

+433
-6
lines changed

django_lightweight_queue/backends/base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
from abc import ABCMeta, abstractmethod
23
from typing import Tuple, TypeVar, Optional, Collection
34

@@ -51,3 +52,19 @@ def deduplicate(
5152
progress_logger: ProgressLogger = NULL_PROGRESS_LOGGER
5253
) -> Tuple[int, int]:
5354
raise NotImplementedError()
55+
56+
57+
class BackendWithPause(BaseBackend, metaclass=ABCMeta):
58+
@abstractmethod
59+
def pause(self, queue: QueueName, until: datetime.datetime) -> None:
60+
raise NotImplementedError()
61+
62+
@abstractmethod
63+
def is_paused(self, queue: QueueName) -> bool:
64+
raise NotImplementedError()
65+
66+
67+
class BackendWithPauseResume(BackendWithPause, metaclass=ABCMeta):
68+
@abstractmethod
69+
def resume(self, queue: QueueName) -> None:
70+
raise NotImplementedError()

django_lightweight_queue/backends/redis.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1+
import datetime
12
from typing import Optional, Collection
23

34
import redis
45

56
from .. import app_settings
67
from ..job import Job
7-
from .base import BaseBackend
8+
from .base import BackendWithPauseResume
89
from ..types import QueueName, WorkerNumber
10+
from ..utils import block_for_time
911

1012

11-
class RedisBackend(BaseBackend):
13+
class RedisBackend(BackendWithPauseResume):
1214
"""
1315
This backend has at-most-once semantics.
1416
"""
@@ -30,6 +32,15 @@ def bulk_enqueue(self, jobs: Collection[Job], queue: QueueName) -> None:
3032
)
3133

3234
def dequeue(self, queue: QueueName, worker_num: WorkerNumber, timeout: int) -> Optional[Job]:
35+
if self.is_paused(queue):
36+
# Block for a while to avoid constant polling ...
37+
block_for_time(
38+
lambda: self.is_paused(queue),
39+
timeout=datetime.timedelta(seconds=timeout),
40+
)
41+
# ... but always indicate that we did no work
42+
return None
43+
3344
raw = self.client.brpop(self._key(queue), timeout)
3445
if raw is None:
3546
return None
@@ -40,6 +51,33 @@ def dequeue(self, queue: QueueName, worker_num: WorkerNumber, timeout: int) -> O
4051
def length(self, queue: QueueName) -> int:
4152
return self.client.llen(self._key(queue))
4253

54+
def pause(self, queue: QueueName, until: datetime.datetime) -> None:
55+
"""
56+
Pause the given queue by setting a pause marker.
57+
"""
58+
59+
pause_key = self._pause_key(queue)
60+
61+
now = datetime.datetime.now(datetime.timezone.utc)
62+
delta = until - now
63+
64+
self.client.setex(
65+
pause_key,
66+
time=int(delta.total_seconds()),
67+
# Store the value for debugging, we rely on setex behaviour for
68+
# implementation.
69+
value=until.isoformat(' '),
70+
)
71+
72+
def resume(self, queue: QueueName) -> None:
73+
"""
74+
Resume the given queue by deleting the pause marker (if present).
75+
"""
76+
self.client.delete(self._pause_key(queue))
77+
78+
def is_paused(self, queue: QueueName) -> bool:
79+
return bool(self.client.exists(self._pause_key(queue)))
80+
4381
def _key(self, queue: QueueName) -> str:
4482
if app_settings.REDIS_PREFIX:
4583
return '{}:django_lightweight_queue:{}'.format(
@@ -48,3 +86,6 @@ def _key(self, queue: QueueName) -> str:
4886
)
4987

5088
return 'django_lightweight_queue:{}'.format(queue)
89+
90+
def _pause_key(self, queue: QueueName) -> str:
91+
return self._key(queue) + ':pause'

django_lightweight_queue/backends/reliable_redis.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
1+
import datetime
12
from typing import Dict, List, Tuple, TypeVar, Optional, Collection
23

34
import redis
45

56
from .. import app_settings
67
from ..job import Job
7-
from .base import BackendWithDeduplicate
8+
from .base import BackendWithDeduplicate, BackendWithPauseResume
89
from ..types import QueueName, WorkerNumber
9-
from ..utils import get_worker_numbers
10+
from ..utils import block_for_time, get_worker_numbers
1011
from ..progress_logger import ProgressLogger, NULL_PROGRESS_LOGGER
1112

1213
# Work around https://github.com/python/mypy/issues/9914. Name needs to match
1314
# that in progress_logger.py.
1415
T = TypeVar('T')
1516

1617

17-
class ReliableRedisBackend(BackendWithDeduplicate):
18+
class ReliableRedisBackend(BackendWithDeduplicate, BackendWithPauseResume):
1819
"""
1920
This backend manages a per-queue-per-worker 'processing' queue. E.g. if we
2021
had a queue called 'django_lightweight_queue:things', and two workers, we
@@ -107,6 +108,15 @@ def dequeue(self, queue: QueueName, worker_number: WorkerNumber, timeout: int) -
107108
main_queue_key = self._key(queue)
108109
processing_queue_key = self._processing_key(queue, worker_number)
109110

111+
if self.is_paused(queue):
112+
# Block for a while to avoid constant polling ...
113+
block_for_time(
114+
lambda: self.is_paused(queue),
115+
timeout=datetime.timedelta(seconds=timeout),
116+
)
117+
# ... but always indicate that we did no work
118+
return None
119+
110120
# Get any jobs off our 'processing' queue - but do not block doing so -
111121
# this is to catch the fact there may be a job already in our
112122
# processing queue if this worker crashed and has just been restarted.
@@ -191,11 +201,41 @@ def deduplicate(
191201

192202
return original_size, self.client.llen(main_queue_key)
193203

204+
def pause(self, queue: QueueName, until: datetime.datetime) -> None:
205+
"""
206+
Pause the given queue by setting a pause marker.
207+
"""
208+
209+
pause_key = self._pause_key(queue)
210+
211+
now = datetime.datetime.now(datetime.timezone.utc)
212+
delta = until - now
213+
214+
self.client.setex(
215+
pause_key,
216+
time=int(delta.total_seconds()),
217+
# Store the value for debugging, we rely on setex behaviour for
218+
# implementation.
219+
value=until.isoformat(' '),
220+
)
221+
222+
def resume(self, queue: QueueName) -> None:
223+
"""
224+
Resume the given queue by deleting the pause marker (if present).
225+
"""
226+
self.client.delete(self._pause_key(queue))
227+
228+
def is_paused(self, queue: QueueName) -> bool:
229+
return bool(self.client.exists(self._pause_key(queue)))
230+
194231
def _key(self, queue: QueueName) -> str:
195232
key = 'django_lightweight_queue:{}'.format(queue)
196233

197234
return self._prefix_key(key)
198235

236+
def _pause_key(self, queue: QueueName) -> str:
237+
return self._key(queue) + ':pause'
238+
199239
def _processing_key(self, queue: QueueName, worker_number: WorkerNumber) -> str:
200240
key = 'django_lightweight_queue:{}:processing:{}'.format(
201241
queue,
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import re
2+
import argparse
3+
import datetime
4+
5+
from django.core.management.base import BaseCommand, CommandError
6+
7+
from ...types import QueueName
8+
from ...utils import get_backend
9+
from ...backends.base import BackendWithPause
10+
11+
DURATION_PATTERN = r'^((?P<hours>\d+)h)?((?P<minutes>\d+)m)?((?P<seconds>\d+)s)?$'
12+
TIME_FORMAT = r'%Y-%m-%dT%H:%M:%S%z'
13+
14+
15+
def utcnow() -> datetime.datetime:
16+
return datetime.datetime.now(datetime.timezone.utc)
17+
18+
19+
def parse_duration_to_time(duration: str) -> datetime.datetime:
20+
match = re.match(DURATION_PATTERN, duration)
21+
if match is None:
22+
raise ValueError(
23+
f"Unknown duration format {duration!r}. Try something like '1h2m3s'.",
24+
)
25+
26+
delta = datetime.timedelta(
27+
hours=int(match['hours'] or 0),
28+
minutes=int(match['minutes'] or 0),
29+
seconds=int(match['seconds'] or 0),
30+
)
31+
32+
return utcnow() + delta
33+
34+
35+
def parse_time(date_string: str) -> datetime.datetime:
36+
return datetime.datetime.strptime(date_string, TIME_FORMAT)
37+
38+
39+
class Command(BaseCommand):
40+
help = """
41+
Command to pause work on a redis-backed queue.
42+
43+
New jobs can still be added to the queue, however no jobs will be pulled off
44+
the queue for processing.
45+
""" # noqa:A003 # inherited name
46+
47+
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
48+
parser.add_argument(
49+
'queue',
50+
action='store',
51+
help="The queue to pause.",
52+
)
53+
54+
group = parser.add_mutually_exclusive_group(required=True)
55+
group.add_argument(
56+
'--for',
57+
dest='until',
58+
action='store',
59+
type=parse_duration_to_time,
60+
help=(
61+
"The duration to pause the queue for. Specify a value like 1h2m3s, "
62+
"all levels of precision are optional, so 5m and 1h are both valid."
63+
),
64+
)
65+
group.add_argument(
66+
'--until',
67+
action='store',
68+
type=parse_time,
69+
help=(
70+
"The time at which the queue should reactivate. Specify as an "
71+
"ISO 8601 value, specifically one parsable via datetime.strptime "
72+
f"using {TIME_FORMAT.replace('%', r'%%')!r}, such as {utcnow():{TIME_FORMAT}}."
73+
),
74+
)
75+
76+
def handle(self, queue: QueueName, until: datetime.datetime, **options: object) -> None:
77+
if until < utcnow():
78+
raise CommandError("Refusing to pause until a time in the past.")
79+
80+
backend = get_backend(queue)
81+
82+
if not isinstance(backend, BackendWithPause):
83+
raise CommandError(
84+
"Configured backend '{}.{}' doesn't support pausing".format(
85+
type(backend).__module__,
86+
type(backend).__name__,
87+
),
88+
)
89+
90+
backend.pause(queue, until)
91+
92+
self.stdout.write(f"Paused queue {queue} until {until.isoformat(' ')}.")
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import argparse
2+
3+
from django.core.management.base import BaseCommand, CommandError
4+
5+
from ...types import QueueName
6+
from ...utils import get_backend
7+
from ...backends.base import BackendWithPauseResume
8+
9+
10+
class Command(BaseCommand):
11+
help = """
12+
Command to resume work immediately on a redis-backed queue.
13+
14+
This removes a pause which may be in place for the given queue, though it
15+
may not cause workers to resume work immediately.
16+
""" # noqa:A003 # inherited name
17+
18+
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
19+
parser.add_argument(
20+
'queue',
21+
action='store',
22+
help="The queue to resume.",
23+
)
24+
25+
def handle(self, queue: QueueName, **options: object) -> None:
26+
backend = get_backend(queue)
27+
28+
if not isinstance(backend, BackendWithPauseResume):
29+
raise CommandError(
30+
"Configured backend '{}.{}' doesn't support resuming from paused".format(
31+
type(backend).__module__,
32+
type(backend).__name__,
33+
),
34+
)
35+
36+
backend.resume(queue)

django_lightweight_queue/utils.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import imp
2+
import time
3+
import datetime
24
import warnings
35
import importlib
46
from typing import (
@@ -27,6 +29,8 @@
2729

2830
_accepting_implied_queues = True
2931

32+
FIVE_SECONDS = datetime.timedelta(seconds=5)
33+
3034

3135
def load_extra_config(file_path: str) -> None:
3236
extra_settings = imp.load_source('extra_settings', file_path)
@@ -139,6 +143,35 @@ def load_all_tasks() -> None:
139143
import_all_submodules('tasks', app_settings.IGNORE_APPS)
140144

141145

146+
def block_for_time(
147+
should_continue_blocking: Callable[[], bool],
148+
timeout: datetime.timedelta,
149+
check_frequency: datetime.timedelta = FIVE_SECONDS,
150+
) -> bool:
151+
"""
152+
Block until a cancellation function or timeout indicates otherwise.
153+
154+
Returns whether or not the timeout was encountered.
155+
"""
156+
if not should_continue_blocking():
157+
return False
158+
159+
end = time.time() + timeout.total_seconds()
160+
161+
while should_continue_blocking():
162+
now = time.time()
163+
if now > end:
164+
# timed out
165+
return True
166+
167+
time.sleep(min(
168+
check_frequency.total_seconds(),
169+
end - now,
170+
))
171+
172+
return False
173+
174+
142175
try:
143176
import setproctitle
144177

0 commit comments

Comments
 (0)