Skip to content

Commit 0a48f9e

Browse files
committed
Add a predicate to filter messages based on the previous one
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 6da7bcc commit 0a48f9e

File tree

4 files changed

+292
-1
lines changed

4 files changed

+292
-1
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
### Experimental
14+
15+
- A new predicate, `OnlyIfPrevious`, to `filter()` messages based on the previous message.
1416

1517
## Bug Fixes
1618

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
"""
1111

1212
from ._pipe import Pipe
13+
from ._predicates import OnlyIfPrevious
1314
from ._relay_sender import RelaySender
1415

1516
__all__ = [
17+
"OnlyIfPrevious",
1618
"Pipe",
1719
"RelaySender",
1820
]
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Predicates to be used in conjuntion with `Receiver.filter()`."""
5+
6+
7+
from typing import Callable, Final, Generic, TypeGuard
8+
9+
from frequenz.channels._generic import ChannelMessageT
10+
11+
12+
class _Sentinel:
13+
"""A sentinel to denote that no value has been received yet."""
14+
15+
def __str__(self) -> str:
16+
"""Return a string representation of this sentinel."""
17+
return "<no value received yet>"
18+
19+
20+
_SENTINEL: Final[_Sentinel] = _Sentinel()
21+
22+
23+
class OnlyIfPrevious(Generic[ChannelMessageT]):
24+
"""A predicate to check if a message has a particular relationship with the previous one.
25+
26+
This predicate can be used to filter out messages based on a custom condition on the
27+
previous and current messages. This can be useful in cases where you want to
28+
process messages only if they satisfy a particular condition with respect to the
29+
previous message.
30+
31+
Tip:
32+
If you want to use `==` as predicate, you can use the
33+
[`ChangedOnly`][frequenz.channels.experimental.ChangedOnly] predicate.
34+
35+
Example: Receiving only messages that are not the same instance as the previous one.
36+
```python
37+
from frequenz.channels import Broadcast
38+
from frequenz.channels.experimental import OnlyIfPrevious
39+
40+
channel = Broadcast[int | bool](name="example")
41+
receiver = channel.new_receiver().filter(OnlyIfPrevious(lambda old, new: old is not new))
42+
sender = channel.new_sender()
43+
44+
# This message will be received as it is the first message.
45+
await sender.send(1)
46+
assert await receiver.receive() == 1
47+
48+
# This message will be skipped as it is the same instance as the previous one.
49+
await sender.send(1)
50+
51+
# This message will be received as it is a different instance from the previous
52+
# one.
53+
await sender.send(True)
54+
assert await receiver.receive() is True
55+
```
56+
57+
Example: Receiving only messages if they are bigger than the previous one.
58+
```python
59+
from frequenz.channels import Broadcast
60+
from frequenz.channels.experimental import OnlyIfPrevious
61+
62+
channel = Broadcast[int](name="example")
63+
receiver = channel.new_receiver().filter(
64+
OnlyIfPrevious(lambda old, new: new > old, first_is_true=False)
65+
)
66+
sender = channel.new_sender()
67+
68+
# This message will skipped as first_is_true is False.
69+
await sender.send(1)
70+
71+
# This message will be received as it is bigger than the previous one (1).
72+
await sender.send(2)
73+
assert await receiver.receive() == 2
74+
75+
# This message will be skipped as it is smaller than the previous one (1).
76+
await sender.send(0)
77+
78+
# This message will be skipped as it is not bigger than the previous one (0).
79+
await sender.send(0)
80+
81+
# This message will be received as it is bigger than the previous one (0).
82+
await sender.send(1)
83+
assert await receiver.receive() == 1
84+
85+
# This message will be received as it is bigger than the previous one (1).
86+
await sender.send(2)
87+
assert await receiver.receive() == 2
88+
"""
89+
90+
def __init__(
91+
self,
92+
predicate: Callable[[ChannelMessageT, ChannelMessageT], bool],
93+
*,
94+
first_is_true: bool = True,
95+
) -> None:
96+
"""Initialize this instance.
97+
98+
Args:
99+
predicate: A callable that takes two arguments, the previous message and the
100+
current message, and returns a boolean indicating whether the current
101+
message should be received.
102+
first_is_true: Whether the first message should be considered as satisfying
103+
the predicate. Defaults to `True`.
104+
"""
105+
self._predicate = predicate
106+
self._last_message: ChannelMessageT | _Sentinel = _SENTINEL
107+
self._first_is_true = first_is_true
108+
109+
def __call__(self, message: ChannelMessageT) -> bool:
110+
"""Return whether `message` is the first one received or different from the previous one."""
111+
112+
def is_message(
113+
value: ChannelMessageT | _Sentinel,
114+
) -> TypeGuard[ChannelMessageT]:
115+
return value is not _SENTINEL
116+
117+
old_message = self._last_message
118+
self._last_message = message
119+
if is_message(old_message):
120+
return self._predicate(old_message, message)
121+
return self._first_is_true
122+
123+
def __str__(self) -> str:
124+
"""Return a string representation of this instance."""
125+
return f"{type(self).__name__}:{self._predicate.__name__}"
126+
127+
def __repr__(self) -> str:
128+
"""Return a string representation of this instance."""
129+
return f"<{type(self).__name__}: {self._predicate!r} first_is_true={self._first_is_true!r}>"
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the OnlyIfPrevious implementation."""
5+
6+
from dataclasses import dataclass
7+
from typing import Callable, Generic, TypeVar
8+
9+
import pytest
10+
11+
from frequenz.channels.experimental import OnlyIfPrevious
12+
13+
_T = TypeVar("_T")
14+
15+
16+
@dataclass(frozen=True, kw_only=True)
17+
class PredicateTestCase(Generic[_T]):
18+
"""Test case for testing OnlyIfPrevious behavior with different predicates."""
19+
20+
title: str
21+
messages: list[_T]
22+
expected_results: list[bool]
23+
predicate: Callable[[_T, _T], bool]
24+
first_is_true: bool
25+
26+
27+
def always_true(old: object, new: object) -> bool: # pylint: disable=unused-argument
28+
"""Return always True."""
29+
return True
30+
31+
32+
def always_false(old: object, new: object) -> bool: # pylint: disable=unused-argument
33+
"""Return always False."""
34+
return False
35+
36+
37+
def is_greater(old: int, new: int) -> bool:
38+
"""Return weather the new value is greater than the old one."""
39+
return new > old
40+
41+
42+
def is_not_same_instance(old: object, new: object) -> bool:
43+
"""Return weather the new value is not the same instance as the old one."""
44+
return old is not new
45+
46+
47+
PREDICATE_TEST_CASES = [
48+
# Basic cases with different predicates
49+
PredicateTestCase(
50+
title="Always true predicate",
51+
messages=[1, 2, 3],
52+
expected_results=[True, True, True],
53+
predicate=always_true,
54+
first_is_true=True,
55+
),
56+
PredicateTestCase(
57+
title="Always false predicate with first_is_true=False",
58+
messages=[1, 2, 3],
59+
expected_results=[False, False, False],
60+
predicate=always_false,
61+
first_is_true=False,
62+
),
63+
PredicateTestCase(
64+
title="Greater than predicate",
65+
messages=[1, 2, 0, 0, 1, 2],
66+
expected_results=[False, True, False, False, True, True],
67+
predicate=is_greater,
68+
first_is_true=False,
69+
),
70+
# Edge cases
71+
PredicateTestCase(
72+
title="Empty sequence",
73+
messages=[],
74+
expected_results=[],
75+
predicate=always_true,
76+
first_is_true=True,
77+
),
78+
PredicateTestCase(
79+
title="Single value with first_is_true=True",
80+
messages=[1],
81+
expected_results=[True],
82+
predicate=always_false,
83+
first_is_true=True,
84+
),
85+
PredicateTestCase(
86+
title="Single value with first_is_true=False",
87+
messages=[1],
88+
expected_results=[False],
89+
predicate=always_true,
90+
first_is_true=False,
91+
),
92+
# Instance comparison
93+
PredicateTestCase(
94+
title="Same instances",
95+
messages=[1, 1],
96+
expected_results=[True, False],
97+
predicate=is_not_same_instance,
98+
first_is_true=True,
99+
),
100+
PredicateTestCase(
101+
title="Different instances of same values",
102+
messages=[[1], [1]],
103+
expected_results=[True, True],
104+
predicate=is_not_same_instance,
105+
first_is_true=True,
106+
),
107+
]
108+
109+
110+
@pytest.mark.parametrize(
111+
"test_case",
112+
PREDICATE_TEST_CASES,
113+
ids=lambda test_case: test_case.title,
114+
)
115+
def test_only_if_previous(test_case: PredicateTestCase[_T]) -> None:
116+
"""Test the OnlyIfPrevious with different predicates and sequences.
117+
118+
Args:
119+
test_case: The test case containing the input values and expected results.
120+
"""
121+
only_if_previous = OnlyIfPrevious(
122+
test_case.predicate,
123+
first_is_true=test_case.first_is_true,
124+
)
125+
results = [only_if_previous(msg) for msg in test_case.messages]
126+
assert results == test_case.expected_results
127+
128+
129+
def test_only_if_previous_state_independence() -> None:
130+
"""Test that multiple OnlyIfPrevious instances maintain independent state."""
131+
only_if_previous1 = OnlyIfPrevious(is_greater)
132+
only_if_previous2 = OnlyIfPrevious(is_greater)
133+
134+
# First message should be accepted (first_is_true default is True)
135+
assert only_if_previous1(1) is True
136+
assert only_if_previous2(10) is True
137+
138+
# Second messages should be evaluated independently
139+
assert only_if_previous1(0) is False # 0 is not greater than 1
140+
assert only_if_previous2(20) is True # 20 is greater than 10
141+
142+
143+
def test_only_if_previous_str_representation() -> None:
144+
"""Test the string representation of OnlyIfPrevious."""
145+
only_if_previous = OnlyIfPrevious(is_greater)
146+
assert str(only_if_previous) == "OnlyIfPrevious:is_greater"
147+
assert (
148+
repr(only_if_previous) == f"<OnlyIfPrevious: {is_greater!r} first_is_true=True>"
149+
)
150+
151+
152+
def test_only_if_previous_sentinel_str() -> None:
153+
"""Test the string representation of the sentinel value."""
154+
only_if_previous = OnlyIfPrevious(always_true)
155+
156+
# Access the private attribute for testing purposes
157+
# pylint: disable=protected-access
158+
assert str(only_if_previous._last_message) == "<no value received yet>"

0 commit comments

Comments
 (0)