diff --git a/.github/ISSUE_TEMPLATE/bug.yml b/.github/ISSUE_TEMPLATE/bug.yml index 73b6ada9..0f440753 100644 --- a/.github/ISSUE_TEMPLATE/bug.yml +++ b/.github/ISSUE_TEMPLATE/bug.yml @@ -52,6 +52,7 @@ body: - Build script, CI, dependencies, etc. (part:tooling) - Channels, `Broadcast`, `Anycast`, etc. (part:channels) - Core types (`Sender`, `Receiver`, exceptions, etc.) (part:core) + - Experimental features (the `experimental` package) (part:experimental) - Utilities (`Event`, `FileWatcher`, `Timer`, etc.) (part:utilities) - Synchronization of multiple sources (`select()`, `merge()`, etc.) (part:synchronization) validations: diff --git a/.github/keylabeler.yml b/.github/keylabeler.yml index c4b10014..24f0ce1c 100644 --- a/.github/keylabeler.yml +++ b/.github/keylabeler.yml @@ -15,6 +15,7 @@ labelMappings: "part:channels": "part:channels" "part:core": "part:core" "part:docs": "part:docs" + "part:experimental": "part:experimental" "part:synchronization": "part:synchronization" "part:tests": "part:tests" "part:tooling": "part:tooling" diff --git a/.github/labeler.yml b/.github/labeler.yml index dac58db6..b50ec0e3 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -49,6 +49,11 @@ - "src/frequenz/channels/_receiver.py" - "src/frequenz/channels/_sender.py" +"part:experimental": + - changed-files: + - any-glob-to-any-file: + - "src/frequenz/channels/experimental/*.py" + "part:synchronization": - changed-files: - any-glob-to-any-file: diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 96a0240b..5702410a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,10 @@ ## New Features - +### Experimental + +- A new predicate, `OnlyIfPrevious`, to `filter()` messages based on the previous message. +- A new special case of `OnlyIfPrevious`, `ChangedOnly`, to skip messages if they are equal to the previous message. ## Bug Fixes diff --git a/src/frequenz/channels/experimental/__init__.py b/src/frequenz/channels/experimental/__init__.py index 2d1881f4..6fe6b0d0 100644 --- a/src/frequenz/channels/experimental/__init__.py +++ b/src/frequenz/channels/experimental/__init__.py @@ -10,9 +10,12 @@ """ from ._pipe import Pipe +from ._predicates import ChangedOnly, OnlyIfPrevious from ._relay_sender import RelaySender __all__ = [ + "ChangedOnly", + "OnlyIfPrevious", "Pipe", "RelaySender", ] diff --git a/src/frequenz/channels/experimental/_predicates.py b/src/frequenz/channels/experimental/_predicates.py new file mode 100644 index 00000000..bf5e45f2 --- /dev/null +++ b/src/frequenz/channels/experimental/_predicates.py @@ -0,0 +1,184 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Predicates to be used in conjuntion with `Receiver.filter()`.""" + + +from typing import Callable, Final, Generic, TypeGuard + +from frequenz.channels._generic import ChannelMessageT + + +class _Sentinel: + """A sentinel to denote that no value has been received yet.""" + + def __str__(self) -> str: + """Return a string representation of this sentinel.""" + return "" + + +_SENTINEL: Final[_Sentinel] = _Sentinel() + + +class OnlyIfPrevious(Generic[ChannelMessageT]): + """A predicate to check if a message has a particular relationship with the previous one. + + This predicate can be used to filter out messages based on a custom condition on the + previous and current messages. This can be useful in cases where you want to + process messages only if they satisfy a particular condition with respect to the + previous message. + + Tip: + If you want to use `==` as predicate, you can use the + [`ChangedOnly`][frequenz.channels.experimental.ChangedOnly] predicate. + + Example: Receiving only messages that are not the same instance as the previous one. + ```python + from frequenz.channels import Broadcast + from frequenz.channels.experimental import OnlyIfPrevious + + channel = Broadcast[int | bool](name="example") + receiver = channel.new_receiver().filter(OnlyIfPrevious(lambda old, new: old is not new)) + sender = channel.new_sender() + + # This message will be received as it is the first message. + await sender.send(1) + assert await receiver.receive() == 1 + + # This message will be skipped as it is the same instance as the previous one. + await sender.send(1) + + # This message will be received as it is a different instance from the previous + # one. + await sender.send(True) + assert await receiver.receive() is True + ``` + + Example: Receiving only messages if they are bigger than the previous one. + ```python + from frequenz.channels import Broadcast + from frequenz.channels.experimental import OnlyIfPrevious + + channel = Broadcast[int](name="example") + receiver = channel.new_receiver().filter( + OnlyIfPrevious(lambda old, new: new > old, first_is_true=False) + ) + sender = channel.new_sender() + + # This message will skipped as first_is_true is False. + await sender.send(1) + + # This message will be received as it is bigger than the previous one (1). + await sender.send(2) + assert await receiver.receive() == 2 + + # This message will be skipped as it is smaller than the previous one (1). + await sender.send(0) + + # This message will be skipped as it is not bigger than the previous one (0). + await sender.send(0) + + # This message will be received as it is bigger than the previous one (0). + await sender.send(1) + assert await receiver.receive() == 1 + + # This message will be received as it is bigger than the previous one (1). + await sender.send(2) + assert await receiver.receive() == 2 + """ + + def __init__( + self, + predicate: Callable[[ChannelMessageT, ChannelMessageT], bool], + *, + first_is_true: bool = True, + ) -> None: + """Initialize this instance. + + Args: + predicate: A callable that takes two arguments, the previous message and the + current message, and returns a boolean indicating whether the current + message should be received. + first_is_true: Whether the first message should be considered as satisfying + the predicate. Defaults to `True`. + """ + self._predicate = predicate + self._last_message: ChannelMessageT | _Sentinel = _SENTINEL + self._first_is_true = first_is_true + + def __call__(self, message: ChannelMessageT) -> bool: + """Return whether `message` is the first one received or different from the previous one.""" + + def is_message( + value: ChannelMessageT | _Sentinel, + ) -> TypeGuard[ChannelMessageT]: + return value is not _SENTINEL + + old_message = self._last_message + self._last_message = message + if is_message(old_message): + return self._predicate(old_message, message) + return self._first_is_true + + def __str__(self) -> str: + """Return a string representation of this instance.""" + return f"{type(self).__name__}:{self._predicate.__name__}" + + def __repr__(self) -> str: + """Return a string representation of this instance.""" + return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>" + + +class ChangedOnly(OnlyIfPrevious[object]): + """A predicate to check if a message is different from the previous one. + + This predicate can be used to filter out messages that are the same as the previous + one. This can be useful in cases where you want to avoid processing duplicate + messages. + + Warning: + This predicate uses the `!=` operator to compare messages, which includes all + the weirdnesses of Python's equality comparison (e.g., `1 == 1.0`, `True == 1`, + `True == 1.0`, `False == 0` are all `True`). + + If you need to use a different comparison, you can create a custom predicate + using [`OnlyIfPrevious`][frequenz.channels.experimental.OnlyIfPrevious]. + + Example: + ```python + from frequenz.channels import Broadcast + from frequenz.channels.experimental import ChangedOnly + + channel = Broadcast[int](name="skip_duplicates_test") + receiver = channel.new_receiver().filter(ChangedOnly()) + sender = channel.new_sender() + + # This message will be received as it is the first message. + await sender.send(1) + assert await receiver.receive() == 1 + + # This message will be skipped as it is the same as the previous one. + await sender.send(1) + + # This message will be received as it is different from the previous one. + await sender.send(2) + assert await receiver.receive() == 2 + ``` + """ + + def __init__(self, *, first_is_true: bool = True) -> None: + """Initialize this instance. + + Args: + first_is_true: Whether the first message should be considered as different + from the previous one. Defaults to `True`. + """ + super().__init__(lambda old, new: old != new, first_is_true=first_is_true) + + def __str__(self) -> str: + """Return a string representation of this instance.""" + return f"{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this instance.""" + return f"{type(self).__name__}(first_is_true={self._first_is_true!r})" diff --git a/tests/experimental/test_predicates_changed_only.py b/tests/experimental/test_predicates_changed_only.py new file mode 100644 index 00000000..4d735d85 --- /dev/null +++ b/tests/experimental/test_predicates_changed_only.py @@ -0,0 +1,121 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Tests for the ChangedOnly implementation. + +Most testing is done in the OnlyIfPrevious tests, these tests are limited to the +specifics of the ChangedOnly implementation. +""" + +from dataclasses import dataclass +from unittest.mock import MagicMock + +import pytest + +from frequenz.channels.experimental import ChangedOnly, OnlyIfPrevious + + +@dataclass(frozen=True, kw_only=True) +class EqualityTestCase: + """Test case for testing ChangedOnly behavior with tricky equality cases.""" + + title: str + first_value: object + second_value: object + expected_second_result: bool + + +EQUALITY_TEST_CASES = [ + # Python's equality weirdness cases + EqualityTestCase( + title="Integer equals float", + first_value=1, + second_value=1.0, + expected_second_result=False, + ), + EqualityTestCase( + title="Boolean equals integer", + first_value=True, + second_value=1, + expected_second_result=False, + ), + EqualityTestCase( + title="Boolean equals float", + first_value=True, + second_value=1.0, + expected_second_result=False, + ), + EqualityTestCase( + title="False equals zero", + first_value=False, + second_value=0, + expected_second_result=False, + ), + EqualityTestCase( + title="Zero equals False", + first_value=0, + second_value=False, + expected_second_result=False, + ), + # Edge cases that should be different + EqualityTestCase( + title="NaN is never equal to NaN", + first_value=float("nan"), + second_value=float("nan"), + expected_second_result=True, + ), + EqualityTestCase( + title="Different list instances with same content", + first_value=[1], + second_value=[1], + expected_second_result=False, + ), +] + + +def test_changed_only_inheritance() -> None: + """Test that ChangedOnly is properly inheriting from OnlyIfPrevious.""" + changed_only = ChangedOnly() + assert isinstance(changed_only, OnlyIfPrevious) + + +def test_changed_only_predicate_implementation() -> None: + """Test that ChangedOnly properly implements the inequality predicate.""" + # Create mock objects that we can control the equality comparison for + old = MagicMock() + new = MagicMock() + + # Set up the inequality comparison + # mypy doesn't understand mocking __ne__ very well + old.__ne__.return_value = True # type: ignore[attr-defined] + + changed_only = ChangedOnly() + # Skip the first message as it's handled by first_is_true + changed_only(old) + changed_only(new) + + # Verify that __ne__ was called with the correct argument + old.__ne__.assert_called_once_with(new) # type: ignore[attr-defined] + + +@pytest.mark.parametrize( + "test_case", + EQUALITY_TEST_CASES, + ids=lambda test_case: test_case.title, +) +def test_changed_only_equality_cases(test_case: EqualityTestCase) -> None: + """Test ChangedOnly behavior with Python's tricky equality cases. + + Args: + test_case: The test case containing the input values and expected result. + """ + changed_only = ChangedOnly() + assert changed_only(test_case.first_value) is True # First is always True + assert changed_only(test_case.second_value) is test_case.expected_second_result + + +def test_changed_only_representation() -> None: + """Test the string representation of ChangedOnly.""" + changed_only = ChangedOnly() + assert str(changed_only) == "ChangedOnly" + assert repr(changed_only) == "ChangedOnly(first_is_true=True)" diff --git a/tests/experimental/test_predicates_only_if_previous.py b/tests/experimental/test_predicates_only_if_previous.py new file mode 100644 index 00000000..b76bb96d --- /dev/null +++ b/tests/experimental/test_predicates_only_if_previous.py @@ -0,0 +1,158 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Tests for the OnlyIfPrevious implementation.""" + +from dataclasses import dataclass +from typing import Callable, Generic, TypeVar + +import pytest + +from frequenz.channels.experimental import OnlyIfPrevious + +_T = TypeVar("_T") + + +@dataclass(frozen=True, kw_only=True) +class PredicateTestCase(Generic[_T]): + """Test case for testing OnlyIfPrevious behavior with different predicates.""" + + title: str + messages: list[_T] + expected_results: list[bool] + predicate: Callable[[_T, _T], bool] + first_is_true: bool + + +def always_true(old: object, new: object) -> bool: # pylint: disable=unused-argument + """Return always True.""" + return True + + +def always_false(old: object, new: object) -> bool: # pylint: disable=unused-argument + """Return always False.""" + return False + + +def is_greater(old: int, new: int) -> bool: + """Return weather the new value is greater than the old one.""" + return new > old + + +def is_not_same_instance(old: object, new: object) -> bool: + """Return weather the new value is not the same instance as the old one.""" + return old is not new + + +PREDICATE_TEST_CASES = [ + # Basic cases with different predicates + PredicateTestCase( + title="Always true predicate", + messages=[1, 2, 3], + expected_results=[True, True, True], + predicate=always_true, + first_is_true=True, + ), + PredicateTestCase( + title="Always false predicate with first_is_true=False", + messages=[1, 2, 3], + expected_results=[False, False, False], + predicate=always_false, + first_is_true=False, + ), + PredicateTestCase( + title="Greater than predicate", + messages=[1, 2, 0, 0, 1, 2], + expected_results=[False, True, False, False, True, True], + predicate=is_greater, + first_is_true=False, + ), + # Edge cases + PredicateTestCase( + title="Empty sequence", + messages=[], + expected_results=[], + predicate=always_true, + first_is_true=True, + ), + PredicateTestCase( + title="Single value with first_is_true=True", + messages=[1], + expected_results=[True], + predicate=always_false, + first_is_true=True, + ), + PredicateTestCase( + title="Single value with first_is_true=False", + messages=[1], + expected_results=[False], + predicate=always_true, + first_is_true=False, + ), + # Instance comparison + PredicateTestCase( + title="Same instances", + messages=[1, 1], + expected_results=[True, False], + predicate=is_not_same_instance, + first_is_true=True, + ), + PredicateTestCase( + title="Different instances of same values", + messages=[[1], [1]], + expected_results=[True, True], + predicate=is_not_same_instance, + first_is_true=True, + ), +] + + +@pytest.mark.parametrize( + "test_case", + PREDICATE_TEST_CASES, + ids=lambda test_case: test_case.title, +) +def test_only_if_previous(test_case: PredicateTestCase[_T]) -> None: + """Test the OnlyIfPrevious with different predicates and sequences. + + Args: + test_case: The test case containing the input values and expected results. + """ + only_if_previous = OnlyIfPrevious( + test_case.predicate, + first_is_true=test_case.first_is_true, + ) + results = [only_if_previous(msg) for msg in test_case.messages] + assert results == test_case.expected_results + + +def test_only_if_previous_state_independence() -> None: + """Test that multiple OnlyIfPrevious instances maintain independent state.""" + only_if_previous1 = OnlyIfPrevious(is_greater) + only_if_previous2 = OnlyIfPrevious(is_greater) + + # First message should be accepted (first_is_true default is True) + assert only_if_previous1(1) is True + assert only_if_previous2(10) is True + + # Second messages should be evaluated independently + assert only_if_previous1(0) is False # 0 is not greater than 1 + assert only_if_previous2(20) is True # 20 is greater than 10 + + +def test_only_if_previous_str_representation() -> None: + """Test the string representation of OnlyIfPrevious.""" + only_if_previous = OnlyIfPrevious(is_greater) + assert str(only_if_previous) == "OnlyIfPrevious:is_greater" + assert ( + repr(only_if_previous) == f"" + ) + + +def test_only_if_previous_sentinel_str() -> None: + """Test the string representation of the sentinel value.""" + only_if_previous = OnlyIfPrevious(always_true) + + # Access the private attribute for testing purposes + # pylint: disable=protected-access + assert str(only_if_previous._last_message) == "" diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py deleted file mode 100644 index 25e1e6d9..00000000 --- a/tests/utils/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for channel utils."""