Skip to content

Commit 280ffa6

Browse files
Add OptionalReceiver for handling unset receivers
Add a new OptionalReceiver class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set. This eliminates the need for explicit if-else branches to check if receiver is None. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 0d7c2b7 commit 280ffa6

File tree

4 files changed

+148
-1
lines changed

4 files changed

+148
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
## New Features
1515

16-
<!-- Here goes the main new features and examples or instructions on how to use them -->
16+
* Add a new `OptionalReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
1717

1818
## Bug Fixes
1919

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
removal, even in minor updates.
1010
"""
1111

12+
from ._optional_receiver import OptionalReceiver
1213
from ._pipe import Pipe
1314
from ._relay_sender import RelaySender
1415
from ._with_previous import WithPrevious
1516

1617
__all__ = [
18+
"OptionalReceiver",
1719
"WithPrevious",
1820
"Pipe",
1921
"RelaySender",
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that will wait indefinitely if there is no underlying receiver.
5+
6+
The `OptionalReceiver` is useful when the underlying receiver is not set initially.
7+
Instead of making `if-else` branches to check if the receiver is set, you can use
8+
this receiver to wait indefinitely if it is not set.
9+
"""
10+
11+
import asyncio
12+
13+
from typing_extensions import override
14+
15+
from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
16+
17+
18+
class OptionalReceiver(Receiver[ReceiverMessageT_co]):
19+
"""A receiver that will wait indefinitely if there is no underlying receiver.
20+
21+
This receiver is useful when the underlying receiver is not set initially.
22+
Instead of making `if-else` branches to check if the receiver is set, you can use
23+
this receiver to wait indefinitely if it is not set.
24+
"""
25+
26+
def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
27+
"""Initialize this instance.
28+
29+
Args:
30+
receiver: The underlying receiver, or `None` if there is no receiver.
31+
"""
32+
self._receiver: Receiver[ReceiverMessageT_co] | None = receiver
33+
34+
@override
35+
async def ready(self) -> bool:
36+
"""Wait until the receiver is ready with a message or an error.
37+
38+
Once a call to `ready()` has finished, the message should be read with
39+
a call to `consume()` (`receive()` or iterated over). The receiver will
40+
remain ready (this method will return immediately) until it is
41+
consumed.
42+
43+
Returns:
44+
Whether the receiver is still active.
45+
"""
46+
if self._receiver is not None:
47+
return await self._receiver.ready()
48+
49+
# If there's no receiver, wait forever
50+
await asyncio.Event().wait()
51+
return False
52+
53+
@override
54+
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
55+
"""Return the latest from the underlying receiver message once `ready()` is complete.
56+
57+
`ready()` must be called before each call to `consume()`.
58+
59+
Returns:
60+
The next message received.
61+
62+
Raises:
63+
ReceiverStoppedError: If the receiver stopped producing messages.
64+
ReceiverError: If there is some problem with the underlying receiver.
65+
"""
66+
if self._receiver is None:
67+
raise ReceiverError(
68+
"`consume()` must be preceded by a call to `ready()`", self
69+
)
70+
return self._receiver.consume()
71+
72+
def is_set(self) -> bool:
73+
"""Check if the receiver is set."""
74+
return self._receiver is not None
75+
76+
def close(self) -> None:
77+
"""Stop the receiver."""
78+
if self._receiver is not None:
79+
self._receiver.close()
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the OptionalReceiver class."""
5+
6+
import asyncio
7+
from contextlib import aclosing, closing
8+
9+
import pytest
10+
11+
from frequenz.channels import Broadcast, ReceiverError, ReceiverStoppedError
12+
from frequenz.channels.experimental import OptionalReceiver
13+
14+
15+
async def test_receiver_with_none_does_not_end() -> None:
16+
"""Test that the receiver with None does not end."""
17+
with closing(OptionalReceiver[int](None)) as receiver:
18+
assert not receiver.is_set()
19+
20+
with pytest.raises(TimeoutError):
21+
await asyncio.wait_for(receiver.ready(), timeout=0.3)
22+
23+
24+
async def test_receiver_with_none_raises_error_when_consuming() -> None:
25+
"""Test that the receiver with None raises an error when consuming."""
26+
with closing(OptionalReceiver[int](None)) as receiver:
27+
assert not receiver.is_set()
28+
29+
with pytest.raises(ReceiverError):
30+
receiver.consume()
31+
32+
33+
async def test_receiver_with_underlying_receiver_forwards_messages() -> None:
34+
"""Test that the receiver forwards messages."""
35+
async with aclosing(Broadcast[int](name="test")) as channel:
36+
with closing(OptionalReceiver[int](channel.new_receiver())) as receiver:
37+
assert receiver.is_set()
38+
39+
sender = channel.new_sender()
40+
41+
await sender.send(5)
42+
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
43+
assert value == 5
44+
45+
await sender.send(100)
46+
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
47+
assert value == 100
48+
49+
50+
async def test_receiver_ends_when_underlying_receiver_ends() -> None:
51+
"""Test that the receiver ends when the underlying receiver ends."""
52+
async with aclosing(Broadcast[int](name="test")) as channel:
53+
with (
54+
closing(channel.new_receiver()) as receiver,
55+
closing(OptionalReceiver[int](receiver)) as optional_receiver,
56+
):
57+
assert optional_receiver.is_set()
58+
59+
receiver.close()
60+
# First check if ready method returns False
61+
is_active = await optional_receiver.ready()
62+
assert is_active is False
63+
64+
# Then check if `receive` method raises ReceiverStoppedError
65+
with pytest.raises(ReceiverStoppedError):
66+
await optional_receiver.receive()

0 commit comments

Comments
 (0)