Skip to content

Commit 8db631a

Browse files
Add OptionalReceiver for handling unset receivers (#409)
Add a new OptionalReceiver class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set. This eliminates the need for explicit if-else branches to check if receiver is None.
2 parents 5b869a7 + 280ffa6 commit 8db631a

File tree

4 files changed

+148
-1
lines changed

4 files changed

+148
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
## New Features
1515

16-
<!-- Here goes the main new features and examples or instructions on how to use them -->
16+
* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
1717

1818
## Bug Fixes
1919

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
1111
"""
1212

13+
from ._optional_receiver import OptionalReceiver
1314
from ._pipe import Pipe
1415
from ._relay_sender import RelaySender
1516
from ._with_previous import WithPrevious
1617

1718
__all__ = [
19+
"OptionalReceiver",
1820
"WithPrevious",
1921
"Pipe",
2022
"RelaySender",
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that will wait indefinitely if there is no underlying receiver.
5+
6+
The `OptionalReceiver` is useful when the underlying receiver is not set initially.
7+
Instead of making `if-else` branches to check if the receiver is set, you can use
8+
this receiver to wait indefinitely if it is not set.
9+
"""
10+
11+
import asyncio
12+
13+
from typing_extensions import override
14+
15+
from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
16+
17+
18+
class OptionalReceiver(Receiver[ReceiverMessageT_co]):
19+
"""A receiver that will wait indefinitely if there is no underlying receiver.
20+
21+
This receiver is useful when the underlying receiver is not set initially.
22+
Instead of making `if-else` branches to check if the receiver is set, you can use
23+
this receiver to wait indefinitely if it is not set.
24+
"""
25+
26+
def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
27+
"""Initialize this instance.
28+
29+
Args:
30+
receiver: The underlying receiver, or `None` if there is no receiver.
31+
"""
32+
self._receiver: Receiver[ReceiverMessageT_co] | None = receiver
33+
34+
@override
35+
async def ready(self) -> bool:
36+
"""Wait until the receiver is ready with a message or an error.
37+
38+
Once a call to `ready()` has finished, the message should be read with
39+
a call to `consume()` (`receive()` or iterated over). The receiver will
40+
remain ready (this method will return immediately) until it is
41+
consumed.
42+
43+
Returns:
44+
Whether the receiver is still active.
45+
"""
46+
if self._receiver is not None:
47+
return await self._receiver.ready()
48+
49+
# If there's no receiver, wait forever
50+
await asyncio.Event().wait()
51+
return False
52+
53+
@override
54+
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
55+
"""Return the latest from the underlying receiver message once `ready()` is complete.
56+
57+
`ready()` must be called before each call to `consume()`.
58+
59+
Returns:
60+
The next message received.
61+
62+
Raises:
63+
ReceiverStoppedError: If the receiver stopped producing messages.
64+
ReceiverError: If there is some problem with the underlying receiver.
65+
"""
66+
if self._receiver is None:
67+
raise ReceiverError(
68+
"`consume()` must be preceded by a call to `ready()`", self
69+
)
70+
return self._receiver.consume()
71+
72+
def is_set(self) -> bool:
73+
"""Check if the receiver is set."""
74+
return self._receiver is not None
75+
76+
def close(self) -> None:
77+
"""Stop the receiver."""
78+
if self._receiver is not None:
79+
self._receiver.close()
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the OptionalReceiver class."""
5+
6+
import asyncio
7+
from contextlib import aclosing, closing
8+
9+
import pytest
10+
11+
from frequenz.channels import Broadcast, ReceiverError, ReceiverStoppedError
12+
from frequenz.channels.experimental import OptionalReceiver
13+
14+
15+
async def test_receiver_with_none_does_not_end() -> None:
16+
"""Test that the receiver with None does not end."""
17+
with closing(OptionalReceiver[int](None)) as receiver:
18+
assert not receiver.is_set()
19+
20+
with pytest.raises(TimeoutError):
21+
await asyncio.wait_for(receiver.ready(), timeout=0.3)
22+
23+
24+
async def test_receiver_with_none_raises_error_when_consuming() -> None:
25+
"""Test that the receiver with None raises an error when consuming."""
26+
with closing(OptionalReceiver[int](None)) as receiver:
27+
assert not receiver.is_set()
28+
29+
with pytest.raises(ReceiverError):
30+
receiver.consume()
31+
32+
33+
async def test_receiver_with_underlying_receiver_forwards_messages() -> None:
34+
"""Test that the receiver forwards messages."""
35+
async with aclosing(Broadcast[int](name="test")) as channel:
36+
with closing(OptionalReceiver[int](channel.new_receiver())) as receiver:
37+
assert receiver.is_set()
38+
39+
sender = channel.new_sender()
40+
41+
await sender.send(5)
42+
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
43+
assert value == 5
44+
45+
await sender.send(100)
46+
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
47+
assert value == 100
48+
49+
50+
async def test_receiver_ends_when_underlying_receiver_ends() -> None:
51+
"""Test that the receiver ends when the underlying receiver ends."""
52+
async with aclosing(Broadcast[int](name="test")) as channel:
53+
with (
54+
closing(channel.new_receiver()) as receiver,
55+
closing(OptionalReceiver[int](receiver)) as optional_receiver,
56+
):
57+
assert optional_receiver.is_set()
58+
59+
receiver.close()
60+
# First check if ready method returns False
61+
is_active = await optional_receiver.ready()
62+
assert is_active is False
63+
64+
# Then check if `receive` method raises ReceiverStoppedError
65+
with pytest.raises(ReceiverStoppedError):
66+
await optional_receiver.receive()

0 commit comments

Comments
 (0)