@@ -1,13 +1,20 @@
+from typing import Dict, List, Tuple, TypeVar, Optional
+
 import redis

 from .. import app_settings
 from ..job import Job
-from .base import BaseBackend
+from .base import BackendWithDeduplicate
+from ..types import QueueName, WorkerNumber
 from ..utils import get_worker_numbers
-from ..progress_logger import NULL_PROGRESS_LOGGER
+from ..progress_logger import ProgressLogger, NULL_PROGRESS_LOGGER
+
+# Work around https://github.com/python/mypy/issues/9914. Name needs to match
+# that in progress_logger.py.
+T = TypeVar('T')


-class ReliableRedisBackend(BaseBackend):
+class ReliableRedisBackend(BackendWithDeduplicate):
     """
     This backend manages a per-queue-per-worker 'processing' queue. E.g. if we
     had a queue called 'django_lightweight_queue:things', and two workers, we
@@ -29,13 +36,13 @@ class ReliableRedisBackend(BaseBackend):
     This backend has at-least-once semantics.
     """

-    def __init__(self):
+    def __init__(self) -> None:
         self.client = redis.StrictRedis(
             host=app_settings.REDIS_HOST,
             port=app_settings.REDIS_PORT,
         )

-    def startup(self, queue):
+    def startup(self, queue: QueueName) -> None:
         main_queue_key = self._key(queue)

         pattern = self._prefix_key(
@@ -53,7 +60,7 @@ def startup(self, queue):
         )
         processing_queue_keys = current_processing_queue_keys - expected_processing_queue_keys

-        def move_processing_jobs_to_main(pipe):
+        def move_processing_jobs_to_main(pipe: redis.client.Pipeline) -> None:
             # Collect all the data we need to add, before adding the data back
             # to the main queue of and clearing the processing queues
             # atomically, so if this crashes, we don't lose jobs
@@ -79,10 +86,10 @@ def move_processing_jobs_to_main(pipe):
                 *processing_queue_keys,
             )

-    def enqueue(self, job, queue):
+    def enqueue(self, job: Job, queue: QueueName) -> None:
         self.client.lpush(self._key(queue), job.to_json().encode('utf-8'))

-    def dequeue(self, queue, worker_number, timeout):
+    def dequeue(self, queue: QueueName, worker_number: WorkerNumber, timeout: int) -> Optional[Job]:
         main_queue_key = self._key(queue)
         processing_queue_key = self._processing_key(queue, worker_number)

@@ -104,7 +111,9 @@ def dequeue(self, queue, worker_number, timeout):
         if data:
             return Job.from_json(data.decode('utf-8'))

-    def processed_job(self, queue, worker_number, job):
+        return None
+
+    def processed_job(self, queue: QueueName, worker_number: WorkerNumber, job: Job) -> None:
         data = job.to_json().encode('utf-8')

         self.client.lrem(
@@ -113,10 +122,15 @@ def processed_job(self, queue, worker_number, job):
             value=data,
         )

-    def length(self, queue):
+    def length(self, queue: QueueName) -> int:
         return self.client.llen(self._key(queue))

-    def deduplicate(self, queue, *, progress_logger=NULL_PROGRESS_LOGGER):
+    def deduplicate(
+        self,
+        queue: QueueName,
+        *,
+        progress_logger: ProgressLogger = NULL_PROGRESS_LOGGER
+    ) -> Tuple[int, int]:
         """
         Deduplicate the given queue by comparing the jobs in a manner which
         ignores their created timestamps.
@@ -137,7 +151,7 @@ def deduplicate(self, queue, *, progress_logger=NULL_PROGRESS_LOGGER):

         # A mapping of job_identity -> list of raw_job data; the entries in the
         # latter list are ordered from newest to oldest
-        jobs = {}
+        jobs = {}  # type: Dict[str, List[bytes]]

         progress_logger.info("Collecting jobs")

@@ -163,20 +177,20 @@ def deduplicate(self, queue, *, progress_logger=NULL_PROGRESS_LOGGER):

         return original_size, self.client.llen(main_queue_key)

-    def _key(self, queue):
+    def _key(self, queue: QueueName) -> str:
         key = 'django_lightweight_queue:{}'.format(queue)

         return self._prefix_key(key)

-    def _processing_key(self, queue, worker_number):
+    def _processing_key(self, queue: QueueName, worker_number: WorkerNumber) -> str:
         key = 'django_lightweight_queue:{}:processing:{}'.format(
             queue,
             worker_number,
         )

         return self._prefix_key(key)

-    def _prefix_key(self, key):
+    def _prefix_key(self, key: str) -> str:
         if app_settings.REDIS_PREFIX:
             return '{}:{}'.format(
                 app_settings.REDIS_PREFIX,
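
For context, the docstring's per-queue-per-worker 'processing' queue design can be sketched standalone. This is a minimal illustration of the underlying Redis reliable-queue pattern, not code from this change: the connection details, key names, and timeout are assumptions, and the job body is elided.

import redis

client = redis.StrictRedis(host='localhost', port=6379)  # assumed local Redis

# Illustrative key names, mirroring the docstring's 'things' example.
main_key = 'django_lightweight_queue:things'
processing_key = 'django_lightweight_queue:things:processing:1'

# Atomically move one job from the main queue onto this worker's
# processing queue, blocking up to 5 seconds if the main queue is empty.
raw_job = client.brpoplpush(main_key, processing_key, timeout=5)

if raw_job is not None:
    # ... run the job here ...
    # On success, drop the job from the processing queue. If the worker
    # crashes before reaching this point, startup() moves the job back onto
    # the main queue, which is what gives the at-least-once semantics.
    client.lrem(processing_key, count=1, value=raw_job)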
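
And a hypothetical caller of the newly annotated deduplicate(), using the Tuple[int, int] it returns (queue length before deduplication, queue length after); the queue name here is assumed:

backend = ReliableRedisBackend()
original_size, new_size = backend.deduplicate('things')
print('Removed {} duplicate jobs'.format(original_size - new_size))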