Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- An experimental `NopReceiver` implementation has been added, which can be used as a place-holder receiver that never receives a message.

- The experimental `OptionalReceiver` has been deprecated. It will be removed with the next major release. It can be replaced with a `NopReceiver` as follows:

```python
opt_recv: Receiver[T] | None
recv: Receiver[T] = NopReceiver[T]() if opt_recv is None else opt_recv
```

## Bug Fixes

Expand Down
4 changes: 3 additions & 1 deletion src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
"""

from ._nop_receiver import NopReceiver
from ._optional_receiver import OptionalReceiver
from ._pipe import Pipe
from ._relay_sender import RelaySender
from ._with_previous import WithPrevious

__all__ = [
"NopReceiver",
Copy link

Copilot AI May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider sorting the all list alphabetically to improve readability and maintainability.

Copilot uses AI. Check for mistakes.
"OptionalReceiver",
"WithPrevious",
"Pipe",
"RelaySender",
"WithPrevious",
]
57 changes: 57 additions & 0 deletions src/frequenz/channels/experimental/_nop_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""A receiver that will never receive a message.
It is useful as a place-holder receiver for use in contexts where a receiver is
necessary, but one is not available.
"""

import asyncio

from typing_extensions import override

from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
from frequenz.channels._receiver import ReceiverStoppedError


class NopReceiver(Receiver[ReceiverMessageT_co]):
"""A place-holder receiver that will never receive a message."""

def __init__(self) -> None:
"""Initialize this instance."""
self._closed: bool = False

@override
async def ready(self) -> bool:
"""Wait for ever unless the receiver is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forever

Returns:
Whether the receiver is still active.
"""
if self._closed:
return False
await asyncio.Future()
return False

@override
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
"""Raise `ReceiverError` unless the NopReceiver is closed.
If the receiver is closed, then raise `ReceiverStoppedError`.
Returns:
The next message received.
Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the underlying receiver.
"""
if self._closed:
raise ReceiverStoppedError(self)
raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)

@override
def close(self) -> None:
"""Stop the receiver."""
self._closed = True
3 changes: 2 additions & 1 deletion src/frequenz/channels/experimental/_optional_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

import asyncio

from typing_extensions import override
from typing_extensions import deprecated, override

from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co


@deprecated("Use `frequenz.channels.experimental.NopReceiver` instead.")
class OptionalReceiver(Receiver[ReceiverMessageT_co]):
"""A receiver that will wait indefinitely if there is no underlying receiver.

Expand Down
40 changes: 40 additions & 0 deletions tests/experimental/test_nop_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Tests for the NopReceiver."""

import asyncio
from contextlib import closing

import pytest

from frequenz.channels import ReceiverError
from frequenz.channels._receiver import ReceiverStoppedError
from frequenz.channels.experimental import NopReceiver


async def test_never_ready() -> None:
"""Test that the receiver is never ready."""
# When it is not closed, `ready()` should never return.
with closing(NopReceiver[int]()) as receiver:
with pytest.raises(TimeoutError):
await asyncio.wait_for(receiver.ready(), timeout=0.1)

# When it is closed, `ready()` should return False.
receiver = NopReceiver[int]()
receiver.close()
assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False


async def test_consuming_raises() -> None:
"""Test that consume raises an error."""
# When it is not closed, `consume()` should raise a ReceiverError.
with closing(NopReceiver[int]()) as receiver:
with pytest.raises(ReceiverError):
receiver.consume()

# When it is closed, `consume()` should raise a ReceiverStoppedError.
receiver = NopReceiver[int]()
receiver.close()
with pytest.raises(ReceiverStoppedError):
receiver.consume()
6 changes: 3 additions & 3 deletions tests/test_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No
senders.append(send_msg(acast.new_sender()))

await asyncio.gather(*senders)
await acast.close()
await acast.aclose()
await receivers_runs

with pytest.raises(SenderError):
Expand Down Expand Up @@ -92,7 +92,7 @@ async def test_anycast_after_close() -> None:

await sender.send(2)

await acast.close()
await acast.aclose()

with pytest.raises(SenderError):
await sender.send(5)
Expand Down Expand Up @@ -174,7 +174,7 @@ async def test_anycast_async_iterator() -> None:
async def send_messages() -> None:
for val in ["one", "two", "three", "four", "five"]:
await sender.send(val)
await acast.close()
await acast.aclose()

sender_task = asyncio.create_task(send_messages())

Expand Down
6 changes: 3 additions & 3 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No
senders.append(send_msg(bcast.new_sender()))

await asyncio.gather(*senders)
await bcast.close()
await bcast.aclose()
await receivers_runs

actual_sum = 0
Expand Down Expand Up @@ -93,7 +93,7 @@ async def test_broadcast_after_close() -> None:
receiver = bcast.new_receiver()
sender = bcast.new_sender()

await bcast.close()
await bcast.aclose()

with pytest.raises(SenderError):
await sender.send(5)
Expand Down Expand Up @@ -204,7 +204,7 @@ async def test_broadcast_async_iterator() -> None:
async def send_messages() -> None:
for val in range(0, 10):
await sender.send(val)
await bcast.close()
await bcast.aclose()

sender_task = asyncio.create_task(send_messages())

Expand Down
8 changes: 4 additions & 4 deletions tests/test_merge_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
for ctr in range(5):
await ch1.send(ctr + 1)
await ch2.send(ctr + 101)
await chan1.close()
await chan1.aclose()
await ch2.send(1000)
await chan2.close()
await chan2.aclose()

senders = asyncio.create_task(send(chan1.new_sender(), chan2.new_sender()))

Expand Down Expand Up @@ -52,8 +52,8 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
for ctr in range(5):
await ch1.send(ctr + 1)
await ch2.send(ctr + 101)
await chan1.close()
await chan2.close()
await chan1.aclose()
await chan2.aclose()

rx1 = chan1.new_receiver()
rx2 = chan2.new_receiver()
Expand Down
Loading