Skip to content

Commit 5b496a8

Browse files
Add NullableReceiver for handling unset receivers
Add a new NullableReceiver class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set. This eliminates the need for explicit if-else branches to check if receiver is None. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 0d7c2b7 commit 5b496a8

File tree

4 files changed

+135
-1
lines changed

4 files changed

+135
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
## New Features
1515

16-
<!-- Here goes the main new features and examples or instructions on how to use them -->
16+
* Add a new `NullableReceiver` class that wraps an optional underlying receiver, allowing for indefinite waiting when no receiver is set.
1717

1818
## Bug Fixes
1919

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
removal, even in minor updates.
1010
"""
1111

12+
from ._nullable_receiver import NullableReceiver
1213
from ._pipe import Pipe
1314
from ._relay_sender import RelaySender
1415
from ._with_previous import WithPrevious
1516

1617
__all__ = [
18+
"NullableReceiver",
1719
"WithPrevious",
1820
"Pipe",
1921
"RelaySender",
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that will wait indefinitely if there is no underlying receiver.
5+
6+
The `NullableReceiver` is useful when the underlying receiver is not set initially.
7+
Instead of making `if-else` branches to check if the receiver is set, you can use
8+
this receiver to wait indefinitely if it is not set.
9+
"""
10+
11+
import asyncio
12+
from typing import override
13+
14+
from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
15+
16+
17+
class NullableReceiver(Receiver[ReceiverMessageT_co]):
18+
"""A receiver that will wait indefinitely if there is no underlying receiver.
19+
20+
This receiver is useful when the underlying receiver is not set initially.
21+
Instead of making `if-else` branches to check if the receiver is set, you can use
22+
this receiver to wait indefinitely if it is not set.
23+
"""
24+
25+
def __init__(self, receiver: Receiver[ReceiverMessageT_co] | None):
26+
"""Initialize this instance.
27+
28+
Args:
29+
receiver: The underlying receiver, or `None` if there is no receiver.
30+
"""
31+
self._receiver: Receiver[ReceiverMessageT_co] | None = receiver
32+
33+
@override
34+
async def ready(self) -> bool:
35+
"""Wait until the receiver is ready with a message or an error.
36+
37+
Once a call to `ready()` has finished, the message should be read with
38+
a call to `consume()` (`receive()` or iterated over). The receiver will
39+
remain ready (this method will return immediately) until it is
40+
consumed.
41+
42+
Returns:
43+
Whether the receiver is still active.
44+
"""
45+
if self._receiver is not None:
46+
return await self._receiver.ready()
47+
48+
# If there's no receiver, wait forever
49+
await asyncio.Event().wait()
50+
return False
51+
52+
@override
53+
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
54+
"""Return the latest from the underlying receiver message once `ready()` is complete.
55+
56+
`ready()` must be called before each call to `consume()`.
57+
58+
Returns:
59+
The next message received.
60+
61+
Raises:
62+
ReceiverStoppedError: If the receiver stopped producing messages.
63+
ReceiverError: If there is some problem with the underlying receiver.
64+
"""
65+
if self._receiver is None:
66+
raise ReceiverError(
67+
"`consume()` must be preceded by a call to `ready()`", self
68+
)
69+
return self._receiver.consume()
70+
71+
def close(self) -> None:
72+
"""Stop the receiver."""
73+
if self._receiver is not None:
74+
self._receiver.close()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the NullableReceiver class."""
5+
6+
import asyncio
7+
from contextlib import aclosing, closing
8+
9+
import pytest
10+
11+
from frequenz.channels import Broadcast, ReceiverError, ReceiverStoppedError
12+
from frequenz.channels.experimental import NullableReceiver
13+
14+
15+
async def test_receiver_with_none_does_not_end() -> None:
16+
"""Test that the receiver with None does not end."""
17+
with closing(NullableReceiver[int](None)) as receiver:
18+
with pytest.raises(TimeoutError):
19+
await asyncio.wait_for(receiver.ready(), timeout=0.3)
20+
21+
22+
async def test_receiver_with_none_raises_error_when_consuming() -> None:
23+
"""Test that the receiver with None raises an error when consuming."""
24+
with closing(NullableReceiver[int](None)) as receiver:
25+
with pytest.raises(ReceiverError):
26+
receiver.consume()
27+
28+
29+
async def test_receiver_with_underlying_receiver_forwards_messages() -> None:
30+
"""Test that the receiver forwards messages."""
31+
async with aclosing(Broadcast[int](name="test")) as channel:
32+
with closing(NullableReceiver[int](channel.new_receiver())) as receiver:
33+
sender = channel.new_sender()
34+
35+
await sender.send(5)
36+
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
37+
assert value == 5
38+
39+
await sender.send(100)
40+
value = await asyncio.wait_for(receiver.receive(), timeout=0.1)
41+
assert value == 100
42+
43+
44+
async def test_receiver_ends_when_underlying_receiver_ends() -> None:
45+
"""Test that the receiver ends when the underlying receiver ends."""
46+
async with aclosing(Broadcast[int](name="test")) as channel:
47+
with (
48+
closing(channel.new_receiver()) as receiver,
49+
closing(NullableReceiver[int](receiver)) as nullable_receiver,
50+
):
51+
receiver.close()
52+
# First check if ready method returns False
53+
is_active = await nullable_receiver.ready()
54+
assert is_active is False
55+
56+
# Then check if `receive` method raises ReceiverStoppedError
57+
with pytest.raises(ReceiverStoppedError):
58+
await nullable_receiver.receive()

0 commit comments

Comments
 (0)