diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 209fefb2..2da763f8 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,7 +13,7 @@ ## New Features - +* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set. ## Bug Fixes diff --git a/src/frequenz/channels/experimental/__init__.py b/src/frequenz/channels/experimental/__init__.py index 940cc304..ebdde259 100644 --- a/src/frequenz/channels/experimental/__init__.py +++ b/src/frequenz/channels/experimental/__init__.py @@ -9,11 +9,13 @@ removal, even in minor updates. """ +from ._optional_receiver import OptionalReceiver from ._pipe import Pipe from ._relay_sender import RelaySender from ._with_previous import WithPrevious __all__ = [ + "OptionalReceiver", "WithPrevious", "Pipe", "RelaySender", diff --git a/src/frequenz/channels/experimental/_optional_receiver.py b/src/frequenz/channels/experimental/_optional_receiver.py new file mode 100644 index 00000000..f2cc3b82 --- /dev/null +++ b/src/frequenz/channels/experimental/_optional_receiver.py @@ -0,0 +1,79 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""A receiver that will wait indefinitely if there is no underlying receiver. + +The `OptionalReceiver` is useful when the underlying receiver is not set initially. +Instead of making `if-else` branches to check if the receiver is set, you can use +this receiver to wait indefinitely if it is not set. +""" + +import asyncio + +from typing_extensions import override + +from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co + + +class OptionalReceiver(Receiver[ReceiverMessageT_co]): + """A receiver that will wait indefinitely if there is no underlying receiver. + + This receiver is useful when the underlying receiver is not set initially. + Instead of making `if-else` branches to check if the receiver is set, you can use + this receiver to wait indefinitely if it is not set. + """ + + def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None): + """Initialize this instance. + + Args: + receiver: The underlying receiver, or `None` if there is no receiver. + """ + self._receiver: Receiver[ReceiverMessageT_co] | None = receiver + + @override + async def ready(self) -> bool: + """Wait until the receiver is ready with a message or an error. + + Once a call to `ready()` has finished, the message should be read with + a call to `consume()` (`receive()` or iterated over). The receiver will + remain ready (this method will return immediately) until it is + consumed. + + Returns: + Whether the receiver is still active. + """ + if self._receiver is not None: + return await self._receiver.ready() + + # If there's no receiver, wait forever + await asyncio.Event().wait() + return False + + @override + def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly) + """Return the latest from the underlying receiver message once `ready()` is complete. + + `ready()` must be called before each call to `consume()`. + + Returns: + The next message received. + + Raises: + ReceiverStoppedError: If the receiver stopped producing messages. + ReceiverError: If there is some problem with the underlying receiver. + """ + if self._receiver is None: + raise ReceiverError( + "`consume()` must be preceded by a call to `ready()`", self + ) + return self._receiver.consume() + + def is_set(self) -> bool: + """Check if the receiver is set.""" + return self._receiver is not None + + def close(self) -> None: + """Stop the receiver.""" + if self._receiver is not None: + self._receiver.close() diff --git a/tests/experimental/test_optional_receiver.py b/tests/experimental/test_optional_receiver.py new file mode 100644 index 00000000..08021939 --- /dev/null +++ b/tests/experimental/test_optional_receiver.py @@ -0,0 +1,66 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Tests for the OptionalReceiver class.""" + +import asyncio +from contextlib import aclosing, closing + +import pytest + +from frequenz.channels import Broadcast, ReceiverError, ReceiverStoppedError +from frequenz.channels.experimental import OptionalReceiver + + +async def test_receiver_with_none_does_not_end() -> None: + """Test that the receiver with None does not end.""" + with closing(OptionalReceiver[int](None)) as receiver: + assert not receiver.is_set() + + with pytest.raises(TimeoutError): + await asyncio.wait_for(receiver.ready(), timeout=0.3) + + +async def test_receiver_with_none_raises_error_when_consuming() -> None: + """Test that the receiver with None raises an error when consuming.""" + with closing(OptionalReceiver[int](None)) as receiver: + assert not receiver.is_set() + + with pytest.raises(ReceiverError): + receiver.consume() + + +async def test_receiver_with_underlying_receiver_forwards_messages() -> None: + """Test that the receiver forwards messages.""" + async with aclosing(Broadcast[int](name="test")) as channel: + with closing(OptionalReceiver[int](channel.new_receiver())) as receiver: + assert receiver.is_set() + + sender = channel.new_sender() + + await sender.send(5) + value = await asyncio.wait_for(receiver.receive(), timeout=0.1) + assert value == 5 + + await sender.send(100) + value = await asyncio.wait_for(receiver.receive(), timeout=0.1) + assert value == 100 + + +async def test_receiver_ends_when_underlying_receiver_ends() -> None: + """Test that the receiver ends when the underlying receiver ends.""" + async with aclosing(Broadcast[int](name="test")) as channel: + with ( + closing(channel.new_receiver()) as receiver, + closing(OptionalReceiver[int](receiver)) as optional_receiver, + ): + assert optional_receiver.is_set() + + receiver.close() + # First check if ready method returns False + is_active = await optional_receiver.ready() + assert is_active is False + + # Then check if `receive` method raises ReceiverStoppedError + with pytest.raises(ReceiverStoppedError): + await optional_receiver.receive()