diff --git a/channels_redis/core.py b/channels_redis/core.py index 1111fc2..3652b45 100644 --- a/channels_redis/core.py +++ b/channels_redis/core.py @@ -41,7 +41,7 @@ async def acquire(self, channel): def locked(self, channel): """ - Return ``True`` if the lock for the given channel is acquired. + Return `True` if the lock for the given channel is acquired. """ return self.locks[channel].locked() @@ -129,11 +129,11 @@ def __init__( self.client_prefix = uuid.uuid4().hex # Set up any encryption objects self._setup_encryption(symmetric_encryption_keys) - # Number of coroutines trying to receive right now + # Number of coroutines trying to `receive` right now self.receive_count = 0 # The receive lock self.receive_lock = None - # Event loop they are trying to receive on + # Event loop they are trying to `receive` on self.receive_event_loop = None # Buffered messages by process-local channel name self.receive_buffer = collections.defaultdict( @@ -141,7 +141,7 @@ def __init__( ) # Detached channel cleanup tasks self.receive_cleaners = [] - # Per-channel cleanup locks to prevent a receive starting and moving + # Per-channel cleanup locks to prevent a `receive` starting and moving # a message back into the main queue before its cleanup has completed self.receive_clean_locks = ChannelLock() @@ -166,7 +166,7 @@ def _setup_encryption(self, symmetric_encryption_keys): else: self.crypter = None - ### Channel layer API ### + # # # Channel layer API # # # extensions = ["groups", "flush"] @@ -274,7 +274,7 @@ async def receive(self, channel): self.receive_count += 1 try: if self.receive_count == 1: - # If we're the first coroutine in, create the receive lock! + # If we're the first coroutine in, create the `receive` lock! self.receive_lock = asyncio.Lock() self.receive_event_loop = loop else: @@ -329,7 +329,7 @@ async def receive(self, channel): if message or exception: if token: - # We will not be receving as we already have the message. 
+ # We will not be receiving as we already have the message. self.receive_lock.release() if exception: @@ -339,7 +339,7 @@ async def receive(self, channel): else: assert token - # We hold the receive lock, receive and then release it. + # We hold the `receive` lock, receive and then release it. try: # There is no interruption point from when the message is # unpacked in receive_single to when we get back here, so @@ -370,13 +370,13 @@ async def receive(self, channel): finally: self.receive_count -= 1 - # If we were the last out, drop the receive lock + # If we were the last out, drop the `receive` lock if self.receive_count == 0: assert not self.receive_lock.locked() self.receive_lock = None self.receive_event_loop = None else: - # Do a plain direct receive + # Do a plain direct `receive` return (await self.receive_single(channel))[1] async def receive_single(self, channel): @@ -441,7 +441,7 @@ async def new_channel(self, prefix="specific"): """ return f"{prefix}.{self.client_prefix}!{uuid.uuid4().hex}" - ### Flush extension ### + # # # Flush extension # # # async def flush(self): """ @@ -482,7 +482,7 @@ async def wait_received(self): if self.receive_cleaners: await asyncio.wait(self.receive_cleaners[:]) - ### Groups extension ### + # # # Groups extension # # # async def group_add(self, group, channel): """ @@ -644,7 +644,7 @@ def _group_key(self, group): """ return f"{self.prefix}:group:{group}".encode("utf8") - ### Serialization ### + # # # Serialization # # # def serialize(self, message): """ @@ -654,7 +654,7 @@ def serialize(self, message): if self.crypter: value = self.crypter.encrypt(value) - # As we use an sorted set to expire messages we need to guarantee uniqueness, with 12 bytes. + # As we use a sorted set to expire messages we need to guarantee uniqueness, with 12 bytes. 
         random_prefix = random.getrandbits(8 * 12).to_bytes(12, "big")
         return random_prefix + value
@@ -669,7 +669,7 @@ def deserialize(self, message):
             message = self.crypter.decrypt(message, self.expiry + 10)
         return msgpack.unpackb(message, raw=False)
-    ### Internal functions ###
+    # # # Internal functions # # #
     def consistent_hash(self, value):
         return _consistent_hash(value, self.ring_size)
@@ -688,7 +688,7 @@ def make_fernet(self, key):
     def __str__(self):
         return f"{self.__class__.__name__}(hosts={self.hosts})"
-    ### Connection handling ###
+    # # # Connection handling # # #
     def connection(self, index):
         """
diff --git a/channels_redis/pubsub.py b/channels_redis/pubsub.py
index 78db68e..de8faae 100644
--- a/channels_redis/pubsub.py
+++ b/channels_redis/pubsub.py
@@ -158,10 +158,10 @@ async def receive(self, channel):
         try:
             message = await q.get()
         except (asyncio.CancelledError, asyncio.TimeoutError, GeneratorExit):
-            # We assume here that the reason we are cancelled is because the consumer
-            # is exiting, therefore we need to cleanup by unsubscribe below. Indeed,
+            # We assume here that the reason we are cancelled is that the consumer
+            # is exiting, therefore we need to clean up by unsubscribing below. Indeed,
             # currently the way that Django Channels works, this is a safe assumption.
-            # In the future, Dajngo Channels could change to call a *new* method that
+            # In the future, Django Channels could change to call a *new* method that
             # would serve as the antithesis of `new_channel()`; this new method might
             # be named `delete_channel()`. If that were the case, we would do the
             # following cleanup from that new `delete_channel()` method, but, since
diff --git a/channels_redis/utils.py b/channels_redis/utils.py
index 98e06ca..bc996c3 100644
--- a/channels_redis/utils.py
+++ b/channels_redis/utils.py
@@ -66,7 +66,7 @@ def create_pool(host):
     Takes the value of the "host" argument and returns a suited connection pool
     to the corresponding redis instance. 
     """
-    # avoid side-effects from modifying host
+    # Avoid side effects from modifying host
     host = host.copy()
     if "address" in host:
         address = host.pop("address")