Skip to content

Commit 25ebf75

Browse files
committed
Implement a NopReceiver
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 4097be1 commit 25ebf75

File tree

4 files changed

+100
-1
lines changed

4 files changed

+100
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
- An experimental `NopReceiver` implementation has been added, which can be used as a place-holder receiver that never receives a message.
1414

1515
## Bug Fixes
1616

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
1111
"""
1212

13+
from ._nop_receiver import NopReceiver
1314
from ._optional_receiver import OptionalReceiver
1415
from ._pipe import Pipe
1516
from ._relay_sender import RelaySender
1617
from ._with_previous import WithPrevious
1718

1819
__all__ = [
20+
"NopReceiver",
1921
"OptionalReceiver",
2022
"Pipe",
2123
"RelaySender",
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A receiver that will never receive a message.
5+
6+
It is useful as a place-holder receiver for use in contexts where a receiver is
7+
necessary, but one is not available.
8+
"""
9+
10+
import asyncio
11+
12+
from typing_extensions import override
13+
14+
from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
15+
from frequenz.channels._receiver import ReceiverStoppedError
16+
17+
18+
class NopReceiver(Receiver[ReceiverMessageT_co]):
19+
"""A place-holder receiver that will never receive a message."""
20+
21+
def __init__(self) -> None:
22+
"""Initialize this instance."""
23+
self._closed: bool = False
24+
25+
@override
26+
async def ready(self) -> bool:
27+
"""Wait for ever unless the receiver is closed.
28+
29+
Returns:
30+
Whether the receiver is still active.
31+
"""
32+
if self._closed:
33+
return False
34+
await asyncio.Future()
35+
return False
36+
37+
@override
38+
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
39+
"""Raise `ReceiverError` unless the NopReceiver is closed.
40+
41+
If the receiver is closed, then raise `ReceiverStoppedError`.
42+
43+
Returns:
44+
The next message received.
45+
46+
Raises:
47+
ReceiverStoppedError: If the receiver stopped producing messages.
48+
ReceiverError: If there is some problem with the underlying receiver.
49+
"""
50+
if self._closed:
51+
raise ReceiverStoppedError(self)
52+
raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)
53+
54+
@override
55+
def close(self) -> None:
56+
"""Stop the receiver."""
57+
self._closed = True
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the NopReceiver."""
5+
6+
import asyncio
7+
from contextlib import closing
8+
9+
import pytest
10+
11+
from frequenz.channels import ReceiverError
12+
from frequenz.channels._receiver import ReceiverStoppedError
13+
from frequenz.channels.experimental import NopReceiver
14+
15+
16+
async def test_never_ready() -> None:
17+
"""Test that the receiver is never ready."""
18+
# When it is not closed, `ready()` should never return.
19+
with closing(NopReceiver[int]()) as receiver:
20+
with pytest.raises(TimeoutError):
21+
await asyncio.wait_for(receiver.ready(), timeout=0.1)
22+
23+
# When it is closed, `ready()` should return False.
24+
receiver = NopReceiver[int]()
25+
receiver.close()
26+
assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False
27+
28+
29+
async def test_consuming_raises() -> None:
30+
"""Test that consume raises an error."""
31+
# When it is not closed, `consume()` should raise a ReceiverError.
32+
with closing(NopReceiver[int]()) as receiver:
33+
with pytest.raises(ReceiverError):
34+
receiver.consume()
35+
36+
# When it is closed, `consume()` should raise a ReceiverStoppedError.
37+
receiver = NopReceiver[int]()
38+
receiver.close()
39+
with pytest.raises(ReceiverStoppedError):
40+
receiver.consume()

0 commit comments

Comments
 (0)