diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 52636b7c..a0d9f244 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +- Fix `NopReceiver.ready()` to properly terminate when receiver is closed. diff --git a/src/frequenz/channels/experimental/_nop_receiver.py b/src/frequenz/channels/experimental/_nop_receiver.py index 7e5695ac..c5f47b30 100644 --- a/src/frequenz/channels/experimental/_nop_receiver.py +++ b/src/frequenz/channels/experimental/_nop_receiver.py @@ -20,7 +20,9 @@ class NopReceiver(Receiver[ReceiverMessageT_co]): def __init__(self) -> None: """Initialize this instance.""" - self._closed: bool = False + self._ready_future: asyncio.Future[bool] = ( + asyncio.get_running_loop().create_future() + ) @override async def ready(self) -> bool: @@ -29,10 +31,9 @@ async def ready(self) -> bool: Returns: Whether the receiver is still active. """ - if self._closed: + if self._ready_future.done(): return False - await asyncio.Future() - return False + return await self._ready_future @override def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly) @@ -47,11 +48,12 @@ def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly) ReceiverStoppedError: If the receiver stopped producing messages. ReceiverError: If there is some problem with the underlying receiver. """ - if self._closed: + if self._ready_future.done(): raise ReceiverStoppedError(self) raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self) @override def close(self) -> None: """Stop the receiver.""" - self._closed = True + if not self._ready_future.done(): + self._ready_future.set_result(False) diff --git a/tests/experimental/test_nop_receiver.py b/tests/experimental/test_nop_receiver.py index 06458c5b..fd270ae9 100644 --- a/tests/experimental/test_nop_receiver.py +++ b/tests/experimental/test_nop_receiver.py @@ -38,3 +38,28 @@ async def test_consuming_raises() -> None: receiver.close() with pytest.raises(ReceiverStoppedError): receiver.consume() + + +async def test_close_method_effect_on_ready() -> None: + """Test `ready()` terminates when the receiver is closed. + + When the receiver is closed, `ready()` should return False. + This test verifies that: + 1. `ready()` returns False immediately after `close()` is called + 2. `ready()` and `close()` can be called multiple times + """ + receiver = NopReceiver[int]() + + # Create a task that waits for the receiver to be ready. + task = asyncio.create_task(receiver.ready()) + + # Wait for the task to start. + await asyncio.sleep(0.1) + + # Close the receiver and wait for the task to complete. + receiver.close() + assert await asyncio.wait_for(task, timeout=0.1) is False + + # Second call to `close()` or `ready()` should not raise an error. + receiver.close() + assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False