Skip to content

Commit 1a91c6a

Browse files
authored
Add useful predicates to filter based on the previous value (#341)
This PR started as a way to remove duplicates from a receiver, but then was expanded to `OnlyIfPrevious`, a more generic approach accepting any type of sub-predicate to compare with the previous message. `OnlyIfPrevious` is used as a base to implement also a specific predicate to remove the duplicates (`ChangedOnly`), as this is a very common use case. `OnlyIfPrevious` could be made even more generic by accepting also a function to determine what to save as the last message instead of always saving the new message. Such functionality would allow to implement a filter that ensured the received messages are monotonically increasing for example, by saving as the last message the `max(old, new)` instead of just the `new` message. But this is left as a future improvement, as it is also not trivial because such function should be able to also receive the *sentinel*, which can complicate the API a bit.
2 parents 6362d60 + 2b9541c commit 1a91c6a

File tree

9 files changed

+477
-5
lines changed

9 files changed

+477
-5
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ body:
5252
- Build script, CI, dependencies, etc. (part:tooling)
5353
- Channels, `Broadcast`, `Anycast`, etc. (part:channels)
5454
- Core types (`Sender`, `Receiver`, exceptions, etc.) (part:core)
55+
- Experimental features (the `experimental` package) (part:experimental)
5556
- Utilities (`Event`, `FileWatcher`, `Timer`, etc.) (part:utilities)
5657
- Synchronization of multiple sources (`select()`, `merge()`, etc.) (part:synchronization)
5758
validations:

.github/keylabeler.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ labelMappings:
1515
"part:channels": "part:channels"
1616
"part:core": "part:core"
1717
"part:docs": "part:docs"
18+
"part:experimental": "part:experimental"
1819
"part:synchronization": "part:synchronization"
1920
"part:tests": "part:tests"
2021
"part:tooling": "part:tooling"

.github/labeler.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
- "src/frequenz/channels/_receiver.py"
5050
- "src/frequenz/channels/_sender.py"
5151

52+
"part:experimental":
53+
- changed-files:
54+
- any-glob-to-any-file:
55+
- "src/frequenz/channels/experimental/*.py"
56+
5257
"part:synchronization":
5358
- changed-files:
5459
- any-glob-to-any-file:

RELEASE_NOTES.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
### Experimental
14+
15+
- A new predicate, `OnlyIfPrevious`, to `filter()` messages based on the previous message.
16+
- A new special case of `OnlyIfPrevious`, `ChangedOnly`, to skip messages if they are equal to the previous message.
1417

1518
## Bug Fixes
1619

src/frequenz/channels/experimental/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010
"""
1111

1212
from ._pipe import Pipe
13+
from ._predicates import ChangedOnly, OnlyIfPrevious
1314
from ._relay_sender import RelaySender
1415

1516
__all__ = [
17+
"ChangedOnly",
18+
"OnlyIfPrevious",
1619
"Pipe",
1720
"RelaySender",
1821
]
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Predicates to be used in conjuntion with `Receiver.filter()`."""
5+
6+
7+
from typing import Callable, Final, Generic, TypeGuard
8+
9+
from frequenz.channels._generic import ChannelMessageT
10+
11+
12+
class _Sentinel:
13+
"""A sentinel to denote that no value has been received yet."""
14+
15+
def __str__(self) -> str:
16+
"""Return a string representation of this sentinel."""
17+
return "<no value received yet>"
18+
19+
20+
_SENTINEL: Final[_Sentinel] = _Sentinel()
21+
22+
23+
class OnlyIfPrevious(Generic[ChannelMessageT]):
24+
"""A predicate to check if a message has a particular relationship with the previous one.
25+
26+
This predicate can be used to filter out messages based on a custom condition on the
27+
previous and current messages. This can be useful in cases where you want to
28+
process messages only if they satisfy a particular condition with respect to the
29+
previous message.
30+
31+
Tip:
32+
If you want to use `==` as predicate, you can use the
33+
[`ChangedOnly`][frequenz.channels.experimental.ChangedOnly] predicate.
34+
35+
Example: Receiving only messages that are not the same instance as the previous one.
36+
```python
37+
from frequenz.channels import Broadcast
38+
from frequenz.channels.experimental import OnlyIfPrevious
39+
40+
channel = Broadcast[int | bool](name="example")
41+
receiver = channel.new_receiver().filter(OnlyIfPrevious(lambda old, new: old is not new))
42+
sender = channel.new_sender()
43+
44+
# This message will be received as it is the first message.
45+
await sender.send(1)
46+
assert await receiver.receive() == 1
47+
48+
# This message will be skipped as it is the same instance as the previous one.
49+
await sender.send(1)
50+
51+
# This message will be received as it is a different instance from the previous
52+
# one.
53+
await sender.send(True)
54+
assert await receiver.receive() is True
55+
```
56+
57+
Example: Receiving only messages if they are bigger than the previous one.
58+
```python
59+
from frequenz.channels import Broadcast
60+
from frequenz.channels.experimental import OnlyIfPrevious
61+
62+
channel = Broadcast[int](name="example")
63+
receiver = channel.new_receiver().filter(
64+
OnlyIfPrevious(lambda old, new: new > old, first_is_true=False)
65+
)
66+
sender = channel.new_sender()
67+
68+
# This message will skipped as first_is_true is False.
69+
await sender.send(1)
70+
71+
# This message will be received as it is bigger than the previous one (1).
72+
await sender.send(2)
73+
assert await receiver.receive() == 2
74+
75+
# This message will be skipped as it is smaller than the previous one (1).
76+
await sender.send(0)
77+
78+
# This message will be skipped as it is not bigger than the previous one (0).
79+
await sender.send(0)
80+
81+
# This message will be received as it is bigger than the previous one (0).
82+
await sender.send(1)
83+
assert await receiver.receive() == 1
84+
85+
# This message will be received as it is bigger than the previous one (1).
86+
await sender.send(2)
87+
assert await receiver.receive() == 2
88+
"""
89+
90+
def __init__(
91+
self,
92+
predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
93+
*,
94+
first_is_true: bool = True,
95+
) -> None:
96+
"""Initialize this instance.
97+
98+
Args:
99+
predicate: A callable that takes two arguments, the previous message and the
100+
current message, and returns a boolean indicating whether the current
101+
message should be received.
102+
first_is_true: Whether the first message should be considered as satisfying
103+
the predicate. Defaults to `True`.
104+
"""
105+
self._predicate = predicate
106+
self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
107+
self._first_is_true = first_is_true
108+
109+
def __call__(self, message: ChannelMessageT) -> bool:
110+
"""Return whether `message` is the first one received or different from the previous one."""
111+
112+
def is_message(
113+
value: ChannelMessageT | _Sentinel,
114+
) -> TypeGuard[ChannelMessageT]:
115+
return value is not _SENTINEL
116+
117+
old_message = self._last_message
118+
self._last_message = message
119+
if is_message(old_message):
120+
return self._predicate(old_message, message)
121+
return self._first_is_true
122+
123+
def __str__(self) -> str:
124+
"""Return a string representation of this instance."""
125+
return f"{type(self).__name__}:{self._predicate.__name__}"
126+
127+
def __repr__(self) -> str:
128+
"""Return a string representation of this instance."""
129+
return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"
130+
131+
132+
class ChangedOnly(OnlyIfPrevious[object]):
133+
"""A predicate to check if a message is different from the previous one.
134+
135+
This predicate can be used to filter out messages that are the same as the previous
136+
one. This can be useful in cases where you want to avoid processing duplicate
137+
messages.
138+
139+
Warning:
140+
This predicate uses the `!=` operator to compare messages, which includes all
141+
the weirdnesses of Python's equality comparison (e.g., `1 == 1.0`, `True == 1`,
142+
`True == 1.0`, `False == 0` are all `True`).
143+
144+
If you need to use a different comparison, you can create a custom predicate
145+
using [`OnlyIfPrevious`][frequenz.channels.experimental.OnlyIfPrevious].
146+
147+
Example:
148+
```python
149+
from frequenz.channels import Broadcast
150+
from frequenz.channels.experimental import ChangedOnly
151+
152+
channel = Broadcast[int](name="skip_duplicates_test")
153+
receiver = channel.new_receiver().filter(ChangedOnly())
154+
sender = channel.new_sender()
155+
156+
# This message will be received as it is the first message.
157+
await sender.send(1)
158+
assert await receiver.receive() == 1
159+
160+
# This message will be skipped as it is the same as the previous one.
161+
await sender.send(1)
162+
163+
# This message will be received as it is different from the previous one.
164+
await sender.send(2)
165+
assert await receiver.receive() == 2
166+
```
167+
"""
168+
169+
def __init__(self, *, first_is_true: bool = True) -> None:
170+
"""Initialize this instance.
171+
172+
Args:
173+
first_is_true: Whether the first message should be considered as different
174+
from the previous one. Defaults to `True`.
175+
"""
176+
super().__init__(lambda old, new: old != new, first_is_true=first_is_true)
177+
178+
def __str__(self) -> str:
179+
"""Return a string representation of this instance."""
180+
return f"{type(self).__name__}"
181+
182+
def __repr__(self) -> str:
183+
"""Return a string representation of this instance."""
184+
return f"{type(self).__name__}(first_is_true={self._first_is_true!r})"
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the ChangedOnly implementation.
5+
6+
Most testing is done in the OnlyIfPrevious tests, these tests are limited to the
7+
specifics of the ChangedOnly implementation.
8+
"""
9+
10+
from dataclasses import dataclass
11+
from unittest.mock import MagicMock
12+
13+
import pytest
14+
15+
from frequenz.channels.experimental import ChangedOnly, OnlyIfPrevious
16+
17+
18+
@dataclass(frozen=True, kw_only=True)
19+
class EqualityTestCase:
20+
"""Test case for testing ChangedOnly behavior with tricky equality cases."""
21+
22+
title: str
23+
first_value: object
24+
second_value: object
25+
expected_second_result: bool
26+
27+
28+
EQUALITY_TEST_CASES = [
29+
# Python's equality weirdness cases
30+
EqualityTestCase(
31+
title="Integer equals float",
32+
first_value=1,
33+
second_value=1.0,
34+
expected_second_result=False,
35+
),
36+
EqualityTestCase(
37+
title="Boolean equals integer",
38+
first_value=True,
39+
second_value=1,
40+
expected_second_result=False,
41+
),
42+
EqualityTestCase(
43+
title="Boolean equals float",
44+
first_value=True,
45+
second_value=1.0,
46+
expected_second_result=False,
47+
),
48+
EqualityTestCase(
49+
title="False equals zero",
50+
first_value=False,
51+
second_value=0,
52+
expected_second_result=False,
53+
),
54+
EqualityTestCase(
55+
title="Zero equals False",
56+
first_value=0,
57+
second_value=False,
58+
expected_second_result=False,
59+
),
60+
# Edge cases that should be different
61+
EqualityTestCase(
62+
title="NaN is never equal to NaN",
63+
first_value=float("nan"),
64+
second_value=float("nan"),
65+
expected_second_result=True,
66+
),
67+
EqualityTestCase(
68+
title="Different list instances with same content",
69+
first_value=[1],
70+
second_value=[1],
71+
expected_second_result=False,
72+
),
73+
]
74+
75+
76+
def test_changed_only_inheritance() -> None:
77+
"""Test that ChangedOnly is properly inheriting from OnlyIfPrevious."""
78+
changed_only = ChangedOnly()
79+
assert isinstance(changed_only, OnlyIfPrevious)
80+
81+
82+
def test_changed_only_predicate_implementation() -> None:
83+
"""Test that ChangedOnly properly implements the inequality predicate."""
84+
# Create mock objects that we can control the equality comparison for
85+
old = MagicMock()
86+
new = MagicMock()
87+
88+
# Set up the inequality comparison
89+
# mypy doesn't understand mocking __ne__ very well
90+
old.__ne__.return_value = True # type: ignore[attr-defined]
91+
92+
changed_only = ChangedOnly()
93+
# Skip the first message as it's handled by first_is_true
94+
changed_only(old)
95+
changed_only(new)
96+
97+
# Verify that __ne__ was called with the correct argument
98+
old.__ne__.assert_called_once_with(new) # type: ignore[attr-defined]
99+
100+
101+
@pytest.mark.parametrize(
102+
"test_case",
103+
EQUALITY_TEST_CASES,
104+
ids=lambda test_case: test_case.title,
105+
)
106+
def test_changed_only_equality_cases(test_case: EqualityTestCase) -> None:
107+
"""Test ChangedOnly behavior with Python's tricky equality cases.
108+
109+
Args:
110+
test_case: The test case containing the input values and expected result.
111+
"""
112+
changed_only = ChangedOnly()
113+
assert changed_only(test_case.first_value) is True # First is always True
114+
assert changed_only(test_case.second_value) is test_case.expected_second_result
115+
116+
117+
def test_changed_only_representation() -> None:
118+
"""Test the string representation of ChangedOnly."""
119+
changed_only = ChangedOnly()
120+
assert str(changed_only) == "ChangedOnly"
121+
assert repr(changed_only) == "ChangedOnly(first_is_true=True)"

0 commit comments

Comments
 (0)