9
9
10
10
from redis import asyncio as aioredis
11
11
12
- from channels .exceptions import ChannelFull
13
- from channels .layers import BaseChannelLayer
12
+ from channels .exceptions import ChannelFull # type: ignore[import-untyped]
13
+ from channels .layers import BaseChannelLayer # type: ignore[import-untyped]
14
14
15
15
from .serializers import registry
16
16
from .utils import (
@@ -38,12 +38,10 @@ class ChannelLock:
38
38
"""
39
39
40
40
def __init__ (self ) -> None :
41
- self .locks : collections .defaultdict [str , asyncio .Lock ] = (
42
- collections .defaultdict (asyncio .Lock )
43
- )
44
- self .wait_counts : collections .defaultdict [str , int ] = collections .defaultdict (
45
- int
41
+ self .locks : typing .DefaultDict [str , asyncio .Lock ] = collections .defaultdict (
42
+ asyncio .Lock
46
43
)
44
+ self .wait_counts : typing .DefaultDict [str , int ] = collections .defaultdict (int )
47
45
48
46
async def acquire (self , channel : str ) -> bool :
49
47
"""
@@ -69,7 +67,7 @@ def release(self, channel: str) -> None:
69
67
del self .wait_counts [channel ]
70
68
71
69
72
- class BoundedQueue (asyncio .Queue [ typing . Any ] ):
70
+ class BoundedQueue (asyncio .Queue ):
73
71
def put_nowait (self , item : typing .Any ) -> None :
74
72
if self .full ():
75
73
# see: https://github.com/django/channels_redis/issues/212
@@ -157,11 +155,11 @@ def __init__(
157
155
# Event loop they are trying to receive on
158
156
self .receive_event_loop : typing .Optional [asyncio .AbstractEventLoop ] = None
159
157
# Buffered messages by process-local channel name
160
- self .receive_buffer : collections . defaultdict [str , BoundedQueue ] = (
158
+ self .receive_buffer : typing . DefaultDict [str , BoundedQueue ] = (
161
159
collections .defaultdict (functools .partial (BoundedQueue , self .capacity ))
162
160
)
163
161
# Detached channel cleanup tasks
164
- self .receive_cleaners : typing .List [asyncio .Task [typing .Any ]] = []
162
+ self .receive_cleaners : typing .List [" asyncio.Task[typing.Any]" ] = []
165
163
# Per-channel cleanup locks to prevent a receive starting and moving
166
164
# a message back into the main queue before its cleanup has completed
167
165
self .receive_clean_locks = ChannelLock ()
@@ -240,7 +238,7 @@ async def _brpop_with_clean(
240
238
connection = self .connection (index )
241
239
# Cancellation here doesn't matter, we're not doing anything destructive
242
240
# and the script executes atomically...
243
- await connection .eval (cleanup_script , 0 , channel , backup_queue )
241
+ await connection .eval (cleanup_script , 0 , channel , backup_queue ) # type: ignore[misc]
244
242
# ...and it doesn't matter here either, the message will be safe in the backup.
245
243
result = await connection .bzpopmin (channel , timeout = timeout )
246
244
@@ -271,9 +269,9 @@ async def receive(self, channel: str) -> typing.Any:
271
269
assert self .valid_channel_name (channel )
272
270
if "!" in channel :
273
271
real_channel = self .non_local_name (channel )
274
- assert real_channel .endswith (self . client_prefix + "!" ), (
275
- "Wrong client prefix "
276
- )
272
+ assert real_channel .endswith (
273
+ self . client_prefix + "! "
274
+ ), "Wrong client prefix"
277
275
# Enter receiving section
278
276
loop = asyncio .get_running_loop ()
279
277
self .receive_count += 1
@@ -293,7 +291,7 @@ async def receive(self, channel: str) -> typing.Any:
293
291
message = None
294
292
while self .receive_buffer [channel ].empty ():
295
293
_tasks = [
296
- self .receive_lock .acquire (),
294
+ self .receive_lock .acquire (), # type: ignore[union-attr]
297
295
self .receive_buffer [channel ].get (),
298
296
]
299
297
tasks = [asyncio .ensure_future (task ) for task in _tasks ]
@@ -312,7 +310,7 @@ async def receive(self, channel: str) -> typing.Any:
312
310
if not task .cancel ():
313
311
assert task .done ()
314
312
if task .result () is True :
315
- self .receive_lock .release ()
313
+ self .receive_lock .release () # type: ignore[union-attr]
316
314
317
315
raise
318
316
@@ -335,7 +333,7 @@ async def receive(self, channel: str) -> typing.Any:
335
333
if message or exception :
336
334
if token :
337
335
# We will not be receving as we already have the message.
338
- self .receive_lock .release ()
336
+ self .receive_lock .release () # type: ignore[union-attr]
339
337
340
338
if exception :
341
339
raise exception
@@ -362,7 +360,7 @@ async def receive(self, channel: str) -> typing.Any:
362
360
del self .receive_buffer [channel ]
363
361
raise
364
362
finally :
365
- self .receive_lock .release ()
363
+ self .receive_lock .release () # type: ignore[union-attr]
366
364
367
365
# We know there's a message available, because there
368
366
# couldn't have been any interruption between empty() and here
@@ -377,7 +375,7 @@ async def receive(self, channel: str) -> typing.Any:
377
375
self .receive_count -= 1
378
376
# If we were the last out, drop the receive lock
379
377
if self .receive_count == 0 :
380
- assert not self .receive_lock .locked ()
378
+ assert not self .receive_lock .locked () # type: ignore[union-attr]
381
379
self .receive_lock = None
382
380
self .receive_event_loop = None
383
381
else :
@@ -422,7 +420,7 @@ async def receive_single(
422
420
)
423
421
self .receive_cleaners .append (cleaner )
424
422
425
- def _cleanup_done (cleaner : asyncio .Task [ typing . Any ] ) -> None :
423
+ def _cleanup_done (cleaner : " asyncio.Task" ) -> None :
426
424
self .receive_cleaners .remove (cleaner )
427
425
self .receive_clean_locks .release (channel_key )
428
426
@@ -468,7 +466,7 @@ async def flush(self) -> None:
468
466
# Go through each connection and remove all with prefix
469
467
for i in range (self .ring_size ):
470
468
connection = self .connection (i )
471
- await connection .eval (delete_prefix , 0 , self .prefix + "*" )
469
+ await connection .eval (delete_prefix , 0 , self .prefix + "*" ) # type: ignore[union-attr,misc]
472
470
# Now clear the pools as well
473
471
await self .close_pools ()
474
472
@@ -584,7 +582,7 @@ async def group_send(self, group: str, message: typing.Any) -> None:
584
582
585
583
# channel_keys does not contain a single redis key more than once
586
584
connection = self .connection (connection_index )
587
- channels_over_capacity = await connection .eval (
585
+ channels_over_capacity = await connection .eval ( # type: ignore[misc]
588
586
group_send_lua , len (channel_redis_keys ), * channel_redis_keys , * args
589
587
)
590
588
_channels_over_capacity = - 1.0
0 commit comments