diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 96a0240b..797fe4ae 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,14 @@ ## New Features - +- An experimental `NopReceiver` implementation has been added, which can be used as a place-holder receiver that never receives a message. + +- The experimental `OptionalReceiver` has been deprecated. It will be removed with the next major release. It can be replaced with a `NopReceiver` as follows: + + ```python + opt_recv: Receiver[T] | None + recv: Receiver[T] = NopReceiver[T]() if opt_recv is None else opt_recv + ``` ## Bug Fixes diff --git a/src/frequenz/channels/experimental/__init__.py b/src/frequenz/channels/experimental/__init__.py index b4856c59..9876e8e3 100644 --- a/src/frequenz/channels/experimental/__init__.py +++ b/src/frequenz/channels/experimental/__init__.py @@ -10,14 +10,16 @@ guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md). """ +from ._nop_receiver import NopReceiver from ._optional_receiver import OptionalReceiver from ._pipe import Pipe from ._relay_sender import RelaySender from ._with_previous import WithPrevious __all__ = [ + "NopReceiver", "OptionalReceiver", - "WithPrevious", "Pipe", "RelaySender", + "WithPrevious", ] diff --git a/src/frequenz/channels/experimental/_nop_receiver.py b/src/frequenz/channels/experimental/_nop_receiver.py new file mode 100644 index 00000000..7e5695ac --- /dev/null +++ b/src/frequenz/channels/experimental/_nop_receiver.py @@ -0,0 +1,57 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""A receiver that will never receive a message. + +It is useful as a place-holder receiver for use in contexts where a receiver is +necessary, but one is not available. +""" + +import asyncio + +from typing_extensions import override + +from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co +from frequenz.channels._receiver import ReceiverStoppedError + + +class NopReceiver(Receiver[ReceiverMessageT_co]): + """A place-holder receiver that will never receive a message.""" + + def __init__(self) -> None: + """Initialize this instance.""" + self._closed: bool = False + + @override + async def ready(self) -> bool: + """Wait for ever unless the receiver is closed. + + Returns: + Whether the receiver is still active. + """ + if self._closed: + return False + await asyncio.Future() + return False + + @override + def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly) + """Raise `ReceiverError` unless the NopReceiver is closed. + + If the receiver is closed, then raise `ReceiverStoppedError`. + + Returns: + The next message received. + + Raises: + ReceiverStoppedError: If the receiver stopped producing messages. + ReceiverError: If there is some problem with the underlying receiver. + """ + if self._closed: + raise ReceiverStoppedError(self) + raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self) + + @override + def close(self) -> None: + """Stop the receiver.""" + self._closed = True diff --git a/src/frequenz/channels/experimental/_optional_receiver.py b/src/frequenz/channels/experimental/_optional_receiver.py index f2cc3b82..b30df74c 100644 --- a/src/frequenz/channels/experimental/_optional_receiver.py +++ b/src/frequenz/channels/experimental/_optional_receiver.py @@ -10,11 +10,12 @@ import asyncio -from typing_extensions import override +from typing_extensions import deprecated, override from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co +@deprecated("Use `frequenz.channels.experimental.NopReceiver` instead.") class OptionalReceiver(Receiver[ReceiverMessageT_co]): """A receiver that will wait indefinitely if there is no underlying receiver. diff --git a/tests/experimental/test_nop_receiver.py b/tests/experimental/test_nop_receiver.py new file mode 100644 index 00000000..06458c5b --- /dev/null +++ b/tests/experimental/test_nop_receiver.py @@ -0,0 +1,40 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Tests for the NopReceiver.""" + +import asyncio +from contextlib import closing + +import pytest + +from frequenz.channels import ReceiverError +from frequenz.channels._receiver import ReceiverStoppedError +from frequenz.channels.experimental import NopReceiver + + +async def test_never_ready() -> None: + """Test that the receiver is never ready.""" + # When it is not closed, `ready()` should never return. + with closing(NopReceiver[int]()) as receiver: + with pytest.raises(TimeoutError): + await asyncio.wait_for(receiver.ready(), timeout=0.1) + + # When it is closed, `ready()` should return False. + receiver = NopReceiver[int]() + receiver.close() + assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False + + +async def test_consuming_raises() -> None: + """Test that consume raises an error.""" + # When it is not closed, `consume()` should raise a ReceiverError. + with closing(NopReceiver[int]()) as receiver: + with pytest.raises(ReceiverError): + receiver.consume() + + # When it is closed, `consume()` should raise a ReceiverStoppedError. + receiver = NopReceiver[int]() + receiver.close() + with pytest.raises(ReceiverStoppedError): + receiver.consume() diff --git a/tests/test_anycast.py b/tests/test_anycast.py index 918c548c..140a122f 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -64,7 +64,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No senders.append(send_msg(acast.new_sender())) await asyncio.gather(*senders) - await acast.close() + await acast.aclose() await receivers_runs with pytest.raises(SenderError): @@ -92,7 +92,7 @@ async def test_anycast_after_close() -> None: await sender.send(2) - await acast.close() + await acast.aclose() with pytest.raises(SenderError): await sender.send(5) @@ -174,7 +174,7 @@ async def test_anycast_async_iterator() -> None: async def send_messages() -> None: for val in ["one", "two", "three", "four", "five"]: await sender.send(val) - await acast.close() + await acast.aclose() sender_task = asyncio.create_task(send_messages()) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index f995a922..0cc89f33 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -58,7 +58,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No senders.append(send_msg(bcast.new_sender())) await asyncio.gather(*senders) - await bcast.close() + await bcast.aclose() await receivers_runs actual_sum = 0 @@ -93,7 +93,7 @@ async def test_broadcast_after_close() -> None: receiver = bcast.new_receiver() sender = bcast.new_sender() - await bcast.close() + await bcast.aclose() with pytest.raises(SenderError): await sender.send(5) @@ -204,7 +204,7 @@ async def test_broadcast_async_iterator() -> None: async def send_messages() -> None: for val in range(0, 10): await sender.send(val) - await bcast.close() + await bcast.aclose() sender_task = asyncio.create_task(send_messages()) diff --git a/tests/test_merge_integration.py b/tests/test_merge_integration.py index f5a61970..2854a20c 100644 --- a/tests/test_merge_integration.py +++ b/tests/test_merge_integration.py @@ -22,9 +22,9 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: for ctr in range(5): await ch1.send(ctr + 1) await ch2.send(ctr + 101) - await chan1.close() + await chan1.aclose() await ch2.send(1000) - await chan2.close() + await chan2.aclose() senders = asyncio.create_task(send(chan1.new_sender(), chan2.new_sender())) @@ -52,8 +52,8 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: for ctr in range(5): await ch1.send(ctr + 1) await ch2.send(ctr + 101) - await chan1.close() - await chan2.close() + await chan1.aclose() + await chan2.aclose() rx1 = chan1.new_receiver() rx2 = chan2.new_receiver()