Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.

## Bug Fixes

Expand Down
2 changes: 2 additions & 0 deletions src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
removal, even in minor updates.
"""

from ._optional_receiver import OptionalReceiver
from ._pipe import Pipe
from ._relay_sender import RelaySender
from ._with_previous import WithPrevious

__all__ = [
"OptionalReceiver",
"WithPrevious",
"Pipe",
"RelaySender",
Expand Down
79 changes: 79 additions & 0 deletions src/frequenz/channels/experimental/_optional_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""A receiver that will wait indefinitely if there is no underlying receiver.

The `OptionalReceiver` is useful when the underlying receiver is not set initially.
Instead of making `if-else` branches to check if the receiver is set, you can use
this receiver to wait indefinitely if it is not set.
"""

import asyncio

from typing_extensions import override

from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co


class OptionalReceiver(Receiver[ReceiverMessageT_co]):
"""A receiver that will wait indefinitely if there is no underlying receiver.

This receiver is useful when the underlying receiver is not set initially.
Instead of making `if-else` branches to check if the receiver is set, you can use
this receiver to wait indefinitely if it is not set.
"""

def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
"""Initialize this instance.

Args:
receiver: The underlying receiver, or `None` if there is no receiver.
"""
self._receiver: Receiver[ReceiverMessageT_co] | None = receiver

@override
async def ready(self) -> bool:
"""Wait until the receiver is ready with a message or an error.

Once a call to `ready()` has finished, the message should be read with
a call to `consume()` (`receive()` or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.

Returns:
Whether the receiver is still active.
"""
if self._receiver is not None:
return await self._receiver.ready()

# If there's no receiver, wait forever
await asyncio.Event().wait()
return False

@override
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
"""Return the latest from the underlying receiver message once `ready()` is complete.

`ready()` must be called before each call to `consume()`.

Returns:
The next message received.

Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the underlying receiver.
"""
if self._receiver is None:
raise ReceiverError(
"`consume()` must be preceded by a call to `ready()`", self
)
return self._receiver.consume()

def is_set(self) -> bool:
"""Check if the receiver is set."""
return self._receiver is not None

def close(self) -> None:
"""Stop the receiver."""
if self._receiver is not None:
self._receiver.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no receiver, you might want to keep a closed flag. You can then return immediately when ready is called, and on consume, you can raise a ReceiverClosedError, like the other receivers.

66 changes: 66 additions & 0 deletions tests/experimental/test_optional_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Tests for the OptionalReceiver class."""

import asyncio
from contextlib import aclosing, closing

import pytest

from frequenz.channels import Broadcast, ReceiverError, ReceiverStoppedError
from frequenz.channels.experimental import OptionalReceiver


async def test_receiver_with_none_does_not_end() -> None:
"""Test that the receiver with None does not end."""
with closing(OptionalReceiver[int](None)) as receiver:
assert not receiver.is_set()

with pytest.raises(TimeoutError):
await asyncio.wait_for(receiver.ready(), timeout=0.3)


async def test_receiver_with_none_raises_error_when_consuming() -> None:
"""Test that the receiver with None raises an error when consuming."""
with closing(OptionalReceiver[int](None)) as receiver:
assert not receiver.is_set()

with pytest.raises(ReceiverError):
receiver.consume()


async def test_receiver_with_underlying_receiver_forwards_messages() -> None:
"""Test that the receiver forwards messages."""
async with aclosing(Broadcast[int](name="test")) as channel:
with closing(OptionalReceiver[int](channel.new_receiver())) as receiver:
assert receiver.is_set()

sender = channel.new_sender()

await sender.send(5)
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
assert value == 5

await sender.send(100)
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
assert value == 100


async def test_receiver_ends_when_underlying_receiver_ends() -> None:
"""Test that the receiver ends when the underlying receiver ends."""
async with aclosing(Broadcast[int](name="test")) as channel:
with (
closing(channel.new_receiver()) as receiver,
closing(OptionalReceiver[int](receiver)) as optional_receiver,
):
assert optional_receiver.is_set()

receiver.close()
# First check if ready method returns False
is_active = await optional_receiver.ready()
assert is_active is False

# Then check if `receive` method raises ReceiverStoppedError
with pytest.raises(ReceiverStoppedError):
await optional_receiver.receive()