Skip to content

Commit 4096546

Browse files
authored
Implement the Mapping interface for GroupingLatestValueCache (#430)
2 parents 4d0ee05 + 27094a8 commit 4096546

File tree

4 files changed

+131
-48
lines changed

4 files changed

+131
-48
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand.
13+
- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand through a `collections.abc.Mapping` interface.
1414

1515
## Bug Fixes
1616

src/frequenz/channels/experimental/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,26 @@
1010
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
1111
"""
1212

13-
from ._grouping_latest_value_cache import GroupingLatestValueCache
13+
from ._grouping_latest_value_cache import (
14+
DefaultT,
15+
GroupingLatestValueCache,
16+
HashableT,
17+
ValueT_co,
18+
)
1419
from ._nop_receiver import NopReceiver
1520
from ._optional_receiver import OptionalReceiver
1621
from ._pipe import Pipe
1722
from ._relay_sender import RelaySender
1823
from ._with_previous import WithPrevious
1924

2025
__all__ = [
26+
"DefaultT",
2127
"GroupingLatestValueCache",
28+
"HashableT",
2229
"NopReceiver",
2330
"OptionalReceiver",
2431
"Pipe",
2532
"RelaySender",
33+
"ValueT_co",
2634
"WithPrevious",
2735
]

src/frequenz/channels/experimental/_grouping_latest_value_cache.py

Lines changed: 111 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
1111
stores the latest value received by that receiver for each key separately.
1212
13-
As soon as a value is received for a `key`, the
14-
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
15-
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
16-
method for that `key` returns the latest value received. The `get` method will raise an
17-
exception if called before any messages have been received from the receiver for a given
18-
`key`.
13+
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
14+
interface, so it can be used like a dictionary. In addition, it provides a
15+
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
16+
check if a value has been received for a specific key, and a
17+
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
18+
the cached value for a specific key.
1919
2020
Example:
2121
```python
@@ -39,26 +39,30 @@
3939

4040
import asyncio
4141
import typing
42-
from collections.abc import Set
42+
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView
43+
44+
from typing_extensions import override
4345

4446
from .._receiver import Receiver
4547

46-
T_co = typing.TypeVar("T_co", covariant=True)
47-
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
48+
ValueT_co = typing.TypeVar("ValueT_co", covariant=True)
49+
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
4850

51+
DefaultT = typing.TypeVar("DefaultT")
52+
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
53+
54+
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
55+
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
4956

50-
class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
51-
"""A cache that stores the latest value in a receiver.
5257

53-
It provides a way to look up the latest value in a stream without any delay,
54-
as long as there has been one value received.
55-
"""
58+
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
59+
"""A cache that stores the latest value in a receiver, grouped by key."""
5660

5761
def __init__(
5862
self,
59-
receiver: Receiver[T_co],
60-
key: typing.Callable[[T_co], typing.Any],
63+
receiver: Receiver[ValueT_co],
6164
*,
65+
key: typing.Callable[[ValueT_co], HashableT],
6266
unique_id: str | None = None,
6367
) -> None:
6468
"""Create a new cache.
@@ -71,10 +75,10 @@ def __init__(
7175
provided, a unique identifier will be generated from the object's
7276
[`id()`][id]. It is used mostly for debugging purposes.
7377
"""
74-
self._receiver: Receiver[T_co] = receiver
75-
self._key: typing.Callable[[T_co], HashableT] = key
78+
self._receiver: Receiver[ValueT_co] = receiver
79+
self._key: typing.Callable[[ValueT_co], HashableT] = key
7680
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
77-
self._latest_value_by_key: dict[HashableT, T_co] = {}
81+
self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
7882
self._task: asyncio.Task[None] = asyncio.create_task(
7983
self._run(), name=f"LatestValueCache«{self._unique_id}»"
8084
)
@@ -84,47 +88,121 @@ def unique_id(self) -> str:
8488
"""The unique identifier of this instance."""
8589
return self._unique_id
8690

87-
def keys(self) -> Set[HashableT]:
91+
@override
92+
def keys(self) -> KeysView[HashableT]:
8893
"""Return the set of keys for which values have been received.
8994
9095
If no key function is provided, this will return an empty set.
9196
"""
9297
return self._latest_value_by_key.keys()
9398

94-
def get(self, key: HashableT) -> T_co:
99+
@override
100+
def items(self) -> ItemsView[HashableT, ValueT_co]:
101+
"""Return an iterator over the key-value pairs of the latest values received."""
102+
return self._latest_value_by_key.items()
103+
104+
@override
105+
def values(self) -> ValuesView[ValueT_co]:
106+
"""Return an iterator over the latest values received."""
107+
return self._latest_value_by_key.values()
108+
109+
@typing.overload
110+
def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
111+
"""Return the latest value that has been received for a specific key."""
112+
113+
# MyPy passes this overload as a valid signature, but pylint does not like it.
114+
@typing.overload
115+
def get( # pylint: disable=signature-differs
116+
self, key: HashableT, default: DefaultT
117+
) -> ValueT_co | DefaultT:
118+
"""Return the latest value that has been received for a specific key."""
119+
120+
@override
121+
def get(
122+
self, key: HashableT, default: DefaultT | None = None
123+
) -> ValueT_co | DefaultT | None:
95124
"""Return the latest value that has been received.
96125
97-
This raises a `ValueError` if no value has been received yet. Use `has_value` to
98-
check whether a value has been received yet, before trying to access the value,
99-
to avoid the exception.
100-
101126
Args:
102127
key: An optional key to retrieve the latest value for that key. If not
103128
provided, it retrieves the latest value received overall.
129+
default: The default value to return if no value has been received yet for
130+
the specified key. If not provided, it defaults to `None`.
104131
105132
Returns:
106133
The latest value that has been received.
134+
"""
135+
return self._latest_value_by_key.get(key, default)
136+
137+
@override
138+
def __iter__(self) -> Iterator[HashableT]:
139+
"""Return an iterator over the keys for which values have been received."""
140+
return iter(self._latest_value_by_key)
141+
142+
@override
143+
def __len__(self) -> int:
144+
"""Return the number of keys for which values have been received."""
145+
return len(self._latest_value_by_key)
146+
147+
@override
148+
def __getitem__(self, key: HashableT) -> ValueT_co:
149+
"""Return the latest value that has been received for a specific key.
150+
151+
Args:
152+
key: The key to retrieve the latest value for.
153+
154+
Returns:
155+
The latest value that has been received for that key.
107156
108157
Raises:
109-
ValueError: If no value has been received yet.
158+
KeyError: If no value has been received yet for that key.
110159
"""
111160
if key not in self._latest_value_by_key:
112-
raise ValueError(f"No value received for key: {key!r}")
161+
raise KeyError(f"No value received for key: {key!r}")
113162
return self._latest_value_by_key[key]
114163

115-
def has_value(self, key: HashableT) -> bool:
116-
"""Check whether a value has been received yet.
117-
118-
If `key` is provided, it checks whether a value has been received for that key.
164+
@override
165+
def __contains__(self, key: object, /) -> bool:
166+
"""Check if a value has been received for a specific key.
119167
120168
Args:
121-
key: An optional key to check if a value has been received for that key.
169+
key: The key to check for.
122170
123171
Returns:
124-
`True` if a value has been received, `False` otherwise.
172+
`True` if a value has been received for that key, `False` otherwise.
125173
"""
126174
return key in self._latest_value_by_key
127175

176+
@override
177+
def __eq__(self, other: object, /) -> bool:
178+
"""Check if this cache is equal to another object.
179+
180+
Two caches are considered equal if they have the same keys and values.
181+
182+
Args:
183+
other: The object to compare with.
184+
185+
Returns:
186+
`True` if the caches are equal, `False` otherwise.
187+
"""
188+
if not isinstance(other, GroupingLatestValueCache):
189+
return NotImplemented
190+
return self._latest_value_by_key == other._latest_value_by_key
191+
192+
@override
193+
def __ne__(self, value: object, /) -> bool:
194+
"""Check if this cache is not equal to another object.
195+
196+
Args:
197+
value: The object to compare with.
198+
199+
Returns:
200+
`True` if the caches are not equal, `False` otherwise.
201+
"""
202+
if not isinstance(value, GroupingLatestValueCache):
203+
return NotImplemented
204+
return self._latest_value_by_key != value._latest_value_by_key
205+
128206
def clear(self, key: HashableT) -> None:
129207
"""Clear the latest value for a specific key.
130208

tests/test_grouping_latest_value_cache_integration.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ async def test_latest_value_cache_key() -> None:
1616
"""Ensure LatestValueCache works with keys."""
1717
channel = Broadcast[tuple[int, str]](name="lvc_test")
1818

19-
cache: GroupingLatestValueCache[tuple[int, str], int] = GroupingLatestValueCache(
19+
cache: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
2020
channel.new_receiver(), key=lambda x: x[0]
2121
)
2222
sender = channel.new_sender()
2323

24-
assert not cache.has_value(5)
25-
with pytest.raises(ValueError, match="No value received for key: 0"):
26-
cache.get(0)
24+
assert 5 not in cache
25+
assert cache.get(0) is None
2726

2827
assert cache.keys() == set()
2928

@@ -32,17 +31,16 @@ async def test_latest_value_cache_key() -> None:
3231
await sender.send((5, "c"))
3332
await asyncio.sleep(0)
3433

35-
assert cache.has_value(5)
36-
assert cache.has_value(6)
37-
assert not cache.has_value(7)
34+
assert 5 in cache
35+
assert 6 in cache
36+
assert 7 not in cache
3837

3938
assert cache.get(5) == (5, "c")
4039
assert cache.get(6) == (6, "b")
4140

4241
assert cache.keys() == {5, 6}
4342

44-
with pytest.raises(ValueError, match="No value received for key: 7"):
45-
cache.get(7)
43+
assert cache.get(7, default=(7, "default")) == (7, "default")
4644

4745
await sender.send((12, "d"))
4846
await asyncio.sleep(0)
@@ -62,9 +60,8 @@ async def test_latest_value_cache_key() -> None:
6260
assert cache.keys() == {5, 6, 12}
6361

6462
cache.clear(5)
65-
assert not cache.has_value(5)
66-
assert cache.has_value(6)
63+
assert 5 not in cache
64+
assert 6 in cache
6765

68-
with pytest.raises(ValueError, match="No value received for key: 5"):
69-
assert cache.get(5)
66+
assert cache.get(5) is None
7067
assert cache.keys() == {6, 12}

0 commit comments

Comments
 (0)