Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- An experimental `NopReceiver` implementation has been added, which can be used as a place-holder receiver that never receives a message.

## Bug Fixes

Expand Down
2 changes: 2 additions & 0 deletions src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
"""

from ._nop_receiver import NopReceiver
from ._optional_receiver import OptionalReceiver
from ._pipe import Pipe
from ._relay_sender import RelaySender
from ._with_previous import WithPrevious

__all__ = [
"NopReceiver",
Copy link

Copilot AI May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider sorting the all list alphabetically to improve readability and maintainability.

Copilot uses AI. Check for mistakes.
"OptionalReceiver",
"Pipe",
"RelaySender",
Expand Down
57 changes: 57 additions & 0 deletions src/frequenz/channels/experimental/_nop_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""A receiver that will never receive a message.
It is useful as a place-holder receiver for use in contexts where a receiver is
necessary, but one is not available.
"""

import asyncio

from typing_extensions import override

from frequenz.channels import Receiver, ReceiverError, ReceiverMessageT_co
from frequenz.channels._receiver import ReceiverStoppedError


class NopReceiver(Receiver[ReceiverMessageT_co]):
"""A place-holder receiver that will never receive a message."""

def __init__(self) -> None:
"""Initialize this instance."""
self._closed: bool = False

@override
async def ready(self) -> bool:
"""Wait for ever unless the receiver is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forever

Returns:
Whether the receiver is still active.
"""
if self._closed:
return False
await asyncio.Future()
return False

@override
def consume(self) -> ReceiverMessageT_co: # noqa: DOC503 (raised indirectly)
"""Raise `ReceiverError` unless the NopReceiver is closed.
If the receiver is closed, then raise `ReceiverStoppedError`.
Returns:
The next message received.
Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the underlying receiver.
"""
if self._closed:
raise ReceiverStoppedError(self)
raise ReceiverError("`consume()` must be preceded by a call to `ready()`", self)

@override
def close(self) -> None:
"""Stop the receiver."""
self._closed = True
40 changes: 40 additions & 0 deletions tests/experimental/test_nop_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Tests for the NopReceiver."""

import asyncio
from contextlib import closing

import pytest

from frequenz.channels import ReceiverError
from frequenz.channels._receiver import ReceiverStoppedError
from frequenz.channels.experimental import NopReceiver


async def test_never_ready() -> None:
"""Test that the receiver is never ready."""
# When it is not closed, `ready()` should never return.
with closing(NopReceiver[int]()) as receiver:
with pytest.raises(TimeoutError):
await asyncio.wait_for(receiver.ready(), timeout=0.1)

# When it is closed, `ready()` should return False.
receiver = NopReceiver[int]()
receiver.close()
assert await asyncio.wait_for(receiver.ready(), timeout=0.1) is False


async def test_consuming_raises() -> None:
"""Test that consume raises an error."""
# When it is not closed, `consume()` should raise a ReceiverError.
with closing(NopReceiver[int]()) as receiver:
with pytest.raises(ReceiverError):
receiver.consume()

# When it is closed, `consume()` should raise a ReceiverStoppedError.
receiver = NopReceiver[int]()
receiver.close()
with pytest.raises(ReceiverStoppedError):
receiver.consume()