Skip to content

Commit f5a4981

Browse files
authored
Implement a NopReceiver (#419)
It is useful as a place-holder receiver for use in contexts where a receiver is necessary, but one is not available.
2 parents 7aeb268 + eb31a09 commit f5a4981

File tree

8 files changed

+120
-13
lines changed

8 files changed

+120
-13
lines changed

RELEASE_NOTES.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,14 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
- An experimental `NopReceiver` implementation has been added, which can be used as a place-holder receiver that never receives a message.
14+
15+
- The experimental `OptionalReceiver` has been deprecated. It will be removed with the next major release. It can be replaced with a `NopReceiver` as follows:
16+
17+
```python
18+
opt_recv: Receiver[T] | None
19+
recv: Receiver[T] = NopReceiver[T]() if opt_recv is None else opt_recv
20+
```
1421

1522
## Bug Fixes
1623

src/frequenz/channels/experimental/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@
1010
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
1111
"""
1212

13+
from ._nop_receiver import NopReceiver
1314
from ._optional_receiver import OptionalReceiver
1415
from ._pipe import Pipe
1516
from ._relay_sender import RelaySender
1617
from ._with_previous import WithPrevious
1718

1819
__all__ = [
20+
"NopReceiver",
1921
"OptionalReceiver",
20-
"WithPrevious",
2122
"Pipe",
2223
"RelaySender",
24+
"WithPrevious",
2325
]
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that will never receive a message.
5+
6+
It is useful as a place-holder receiver for use in contexts where a receiver is
7+
necessary, but one is not available.
8+
"""
9+
10+
import asyncio
11+
12+
from typing_extensions import override
13+
14+
from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
15+
from frequenz.channels._receiver import ReceiverStoppedError
16+
17+
18+
class NopReceiver(Receiver[ReceiverMessageT_co]):
19+
"""A place-holder receiver that will never receive a message."""
20+
21+
def __init__(self) -> None:
22+
"""Initialize this instance."""
23+
self._closed: bool = False
24+
25+
@override
26+
async def ready(self) -> bool:
27+
"""Wait for ever unless the receiver is closed.
28+
29+
Returns:
30+
Whether the receiver is still active.
31+
"""
32+
if self._closed:
33+
return False
34+
await asyncio.Future()
35+
return False
36+
37+
@override
38+
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
39+
"""Raise `ReceiverError` unless the NopReceiver is closed.
40+
41+
If the receiver is closed, then raise `ReceiverStoppedError`.
42+
43+
Returns:
44+
The next message received.
45+
46+
Raises:
47+
ReceiverStoppedError: If the receiver stopped producing messages.
48+
ReceiverError: If there is some problem with the underlying receiver.
49+
"""
50+
if self._closed:
51+
raise ReceiverStoppedError(self)
52+
raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)
53+
54+
@override
55+
def close(self) -> None:
56+
"""Stop the receiver."""
57+
self._closed = True

src/frequenz/channels/experimental/_optional_receiver.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010

1111
import asyncio
1212

13-
from typing_extensions import override
13+
from typing_extensions import deprecated, override
1414

1515
from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
1616

1717

18+
@deprecated("Use `frequenz.channels.experimental.NopReceiver` instead.")
1819
class OptionalReceiver(Receiver[ReceiverMessageT_co]):
1920
"""A receiver that will wait indefinitely if there is no underlying receiver.
2021
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the NopReceiver."""
5+
6+
import asyncio
7+
from contextlib import closing
8+
9+
import pytest
10+
11+
from frequenz.channels import ReceiverError
12+
from frequenz.channels._receiver import ReceiverStoppedError
13+
from frequenz.channels.experimental import NopReceiver
14+
15+
16+
async def test_never_ready() -> None:
17+
"""Test that the receiver is never ready."""
18+
# When it is not closed, `ready()` should never return.
19+
with closing(NopReceiver[int]()) as receiver:
20+
with pytest.raises(TimeoutError):
21+
await asyncio.wait_for(receiver.ready(), timeout=0.1)
22+
23+
# When it is closed, `ready()` should return False.
24+
receiver = NopReceiver[int]()
25+
receiver.close()
26+
assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False
27+
28+
29+
async def test_consuming_raises() -> None:
30+
"""Test that consume raises an error."""
31+
# When it is not closed, `consume()` should raise a ReceiverError.
32+
with closing(NopReceiver[int]()) as receiver:
33+
with pytest.raises(ReceiverError):
34+
receiver.consume()
35+
36+
# When it is closed, `consume()` should raise a ReceiverStoppedError.
37+
receiver = NopReceiver[int]()
38+
receiver.close()
39+
with pytest.raises(ReceiverStoppedError):
40+
receiver.consume()

tests/test_anycast.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No
6464
senders.append(send_msg(acast.new_sender()))
6565

6666
await asyncio.gather(*senders)
67-
await acast.close()
67+
await acast.aclose()
6868
await receivers_runs
6969

7070
with pytest.raises(SenderError):
@@ -92,7 +92,7 @@ async def test_anycast_after_close() -> None:
9292

9393
await sender.send(2)
9494

95-
await acast.close()
95+
await acast.aclose()
9696

9797
with pytest.raises(SenderError):
9898
await sender.send(5)
@@ -174,7 +174,7 @@ async def test_anycast_async_iterator() -> None:
174174
async def send_messages() -> None:
175175
for val in ["one", "two", "three", "four", "five"]:
176176
await sender.send(val)
177-
await acast.close()
177+
await acast.aclose()
178178

179179
sender_task = asyncio.create_task(send_messages())
180180

tests/test_broadcast.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No
5858
senders.append(send_msg(bcast.new_sender()))
5959

6060
await asyncio.gather(*senders)
61-
await bcast.close()
61+
await bcast.aclose()
6262
await receivers_runs
6363

6464
actual_sum = 0
@@ -93,7 +93,7 @@ async def test_broadcast_after_close() -> None:
9393
receiver = bcast.new_receiver()
9494
sender = bcast.new_sender()
9595

96-
await bcast.close()
96+
await bcast.aclose()
9797

9898
with pytest.raises(SenderError):
9999
await sender.send(5)
@@ -204,7 +204,7 @@ async def test_broadcast_async_iterator() -> None:
204204
async def send_messages() -> None:
205205
for val in range(0, 10):
206206
await sender.send(val)
207-
await bcast.close()
207+
await bcast.aclose()
208208

209209
sender_task = asyncio.create_task(send_messages())
210210

tests/test_merge_integration.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
2222
for ctr in range(5):
2323
await ch1.send(ctr + 1)
2424
await ch2.send(ctr + 101)
25-
await chan1.close()
25+
await chan1.aclose()
2626
await ch2.send(1000)
27-
await chan2.close()
27+
await chan2.aclose()
2828

2929
senders = asyncio.create_task(send(chan1.new_sender(), chan2.new_sender()))
3030

@@ -52,8 +52,8 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
5252
for ctr in range(5):
5353
await ch1.send(ctr + 1)
5454
await ch2.send(ctr + 101)
55-
await chan1.close()
56-
await chan2.close()
55+
await chan1.aclose()
56+
await chan2.aclose()
5757

5858
rx1 = chan1.new_receiver()
5959
rx2 = chan2.new_receiver()

0 commit comments

Comments
 (0)