@@ -417,6 +417,9 @@ def __init__(
417417 self ._q : deque [_T ] = deque (maxlen = limit )
418418 """The receiver's internal message queue."""
419419
420+ self ._closed : bool = False
421+ """Whether the receiver is closed."""
422+
420423 def enqueue (self , message : _T , / ) -> None :
421424 """Put a message into this receiver's queue.
422425
@@ -466,7 +469,7 @@ async def ready(self) -> bool:
466469 # consumed, then we return immediately.
467470 # pylint: disable=protected-access
468471 while len (self ._q ) == 0 :
469- if self ._channel ._closed :
472+ if self ._channel ._closed or self . _closed :
470473 return False
471474 async with self ._channel ._recv_cv :
472475 await self ._channel ._recv_cv .wait ()
@@ -486,9 +489,25 @@ def consume(self) -> _T:
486489 if not self ._q and self ._channel ._closed : # pylint: disable=protected-access
487490 raise ReceiverStoppedError (self ) from ChannelClosedError (self ._channel )
488491
492+ if self ._closed :
493+ raise ReceiverStoppedError (self )
494+
489495 assert self ._q , "`consume()` must be preceded by a call to `ready()`"
490496 return self ._q .popleft ()
491497
498+ @override
499+ def close (self ) -> None :
500+ """Close the receiver.
501+
502+ After calling this method, new messages will not be received. Once the
503+ receiver's buffer is drained, trying to receive a message will raise a
504+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
505+ """
506+ self ._closed = True
507+ self ._channel ._receivers .pop ( # pylint: disable=protected-access
508+ hash (self ), None
509+ )
510+
492511 def __str__ (self ) -> str :
493512 """Return a string representation of this receiver."""
494513 return f"{ self ._channel } :{ type (self ).__name__ } "
0 commit comments