From c7d8e69816a7822a2009c936ec85e6d69ce9852d Mon Sep 17 00:00:00 2001 From: Elzbieta Kotulska Date: Mon, 16 Jun 2025 15:47:50 +0200 Subject: [PATCH 1/2] Make NopReceiver.ready() respond to close() method The ready() method was not terminating when close() was called, causing it to hang indefinitely. This fix ensures ready() returns False when the receiver is closed. Signed-off-by: Elzbieta Kotulska --- RELEASE_NOTES.md | 2 +- .../channels/experimental/_nop_receiver.py | 12 ++++----- tests/experimental/test_nop_receiver.py | 25 +++++++++++++++++++ 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 52636b7c..a0d9f244 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,4 +14,4 @@ ## Bug Fixes - +- Fix `NopReceiver.ready()` to properly terminate when receiver is closed. diff --git a/src/frequenz/channels/experimental/_nop_receiver.py b/src/frequenz/channels/experimental/_nop_receiver.py index 7e5695ac..028ed61e 100644 --- a/src/frequenz/channels/experimental/_nop_receiver.py +++ b/src/frequenz/channels/experimental/_nop_receiver.py @@ -20,7 +20,7 @@ class NopReceiver(Receiver[ReceiverMessageT_co]): def __init__(self) -> None: """Initialize this instance.""" - self._closed: bool = False + self._ready_future: asyncio.Future[bool] = asyncio.Future() @override async def ready(self) -> bool: @@ -29,10 +29,9 @@ async def ready(self) -> bool: Returns: Whether the receiver is still active. """ - if self._closed: + if self._ready_future.done(): return False - await asyncio.Future() - return False + return await self._ready_future @override def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly) @@ -47,11 +46,12 @@ def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly) ReceiverStoppedError: If the receiver stopped producing messages. ReceiverError: If there is some problem with the underlying receiver. """ - if self._closed: + if self._ready_future.done(): raise ReceiverStoppedError(self) raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self) @override def close(self) -> None: """Stop the receiver.""" - self._closed = True + if not self._ready_future.done(): + self._ready_future.set_result(False) diff --git a/tests/experimental/test_nop_receiver.py b/tests/experimental/test_nop_receiver.py index 06458c5b..fd270ae9 100644 --- a/tests/experimental/test_nop_receiver.py +++ b/tests/experimental/test_nop_receiver.py @@ -38,3 +38,28 @@ async def test_consuming_raises() -> None: receiver.close() with pytest.raises(ReceiverStoppedError): receiver.consume() + + +async def test_close_method_effect_on_ready() -> None: + """Test `ready()` terminates when the receiver is closed. + + When the receiver is closed, `ready()` should return False. + This test verifies that: + 1. `ready()` returns False immediately after `close()` is called + 2. `ready()` and `close()` can be called multiple times + """ + receiver = NopReceiver[int]() + + # Create a task that waits for the receiver to be ready. + task = asyncio.create_task(receiver.ready()) + + # Wait for the task to start. + await asyncio.sleep(0.1) + + # Close the receiver and wait for the task to complete. + receiver.close() + assert await asyncio.wait_for(task, timeout=0.1) is False + + # Second call to `close()` or `ready()` should not raise an error. + receiver.close() + assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False From 7130676d2ab9a5620b63f77c6dd379c4675878bb Mon Sep 17 00:00:00 2001 From: Elzbieta Kotulska Date: Mon, 16 Jun 2025 16:10:00 +0200 Subject: [PATCH 2/2] Change NopReceiver creating of asyncio.Future to loop.create_future() Because it is recommended in the python documentation. This way alternative event loop implementations can inject their own optimized implementations of a Future object. Signed-off-by: Elzbieta Kotulska --- src/frequenz/channels/experimental/_nop_receiver.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/frequenz/channels/experimental/_nop_receiver.py b/src/frequenz/channels/experimental/_nop_receiver.py index 028ed61e..c5f47b30 100644 --- a/src/frequenz/channels/experimental/_nop_receiver.py +++ b/src/frequenz/channels/experimental/_nop_receiver.py @@ -20,7 +20,9 @@ class NopReceiver(Receiver[ReceiverMessageT_co]): def __init__(self) -> None: """Initialize this instance.""" - self._ready_future: asyncio.Future[bool] = asyncio.Future() + self._ready_future: asyncio.Future[bool] = ( + asyncio.get_running_loop().create_future() + ) @override async def ready(self) -> bool: