Skip to content

Commit f927908

Browse files
authored
Merge pull request #79 from thread/queue-clear-cmd
Add `queue_clear` command
2 parents 23afd7d + 018dabf commit f927908

File tree

5 files changed

+89
-12
lines changed

5 files changed

+89
-12
lines changed

django_lightweight_queue/backends/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ def deduplicate(
5454
raise NotImplementedError()
5555

5656

57+
class BackendWithClear(BaseBackend, metaclass=ABCMeta):
58+
@abstractmethod
59+
def clear(self, queue: QueueName) -> None:
60+
raise NotImplementedError()
61+
62+
5763
class BackendWithPause(BaseBackend, metaclass=ABCMeta):
5864
@abstractmethod
5965
def pause(self, queue: QueueName, until: datetime.datetime) -> None:

django_lightweight_queue/backends/redis.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55

66
from .. import app_settings
77
from ..job import Job
8-
from .base import BackendWithPauseResume
8+
from .base import BackendWithClear, BackendWithPauseResume
99
from ..types import QueueName, WorkerNumber
1010
from ..utils import block_for_time
1111

1212

13-
class RedisBackend(BackendWithPauseResume):
13+
class RedisBackend(BackendWithPauseResume, BackendWithClear):
1414
"""
1515
This backend has at-most-once semantics.
1616
"""
@@ -78,6 +78,9 @@ def resume(self, queue: QueueName) -> None:
7878
def is_paused(self, queue: QueueName) -> bool:
7979
return bool(self.client.exists(self._pause_key(queue)))
8080

81+
def clear(self, queue: QueueName) -> None:
82+
self.client.delete(self._key(queue))
83+
8184
def _key(self, queue: QueueName) -> str:
8285
if app_settings.REDIS_PREFIX:
8386
return '{}:django_lightweight_queue:{}'.format(

django_lightweight_queue/backends/reliable_redis.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55

66
from .. import app_settings
77
from ..job import Job
8-
from .base import BackendWithDeduplicate, BackendWithPauseResume
8+
from .base import (
9+
BackendWithClear,
10+
BackendWithDeduplicate,
11+
BackendWithPauseResume,
12+
)
913
from ..types import QueueName, WorkerNumber
1014
from ..utils import block_for_time, get_worker_numbers
1115
from ..progress_logger import ProgressLogger, NULL_PROGRESS_LOGGER
@@ -15,7 +19,7 @@
1519
T = TypeVar('T')
1620

1721

18-
class ReliableRedisBackend(BackendWithDeduplicate, BackendWithPauseResume):
22+
class ReliableRedisBackend(BackendWithClear, BackendWithDeduplicate, BackendWithPauseResume):
1923
"""
2024
This backend manages a per-queue-per-worker 'processing' queue. E.g. if we
2125
had a queue called 'django_lightweight_queue:things', and two workers, we
@@ -228,6 +232,9 @@ def resume(self, queue: QueueName) -> None:
228232
def is_paused(self, queue: QueueName) -> bool:
229233
return bool(self.client.exists(self._pause_key(queue)))
230234

235+
def clear(self, queue: QueueName) -> None:
236+
self.client.delete(self._key(queue))
237+
231238
def _key(self, queue: QueueName) -> str:
232239
key = 'django_lightweight_queue:{}'.format(queue)
233240

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import argparse
2+
3+
from django.core.management.base import BaseCommand, CommandError
4+
5+
from ...types import QueueName
6+
from ...utils import get_backend
7+
from ...backends.base import BackendWithClear
8+
9+
10+
class Command(BaseCommand):
11+
help = """
12+
Command to clear work on a redis-backed queue.
13+
14+
All pending jobs will be deleted from the queue. In flight jobs won't be
15+
affected.
16+
""" # noqa:A003 # inherited name
17+
18+
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
19+
parser.add_argument(
20+
'queue',
21+
action='store',
22+
help="The queue to pause.",
23+
)
24+
25+
parser.add_argument(
26+
'--yes',
27+
'skip_prompt',
28+
action='store_true',
29+
help="Skip confirmation prompt.",
30+
)
31+
32+
def handle(self, queue: QueueName, skip_prompt: bool = False, **options: object) -> None:
33+
34+
backend = get_backend(queue)
35+
36+
if not isinstance(backend, BackendWithClear):
37+
raise CommandError(
38+
"Configured backend '{}.{}' doesn't support clearing".format(
39+
type(backend).__module__,
40+
type(backend).__name__,
41+
),
42+
)
43+
44+
if not skip_prompt:
45+
prompt = "Clear all jobs from queue {}) [y/N] ".format(queue)
46+
choice = input(prompt).lower()
47+
48+
if choice != "y":
49+
raise CommandError("Aborting")
50+
51+
backend.clear(queue)

tests/test_reliable_redis_backend.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .mixins import RedisCleanupMixin
1919

2020

21-
class ReliableRedisDeduplicationTests(RedisCleanupMixin, unittest.TestCase):
21+
class ReliableRedisTests(RedisCleanupMixin, unittest.TestCase):
2222
longMessage = True
2323
prefix = settings.LIGHTWEIGHT_QUEUE_REDIS_PREFIX
2424

@@ -60,19 +60,19 @@ def setUp(self) -> None:
6060
self.backend = ReliableRedisBackend()
6161
self.client = self.backend.client
6262

63-
super(ReliableRedisDeduplicationTests, self).setUp()
63+
super(ReliableRedisTests, self).setUp()
6464

6565
self.start_time = datetime.datetime.utcnow()
6666

67-
def test_empty_queue(self):
67+
def test_deduplicate_empty_queue(self):
6868
result = self.backend.deduplicate('empty-queue')
6969
self.assertEqual(
7070
(0, 0),
7171
result,
7272
"Should do nothing when queue empty",
7373
)
7474

75-
def test_single_entry_in_queue(self):
75+
def test_deduplicate_single_entry_in_queue(self):
7676
QUEUE = 'single-job-queue'
7777

7878
self.enqueue_job(QUEUE)
@@ -96,7 +96,7 @@ def test_single_entry_in_queue(self):
9696
"Should still be a single entry in the queue",
9797
)
9898

99-
def test_unique_entries_in_queue(self):
99+
def test_deduplicate_unique_entries_in_queue(self):
100100
QUEUE = 'unique-jobs-queue'
101101

102102
self.enqueue_job(QUEUE, args=('args1',))
@@ -121,7 +121,7 @@ def test_unique_entries_in_queue(self):
121121
"Should still be a single entry in the queue",
122122
)
123123

124-
def test_duplicate_entries_in_queue(self):
124+
def test_deduplicate_duplicate_entries_in_queue(self):
125125
QUEUE = 'duplicate-jobs-queue'
126126

127127
self.enqueue_job(QUEUE)
@@ -146,7 +146,7 @@ def test_duplicate_entries_in_queue(self):
146146
"Should still be a single entry in the queue",
147147
)
148148

149-
def test_preserves_order_with_fixed_timestamps(self):
149+
def test_deduplicate_preserves_order_with_fixed_timestamps(self):
150150
QUEUE = 'job-queue'
151151
WORKER_NUMBER = 0
152152

@@ -201,7 +201,7 @@ def test_preserves_order_with_fixed_timestamps(self):
201201
"Third job dequeued should be the third job enqueued",
202202
)
203203

204-
def test_preserves_order_with_unique_timestamps(self):
204+
def test_deduplicate_preserves_order_with_unique_timestamps(self):
205205
QUEUE = 'job-queue'
206206
WORKER_NUMBER = 0
207207

@@ -348,3 +348,13 @@ def test_pause(self):
348348

349349
self.assertIsNone(job, "Should have indicated no work was done")
350350
self.assertTrue(mock_sleep.called)
351+
352+
def test_clear(self):
353+
QUEUE = 'the-queue'
354+
355+
self.enqueue_job(QUEUE)
356+
357+
self.backend.clear(QUEUE)
358+
359+
dequeued = self.backend.dequeue(QUEUE, worker_number=3, timeout=1)
360+
self.assertIsNone(dequeued)

0 commit comments

Comments
 (0)