@@ -217,6 +217,15 @@ def consume(self) -> ReceiverMessageT_co:
217217 ReceiverError: If there is some problem with the receiver.
218218 """
219219
220+ def close (self ) -> None :
221+ """Close the receiver.
222+
223+ After calling this method, new messages will not be available from the receiver.
224+ Once the receiver's buffer is drained, trying to receive a message will raise a
225+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
226+ """
227+ raise NotImplementedError ("close() must be implemented by subclasses" )
228+
220229 def __aiter__ (self ) -> Self :
221230 """Get an async iterator over the received messages.
222231
@@ -464,6 +473,16 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
464473 """
465474 return self ._mapping_function (self ._receiver .consume ())
466475
476+ @override
477+ def close (self ) -> None :
478+ """Close the receiver.
479+
480+ After calling this method, new messages will not be received. Once the
481+ receiver's buffer is drained, trying to receive a message will raise a
482+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
483+ """
484+ self ._receiver .close ()
485+
467486 def __str__ (self ) -> str :
468487 """Return a string representation of the mapper."""
469488 return f"{ type (self ).__name__ } :{ self ._receiver } :{ self ._mapping_function } "
@@ -553,6 +572,16 @@ def consume(self) -> ReceiverMessageT_co:
553572 self ._next_message = _SENTINEL
554573 return message
555574
575+ @override
576+ def close (self ) -> None :
577+ """Close the receiver.
578+
579+ After calling this method, new messages will not be received. Once the
580+ receiver's buffer is drained, trying to receive a message will raise a
581+ [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
582+ """
583+ self ._receiver .close ()
584+
556585 def __str__ (self ) -> str :
557586 """Return a string representation of the filter."""
558587 return f"{ type (self ).__name__ } :{ self ._receiver } :{ self ._filter_function } "
0 commit comments