Skip to content

Commit 2aeff28

Browse files
authored
Improve GroupingLatestValueCache (#433)
Fixes some documentation issues and brings the interface more close to `MutableMapping`.
2 parents 4096546 + 082bee9 commit 2aeff28

File tree

2 files changed

+175
-57
lines changed

2 files changed

+175
-57
lines changed

src/frequenz/channels/experimental/_grouping_latest_value_cache.py

Lines changed: 116 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,88 @@
11
# License: MIT
22
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
33

4-
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
4+
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key."""
55

6-
It provides a way to look up on demand, the latest value in a stream for any key, as
7-
long as there has been at least one value received for that key.
86

9-
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
10-
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
11-
stores the latest value received by that receiver for each key separately.
7+
import asyncio
8+
from collections.abc import Callable, ItemsView, Iterator, KeysView, Mapping, ValuesView
9+
from typing import Hashable, TypeVar, overload
1210

13-
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
14-
interface, so it can be used like a dictionary. In addition, it provides a
15-
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
16-
check if a value has been received for a specific key, and a
17-
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
18-
the cached value for a specific key.
11+
from typing_extensions import override
1912

20-
Example:
21-
```python
22-
from frequenz.channels import Broadcast
23-
from frequenz.channels.experimental import GroupingLatestValueCache
13+
from .._receiver import Receiver
2414

25-
channel = Broadcast[tuple[int, str]](name="lvc_test")
15+
ValueT_co = TypeVar("ValueT_co", covariant=True)
16+
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
2617

27-
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
28-
sender = channel.new_sender()
18+
DefaultT = TypeVar("DefaultT")
19+
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
2920

30-
assert not cache.has_value(6)
21+
HashableT = TypeVar("HashableT", bound=Hashable)
22+
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
3123

32-
await sender.send((6, "twenty-six"))
3324

34-
assert cache.has_value(6)
35-
assert cache.get(6) == (6, "twenty-six")
36-
```
37-
"""
25+
class _NotSpecified:
26+
"""A sentinel value to indicate that no default value was provided."""
3827

28+
def __repr__(self) -> str:
29+
"""Return a string representation of this sentinel."""
30+
return "<_NotSpecified>"
3931

40-
import asyncio
41-
import typing
42-
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView
4332

44-
from typing_extensions import override
33+
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
34+
"""A cache that stores the latest value in a receiver, grouped by key.
4535
46-
from .._receiver import Receiver
36+
It provides a way to look up on demand, the latest value in a stream for any key, as
37+
long as there has been at least one value received for that key.
4738
48-
ValueT_co = typing.TypeVar("ValueT_co", covariant=True)
49-
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
39+
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
40+
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
41+
stores the latest value received by that receiver for each key separately.
5042
51-
DefaultT = typing.TypeVar("DefaultT")
52-
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
43+
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
44+
interface, so it can be used like a dictionary. Additionally other methods from
45+
[`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
46+
methods removing items from the cache are allowed, such as
47+
[`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
48+
[`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
49+
[`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
50+
[`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
51+
Other update methods are not provided because the user should not update the
52+
cache values directly.
5353
54-
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
55-
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
54+
Example:
55+
```python
56+
from frequenz.channels import Broadcast
57+
from frequenz.channels.experimental import GroupingLatestValueCache
5658
59+
channel = Broadcast[tuple[int, str]](name="lvc_test")
5760
58-
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
59-
"""A cache that stores the latest value in a receiver, grouped by key."""
61+
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
62+
sender = channel.new_sender()
63+
64+
assert cache.get(6) is None
65+
assert 6 not in cache
66+
67+
await sender.send((6, "twenty-six"))
68+
69+
assert 6 in cache
70+
assert cache.get(6) == (6, "twenty-six")
71+
72+
del cache[6]
73+
74+
assert cache.get(6) is None
75+
assert 6 not in cache
76+
77+
await cache.stop()
78+
```
79+
"""
6080

6181
def __init__(
6282
self,
6383
receiver: Receiver[ValueT_co],
6484
*,
65-
key: typing.Callable[[ValueT_co], HashableT],
85+
key: Callable[[ValueT_co], HashableT],
6686
unique_id: str | None = None,
6787
) -> None:
6888
"""Create a new cache.
@@ -76,7 +96,7 @@ def __init__(
7696
[`id()`][id]. It is used mostly for debugging purposes.
7797
"""
7898
self._receiver: Receiver[ValueT_co] = receiver
79-
self._key: typing.Callable[[ValueT_co], HashableT] = key
99+
self._key: Callable[[ValueT_co], HashableT] = key
80100
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
81101
self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
82102
self._task: asyncio.Task[None] = asyncio.create_task(
@@ -106,12 +126,12 @@ def values(self) -> ValuesView[ValueT_co]:
106126
"""Return an iterator over the latest values received."""
107127
return self._latest_value_by_key.values()
108128

109-
@typing.overload
129+
@overload
110130
def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
111131
"""Return the latest value that has been received for a specific key."""
112132

113133
# MyPy passes this overload as a valid signature, but pylint does not like it.
114-
@typing.overload
134+
@overload
115135
def get( # pylint: disable=signature-differs
116136
self, key: HashableT, default: DefaultT
117137
) -> ValueT_co | DefaultT:
@@ -153,12 +173,7 @@ def __getitem__(self, key: HashableT) -> ValueT_co:
153173
154174
Returns:
155175
The latest value that has been received for that key.
156-
157-
Raises:
158-
KeyError: If no value has been received yet for that key.
159176
"""
160-
if key not in self._latest_value_by_key:
161-
raise KeyError(f"No value received for key: {key!r}")
162177
return self._latest_value_by_key[key]
163178

164179
@override
@@ -185,9 +200,13 @@ def __eq__(self, other: object, /) -> bool:
185200
Returns:
186201
`True` if the caches are equal, `False` otherwise.
187202
"""
188-
if not isinstance(other, GroupingLatestValueCache):
189-
return NotImplemented
190-
return self._latest_value_by_key == other._latest_value_by_key
203+
match other:
204+
case GroupingLatestValueCache():
205+
return self._latest_value_by_key == other._latest_value_by_key
206+
case Mapping():
207+
return self._latest_value_by_key == other
208+
case _:
209+
return NotImplemented
191210

192211
@override
193212
def __ne__(self, value: object, /) -> bool:
@@ -199,17 +218,59 @@ def __ne__(self, value: object, /) -> bool:
199218
Returns:
200219
`True` if the caches are not equal, `False` otherwise.
201220
"""
202-
if not isinstance(value, GroupingLatestValueCache):
203-
return NotImplemented
204-
return self._latest_value_by_key != value._latest_value_by_key
221+
return not self.__eq__(value)
205222

206-
def clear(self, key: HashableT) -> None:
223+
def __delitem__(self, key: HashableT) -> None:
207224
"""Clear the latest value for a specific key.
208225
209226
Args:
210227
key: The key for which to clear the latest value.
211228
"""
212-
_ = self._latest_value_by_key.pop(key, None)
229+
del self._latest_value_by_key[key]
230+
231+
@overload
232+
def pop(self, key: HashableT, /) -> ValueT_co | None:
233+
"""Remove the latest value for a specific key and return it."""
234+
235+
@overload
236+
def pop(self, key: HashableT, /, default: DefaultT) -> ValueT_co | DefaultT:
237+
"""Remove the latest value for a specific key and return it."""
238+
239+
def pop(
240+
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
241+
) -> ValueT_co | DefaultT | None:
242+
"""Remove the latest value for a specific key and return it.
243+
244+
If no value has been received yet for that key, it returns the default value or
245+
raises a `KeyError` if no default value is provided.
246+
247+
Args:
248+
key: The key for which to remove the latest value.
249+
default: The default value to return if no value has been received yet for
250+
the specified key.
251+
252+
Returns:
253+
The latest value that has been received for that key, or the default value if
254+
no value has been received yet and a default value is provided.
255+
"""
256+
if isinstance(default, _NotSpecified):
257+
return self._latest_value_by_key.pop(key)
258+
return self._latest_value_by_key.pop(key, default)
259+
260+
def popitem(self) -> tuple[HashableT, ValueT_co]:
261+
"""Remove and return a (key, value) pair from the cache.
262+
263+
Pairs are returned in LIFO (last-in, first-out) order.
264+
265+
Returns:
266+
A tuple containing the key and the latest value that has been received for
267+
that key.
268+
"""
269+
return self._latest_value_by_key.popitem()
270+
271+
def clear(self) -> None:
272+
"""Clear all entries from the cache."""
273+
self._latest_value_by_key.clear()
213274

214275
async def stop(self) -> None:
215276
"""Stop the cache."""

tests/test_grouping_latest_value_cache_integration.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313

1414
@pytest.mark.integration
15-
async def test_latest_value_cache_key() -> None:
15+
async def test_latest_value_cache_key() -> None: # pylint: disable=too-many-statements
1616
"""Ensure LatestValueCache works with keys."""
1717
channel = Broadcast[tuple[int, str]](name="lvc_test")
1818

@@ -36,7 +36,9 @@ async def test_latest_value_cache_key() -> None:
3636
assert 7 not in cache
3737

3838
assert cache.get(5) == (5, "c")
39+
assert cache[5] == (5, "c")
3940
assert cache.get(6) == (6, "b")
41+
assert cache[6] == (6, "b")
4042

4143
assert cache.keys() == {5, 6}
4244

@@ -59,9 +61,64 @@ async def test_latest_value_cache_key() -> None:
5961

6062
assert cache.keys() == {5, 6, 12}
6163

62-
cache.clear(5)
64+
del cache[5]
6365
assert 5 not in cache
6466
assert 6 in cache
6567

6668
assert cache.get(5) is None
6769
assert cache.keys() == {6, 12}
70+
71+
assert cache.pop(6) == (6, "g")
72+
assert 6 not in cache
73+
assert cache.keys() == {12}
74+
75+
assert cache.pop(8, default=True) is True
76+
with pytest.raises(KeyError):
77+
cache.pop(8)
78+
79+
assert cache.popitem() == (12, (12, "d"))
80+
assert 12 not in cache
81+
assert not cache
82+
83+
await sender.send((1, "h"))
84+
await sender.send((2, "i"))
85+
await asyncio.sleep(0)
86+
87+
expected = {1: (1, "h"), 2: (2, "i")}
88+
assert cache.keys() == expected.keys()
89+
assert list(cache.values()) == list(expected.values())
90+
assert list(cache.items()) == list(expected.items())
91+
assert cache == expected
92+
assert list(cache) == list(expected)
93+
94+
cache.clear()
95+
assert not cache
96+
assert cache.keys() == set()
97+
98+
await cache.stop()
99+
100+
101+
@pytest.mark.integration
102+
async def test_equality() -> None:
103+
"""Test that two caches with the same content are equal."""
104+
channel = Broadcast[tuple[int, str]](name="lvc_test")
105+
106+
cache1: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
107+
channel.new_receiver(), key=lambda x: x[0]
108+
)
109+
cache2: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
110+
channel.new_receiver(), key=lambda x: x[0]
111+
)
112+
113+
sender = channel.new_sender()
114+
await sender.send((1, "one"))
115+
await sender.send((2, "two"))
116+
await asyncio.sleep(0)
117+
118+
assert cache1 == cache2
119+
120+
del cache1[1]
121+
assert cache1 != cache2
122+
123+
await cache1.stop()
124+
await cache2.stop()

0 commit comments

Comments
 (0)