Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- Fix `NopReceiver.ready()` to properly terminate when receiver is closed.
14 changes: 8 additions & 6 deletions src/frequenz/channels/experimental/_nop_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ class NopReceiver(Receiver[ReceiverMessageT_co]):

def __init__(self) -> None:
"""Initialize this instance."""
self._closed: bool = False
self._ready_future: asyncio.Future[bool] = (
asyncio.get_running_loop().create_future()
)

@override
async def ready(self) -> bool:
Expand All @@ -29,10 +31,9 @@ async def ready(self) -> bool:
Returns:
Whether the receiver is still active.
"""
if self._closed:
if self._ready_future.done():
return False
await asyncio.Future()
return False
return await self._ready_future

@override
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
Expand All @@ -47,11 +48,12 @@ def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the underlying receiver.
"""
if self._closed:
if self._ready_future.done():
raise ReceiverStoppedError(self)
raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)

@override
def close(self) -> None:
"""Stop the receiver."""
self._closed = True
if not self._ready_future.done():
self._ready_future.set_result(False)
25 changes: 25 additions & 0 deletions tests/experimental/test_nop_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,28 @@ async def test_consuming_raises() -> None:
receiver.close()
with pytest.raises(ReceiverStoppedError):
receiver.consume()


async def test_close_method_effect_on_ready() -> None:
"""Test `ready()` terminates when the receiver is closed.

When the receiver is closed, `ready()` should return False.
This test verifies that:
1. `ready()` returns False immediately after `close()` is called
2. `ready()` and `close()` can be called multiple times
"""
receiver = NopReceiver[int]()

# Create a task that waits for the receiver to be ready.
task = asyncio.create_task(receiver.ready())

# Wait for the task to start.
await asyncio.sleep(0.1)
Comment on lines +53 to +57
Copy link

Copilot AI Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Relying on sleep can lead to flaky tests; consider using an asyncio.Event or polling task.done() to deterministically wait until the task is awaiting ready().

Suggested change
# Create a task that waits for the receiver to be ready.
task = asyncio.create_task(receiver.ready())
# Wait for the task to start.
await asyncio.sleep(0.1)
# Create an event to signal when the task starts.
task_started = asyncio.Event()
async def wait_for_ready():
task_started.set() # Signal that the task has started.
return await receiver.ready()
# Create a task that waits for the receiver to be ready.
task = asyncio.create_task(wait_for_ready())
# Wait for the task to start.
await task_started.wait()

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I need to make sure ready is called before cancel.
This won't do that.


# Close the receiver and wait for the task to complete.
receiver.close()
assert await asyncio.wait_for(task, timeout=0.1) is False

# Second call to `close()` or `ready()` should not raise an error.
receiver.close()
assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False
Loading