Skip to content

Commit ba5c022

Browse files
committed
Add clear() method to supporting backends
1 parent 3e0aade commit ba5c022

File tree

4 files changed

+30
-4
lines changed

4 files changed

+30
-4
lines changed

django_lightweight_queue/backends/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ def deduplicate(
5454
raise NotImplementedError()
5555

5656

57+
class BackendWithClear(BaseBackend, metaclass=ABCMeta):
58+
@abstractmethod
59+
def clear(self, queue: QueueName) -> None:
60+
raise NotImplementedError()
61+
62+
5763
class BackendWithPause(BaseBackend, metaclass=ABCMeta):
5864
@abstractmethod
5965
def pause(self, queue: QueueName, until: datetime.datetime) -> None:

django_lightweight_queue/backends/redis.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55

66
from .. import app_settings
77
from ..job import Job
8-
from .base import BackendWithPauseResume
8+
from .base import BackendWithClear, BackendWithPauseResume
99
from ..types import QueueName, WorkerNumber
1010
from ..utils import block_for_time
1111

1212

13-
class RedisBackend(BackendWithPauseResume):
13+
class RedisBackend(BackendWithPauseResume, BackendWithClear):
1414
"""
1515
This backend has at-most-once semantics.
1616
"""
@@ -78,6 +78,9 @@ def resume(self, queue: QueueName) -> None:
7878
def is_paused(self, queue: QueueName) -> bool:
7979
return bool(self.client.exists(self._pause_key(queue)))
8080

81+
def clear(self, queue: QueueName) -> None:
82+
self.client.delete(self._key(queue))
83+
8184
def _key(self, queue: QueueName) -> str:
8285
if app_settings.REDIS_PREFIX:
8386
return '{}:django_lightweight_queue:{}'.format(

django_lightweight_queue/backends/reliable_redis.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55

66
from .. import app_settings
77
from ..job import Job
8-
from .base import BackendWithDeduplicate, BackendWithPauseResume
8+
from .base import (
9+
BackendWithClear,
10+
BackendWithDeduplicate,
11+
BackendWithPauseResume,
12+
)
913
from ..types import QueueName, WorkerNumber
1014
from ..utils import block_for_time, get_worker_numbers
1115
from ..progress_logger import ProgressLogger, NULL_PROGRESS_LOGGER
@@ -15,7 +19,7 @@
1519
T = TypeVar('T')
1620

1721

18-
class ReliableRedisBackend(BackendWithDeduplicate, BackendWithPauseResume):
22+
class ReliableRedisBackend(BackendWithClear, BackendWithDeduplicate, BackendWithPauseResume):
1923
"""
2024
This backend manages a per-queue-per-worker 'processing' queue. E.g. if we
2125
had a queue called 'django_lightweight_queue:things', and two workers, we
@@ -228,6 +232,9 @@ def resume(self, queue: QueueName) -> None:
228232
def is_paused(self, queue: QueueName) -> bool:
229233
return bool(self.client.exists(self._pause_key(queue)))
230234

235+
def clear(self, queue: QueueName) -> None:
236+
self.client.delete(self._key(queue))
237+
231238
def _key(self, queue: QueueName) -> str:
232239
key = 'django_lightweight_queue:{}'.format(queue)
233240

tests/test_reliable_redis_backend.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,13 @@ def test_pause(self):
348348

349349
self.assertIsNone(job, "Should have indicated no work was done")
350350
self.assertTrue(mock_sleep.called)
351+
352+
def test_clear(self):
353+
QUEUE = 'the-queue'
354+
355+
self.enqueue_job(QUEUE)
356+
357+
self.backend.clear(QUEUE)
358+
359+
dequeued = self.backend.dequeue(QUEUE, worker_number=3, timeout=1)
360+
self.assertIsNone(dequeued)

0 commit comments

Comments
 (0)