Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ body:
- Build script, CI, dependencies, etc. (part:tooling)
- Channels, `Broadcast`, `Anycast`, etc. (part:channels)
- Core types (`Sender`, `Receiver`, exceptions, etc.) (part:core)
- Experimental features (the `experimental` package) (part:experimental)
- Utilities (`Event`, `FileWatcher`, `Timer`, etc.) (part:utilities)
- Synchronization of multiple sources (`select()`, `merge()`, etc.) (part:synchronization)
validations:
Expand Down
1 change: 1 addition & 0 deletions .github/keylabeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ labelMappings:
"part:channels": "part:channels"
"part:core": "part:core"
"part:docs": "part:docs"
"part:experimental": "part:experimental"
"part:synchronization": "part:synchronization"
"part:tests": "part:tests"
"part:tooling": "part:tooling"
Expand Down
5 changes: 5 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
- "src/frequenz/channels/_receiver.py"
- "src/frequenz/channels/_sender.py"

"part:experimental":
- changed-files:
- any-glob-to-any-file:
- "src/frequenz/channels/experimental/*.py"

"part:synchronization":
- changed-files:
- any-glob-to-any-file:
Expand Down
5 changes: 4 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
### Experimental

- A new predicate, `OnlyIfPrevious`, to `filter()` messages based on the previous message.
- A new special case of `OnlyIfPrevious`, `ChangedOnly`, to skip messages if they are equal to the previous message.

## Bug Fixes

Expand Down
3 changes: 3 additions & 0 deletions src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
"""

from ._pipe import Pipe
from ._predicates import ChangedOnly, OnlyIfPrevious
from ._relay_sender import RelaySender

__all__ = [
"ChangedOnly",
"OnlyIfPrevious",
"Pipe",
"RelaySender",
]
184 changes: 184 additions & 0 deletions src/frequenz/channels/experimental/_predicates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Predicates to be used in conjuntion with `Receiver.filter()`."""


from typing import Callable, Final, Generic, TypeGuard

from frequenz.channels._generic import ChannelMessageT


class _Sentinel:
"""A sentinel to denote that no value has been received yet."""

def __str__(self) -> str:
"""Return a string representation of this sentinel."""
return "<no value received yet>"


_SENTINEL: Final[_Sentinel] = _Sentinel()


class OnlyIfPrevious(Generic[ChannelMessageT]):
"""A predicate to check if a message has a particular relationship with the previous one.

This predicate can be used to filter out messages based on a custom condition on the
previous and current messages. This can be useful in cases where you want to
process messages only if they satisfy a particular condition with respect to the
previous message.

Tip:
If you want to use `==` as predicate, you can use the
[`ChangedOnly`][frequenz.channels.experimental.ChangedOnly] predicate.

Example: Receiving only messages that are not the same instance as the previous one.
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import OnlyIfPrevious

channel = Broadcast[int | bool](name="example")
receiver = channel.new_receiver().filter(OnlyIfPrevious(lambda old, new: old is not new))
sender = channel.new_sender()

# This message will be received as it is the first message.
await sender.send(1)
assert await receiver.receive() == 1

# This message will be skipped as it is the same instance as the previous one.
await sender.send(1)

# This message will be received as it is a different instance from the previous
# one.
await sender.send(True)
assert await receiver.receive() is True
```

Example: Receiving only messages if they are bigger than the previous one.
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import OnlyIfPrevious

channel = Broadcast[int](name="example")
receiver = channel.new_receiver().filter(
OnlyIfPrevious(lambda old, new: new > old, first_is_true=False)
)
sender = channel.new_sender()

# This message will skipped as first_is_true is False.
await sender.send(1)

# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2

# This message will be skipped as it is smaller than the previous one (1).
await sender.send(0)

# This message will be skipped as it is not bigger than the previous one (0).
await sender.send(0)

# This message will be received as it is bigger than the previous one (0).
await sender.send(1)
assert await receiver.receive() == 1

# This message will be received as it is bigger than the previous one (1).
await sender.send(2)
assert await receiver.receive() == 2
"""

def __init__(
self,
predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
*,
first_is_true: bool = True,
) -> None:
"""Initialize this instance.

Args:
predicate: A callable that takes two arguments, the previous message and the
current message, and returns a boolean indicating whether the current
message should be received.
first_is_true: Whether the first message should be considered as satisfying
the predicate. Defaults to `True`.
"""
self._predicate = predicate
self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
self._first_is_true = first_is_true

def __call__(self, message: ChannelMessageT) -> bool:
"""Return whether `message` is the first one received or different from the previous one."""

def is_message(
value: ChannelMessageT | _Sentinel,
) -> TypeGuard[ChannelMessageT]:
return value is not _SENTINEL

old_message = self._last_message
self._last_message = message
if is_message(old_message):
return self._predicate(old_message, message)
return self._first_is_true

def __str__(self) -> str:
"""Return a string representation of this instance."""
return f"{type(self).__name__}:{self._predicate.__name__}"

def __repr__(self) -> str:
"""Return a string representation of this instance."""
return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"


class ChangedOnly(OnlyIfPrevious[object]):
"""A predicate to check if a message is different from the previous one.

This predicate can be used to filter out messages that are the same as the previous
one. This can be useful in cases where you want to avoid processing duplicate
messages.

Warning:
This predicate uses the `!=` operator to compare messages, which includes all
the weirdnesses of Python's equality comparison (e.g., `1 == 1.0`, `True == 1`,
`True == 1.0`, `False == 0` are all `True`).

If you need to use a different comparison, you can create a custom predicate
using [`OnlyIfPrevious`][frequenz.channels.experimental.OnlyIfPrevious].

Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import ChangedOnly

channel = Broadcast[int](name="skip_duplicates_test")
receiver = channel.new_receiver().filter(ChangedOnly())
sender = channel.new_sender()

# This message will be received as it is the first message.
await sender.send(1)
assert await receiver.receive() == 1

# This message will be skipped as it is the same as the previous one.
await sender.send(1)

# This message will be received as it is different from the previous one.
await sender.send(2)
assert await receiver.receive() == 2
```
"""

def __init__(self, *, first_is_true: bool = True) -> None:
"""Initialize this instance.

Args:
first_is_true: Whether the first message should be considered as different
from the previous one. Defaults to `True`.
"""
super().__init__(lambda old, new: old != new, first_is_true=first_is_true)

def __str__(self) -> str:
"""Return a string representation of this instance."""
return f"{type(self).__name__}"

def __repr__(self) -> str:
"""Return a string representation of this instance."""
return f"{type(self).__name__}(first_is_true={self._first_is_true!r})"
121 changes: 121 additions & 0 deletions tests/experimental/test_predicates_changed_only.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Tests for the ChangedOnly implementation.

Most testing is done in the OnlyIfPrevious tests, these tests are limited to the
specifics of the ChangedOnly implementation.
"""

from dataclasses import dataclass
from unittest.mock import MagicMock

import pytest

from frequenz.channels.experimental import ChangedOnly, OnlyIfPrevious


@dataclass(frozen=True, kw_only=True)
class EqualityTestCase:
"""Test case for testing ChangedOnly behavior with tricky equality cases."""

title: str
first_value: object
second_value: object
expected_second_result: bool


EQUALITY_TEST_CASES = [
# Python's equality weirdness cases
EqualityTestCase(
title="Integer equals float",
first_value=1,
second_value=1.0,
expected_second_result=False,
),
EqualityTestCase(
title="Boolean equals integer",
first_value=True,
second_value=1,
expected_second_result=False,
),
EqualityTestCase(
title="Boolean equals float",
first_value=True,
second_value=1.0,
expected_second_result=False,
),
EqualityTestCase(
title="False equals zero",
first_value=False,
second_value=0,
expected_second_result=False,
),
EqualityTestCase(
title="Zero equals False",
first_value=0,
second_value=False,
expected_second_result=False,
),
# Edge cases that should be different
EqualityTestCase(
title="NaN is never equal to NaN",
first_value=float("nan"),
second_value=float("nan"),
expected_second_result=True,
),
EqualityTestCase(
title="Different list instances with same content",
first_value=[1],
second_value=[1],
expected_second_result=False,
),
]


def test_changed_only_inheritance() -> None:
"""Test that ChangedOnly is properly inheriting from OnlyIfPrevious."""
changed_only = ChangedOnly()
assert isinstance(changed_only, OnlyIfPrevious)


def test_changed_only_predicate_implementation() -> None:
"""Test that ChangedOnly properly implements the inequality predicate."""
# Create mock objects that we can control the equality comparison for
old = MagicMock()
new = MagicMock()

# Set up the inequality comparison
# mypy doesn't understand mocking __ne__ very well
old.__ne__.return_value = True # type: ignore[attr-defined]

changed_only = ChangedOnly()
# Skip the first message as it's handled by first_is_true
changed_only(old)
changed_only(new)

# Verify that __ne__ was called with the correct argument
old.__ne__.assert_called_once_with(new) # type: ignore[attr-defined]


@pytest.mark.parametrize(
"test_case",
EQUALITY_TEST_CASES,
ids=lambda test_case: test_case.title,
)
def test_changed_only_equality_cases(test_case: EqualityTestCase) -> None:
"""Test ChangedOnly behavior with Python's tricky equality cases.

Args:
test_case: The test case containing the input values and expected result.
"""
changed_only = ChangedOnly()
assert changed_only(test_case.first_value) is True # First is always True
assert changed_only(test_case.second_value) is test_case.expected_second_result


def test_changed_only_representation() -> None:
"""Test the string representation of ChangedOnly."""
changed_only = ChangedOnly()
assert str(changed_only) == "ChangedOnly"
assert repr(changed_only) == "ChangedOnly(first_is_true=True)"
Loading