Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- An experimental `NopReceiver` implementation has been added, which can be used as a place-holder receiver that never receives a message.

## Bug Fixes

Expand Down
4 changes: 3 additions & 1 deletion src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
"""

from ._nop_receiver import NopReceiver
from ._optional_receiver import OptionalReceiver
from ._pipe import Pipe
from ._relay_sender import RelaySender
from ._with_previous import WithPrevious

__all__ = [
"NopReceiver",
Copy link

Copilot AI May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider sorting the all list alphabetically to improve readability and maintainability.

Copilot uses AI. Check for mistakes.
"OptionalReceiver",
"WithPrevious",
"Pipe",
"RelaySender",
"WithPrevious",
]
57 changes: 57 additions & 0 deletions src/frequenz/channels/experimental/_nop_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""A receiver that will never receive a message.
It is useful as a place-holder receiver for use in contexts where a receiver is
necessary, but one is not available.
"""

import asyncio

from typing_extensions import override

from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
from frequenz.channels._receiver import ReceiverStoppedError


class NopReceiver(Receiver[ReceiverMessageT_co]):
"""A place-holder receiver that will never receive a message."""

def __init__(self) -> None:
"""Initialize this instance."""
self._closed: bool = False

@override
async def ready(self) -> bool:
"""Wait for ever unless the receiver is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forever

Returns:
Whether the receiver is still active.
"""
if self._closed:
return False
await asyncio.Future()
return False

@override
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
"""Raise `ReceiverError` unless the NopReceiver is closed.
If the receiver is closed, then raise `ReceiverStoppedError`.
Returns:
The next message received.
Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the underlying receiver.
"""
if self._closed:
raise ReceiverStoppedError(self)
raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)

@override
def close(self) -> None:
"""Stop the receiver."""
self._closed = True
40 changes: 40 additions & 0 deletions tests/experimental/test_nop_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Tests for the NopReceiver."""

import asyncio
from contextlib import closing

import pytest

from frequenz.channels import ReceiverError
from frequenz.channels._receiver import ReceiverStoppedError
from frequenz.channels.experimental import NopReceiver


async def test_never_ready() -> None:
"""Test that the receiver is never ready."""
# When it is not closed, `ready()` should never return.
with closing(NopReceiver[int]()) as receiver:
with pytest.raises(TimeoutError):
await asyncio.wait_for(receiver.ready(), timeout=0.1)

# When it is closed, `ready()` should return False.
receiver = NopReceiver[int]()
receiver.close()
assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False


async def test_consuming_raises() -> None:
"""Test that consume raises an error."""
# When it is not closed, `consume()` should raise a ReceiverError.
with closing(NopReceiver[int]()) as receiver:
with pytest.raises(ReceiverError):
receiver.consume()

# When it is closed, `consume()` should raise a ReceiverStoppedError.
receiver = NopReceiver[int]()
receiver.close()
with pytest.raises(ReceiverStoppedError):
receiver.consume()
6 changes: 3 additions & 3 deletions tests/test_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No
senders.append(send_msg(acast.new_sender()))

await asyncio.gather(*senders)
await acast.close()
await acast.aclose()
await receivers_runs

with pytest.raises(SenderError):
Expand Down Expand Up @@ -92,7 +92,7 @@ async def test_anycast_after_close() -> None:

await sender.send(2)

await acast.close()
await acast.aclose()

with pytest.raises(SenderError):
await sender.send(5)
Expand Down Expand Up @@ -174,7 +174,7 @@ async def test_anycast_async_iterator() -> None:
async def send_messages() -> None:
for val in ["one", "two", "three", "four", "five"]:
await sender.send(val)
await acast.close()
await acast.aclose()

sender_task = asyncio.create_task(send_messages())

Expand Down
6 changes: 3 additions & 3 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No
senders.append(send_msg(bcast.new_sender()))

await asyncio.gather(*senders)
await bcast.close()
await bcast.aclose()
await receivers_runs

actual_sum = 0
Expand Down Expand Up @@ -93,7 +93,7 @@ async def test_broadcast_after_close() -> None:
receiver = bcast.new_receiver()
sender = bcast.new_sender()

await bcast.close()
await bcast.aclose()

with pytest.raises(SenderError):
await sender.send(5)
Expand Down Expand Up @@ -204,7 +204,7 @@ async def test_broadcast_async_iterator() -> None:
async def send_messages() -> None:
for val in range(0, 10):
await sender.send(val)
await bcast.close()
await bcast.aclose()

sender_task = asyncio.create_task(send_messages())

Expand Down
8 changes: 4 additions & 4 deletions tests/test_merge_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
for ctr in range(5):
await ch1.send(ctr + 1)
await ch2.send(ctr + 101)
await chan1.close()
await chan1.aclose()
await ch2.send(1000)
await chan2.close()
await chan2.aclose()

senders = asyncio.create_task(send(chan1.new_sender(), chan2.new_sender()))

Expand Down Expand Up @@ -52,8 +52,8 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
for ctr in range(5):
await ch1.send(ctr + 1)
await ch2.send(ctr + 101)
await chan1.close()
await chan2.close()
await chan1.aclose()
await chan2.aclose()

rx1 = chan1.new_receiver()
rx2 = chan2.new_receiver()
Expand Down