Skip to content

Commit 55ac012

Browse files
authored
Improve GroupingLatestValueCache (#433)
Fixes some documentation issues and brings the interface more close to `MutableMapping`.
2 parents 4096546 + 7097b3d commit 55ac012

File tree

3 files changed

+185
-58
lines changed

3 files changed

+185
-58
lines changed

src/frequenz/channels/experimental/_grouping_latest_value_cache.py

Lines changed: 124 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,96 @@
11
# License: MIT
22
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
33

4-
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
4+
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key."""
55

6-
It provides a way to look up on demand, the latest value in a stream for any key, as
7-
long as there has been at least one value received for that key.
86

9-
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
10-
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
11-
stores the latest value received by that receiver for each key separately.
7+
import asyncio
8+
from collections.abc import (
9+
Callable,
10+
Hashable,
11+
ItemsView,
12+
Iterator,
13+
KeysView,
14+
Mapping,
15+
ValuesView,
16+
)
17+
from typing import TypeVar, overload
1218

13-
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
14-
interface, so it can be used like a dictionary. In addition, it provides a
15-
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
16-
check if a value has been received for a specific key, and a
17-
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
18-
the cached value for a specific key.
19+
from typing_extensions import override
1920

20-
Example:
21-
```python
22-
from frequenz.channels import Broadcast
23-
from frequenz.channels.experimental import GroupingLatestValueCache
21+
from .._receiver import Receiver
2422

25-
channel = Broadcast[tuple[int, str]](name="lvc_test")
23+
ValueT_co = TypeVar("ValueT_co", covariant=True)
24+
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
2625

27-
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
28-
sender = channel.new_sender()
26+
DefaultT = TypeVar("DefaultT")
27+
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
2928

30-
assert not cache.has_value(6)
29+
HashableT = TypeVar("HashableT", bound=Hashable)
30+
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
3131

32-
await sender.send((6, "twenty-six"))
3332

34-
assert cache.has_value(6)
35-
assert cache.get(6) == (6, "twenty-six")
36-
```
37-
"""
33+
class _NotSpecified:
34+
"""A sentinel value to indicate that no default value was provided."""
3835

36+
def __repr__(self) -> str:
37+
"""Return a string representation of this sentinel."""
38+
return "<_NotSpecified>"
3939

40-
import asyncio
41-
import typing
42-
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView
4340

44-
from typing_extensions import override
41+
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
42+
"""A cache that stores the latest value in a receiver, grouped by key.
4543
46-
from .._receiver import Receiver
44+
It provides a way to look up on demand, the latest value in a stream for any key, as
45+
long as there has been at least one value received for that key.
4746
48-
ValueT_co = typing.TypeVar("ValueT_co", covariant=True)
49-
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
47+
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
48+
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
49+
stores the latest value received by that receiver for each key separately.
5050
51-
DefaultT = typing.TypeVar("DefaultT")
52-
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
51+
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
52+
interface, so it can be used like a dictionary. Additionally other methods from
53+
[`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
54+
methods removing items from the cache are allowed, such as
55+
[`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
56+
[`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
57+
[`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
58+
[`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
59+
Other update methods are not provided because the user should not update the
60+
cache values directly.
5361
54-
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
55-
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
62+
Example:
63+
```python
64+
from frequenz.channels import Broadcast
65+
from frequenz.channels.experimental import GroupingLatestValueCache
5666
67+
channel = Broadcast[tuple[int, str]](name="lvc_test")
5768
58-
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
59-
"""A cache that stores the latest value in a receiver, grouped by key."""
69+
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
70+
sender = channel.new_sender()
71+
72+
assert cache.get(6) is None
73+
assert 6 not in cache
74+
75+
await sender.send((6, "twenty-six"))
76+
77+
assert 6 in cache
78+
assert cache.get(6) == (6, "twenty-six")
79+
80+
del cache[6]
81+
82+
assert cache.get(6) is None
83+
assert 6 not in cache
84+
85+
await cache.stop()
86+
```
87+
"""
6088

6189
def __init__(
6290
self,
6391
receiver: Receiver[ValueT_co],
6492
*,
65-
key: typing.Callable[[ValueT_co], HashableT],
93+
key: Callable[[ValueT_co], HashableT],
6694
unique_id: str | None = None,
6795
) -> None:
6896
"""Create a new cache.
@@ -76,7 +104,7 @@ def __init__(
76104
[`id()`][id]. It is used mostly for debugging purposes.
77105
"""
78106
self._receiver: Receiver[ValueT_co] = receiver
79-
self._key: typing.Callable[[ValueT_co], HashableT] = key
107+
self._key: Callable[[ValueT_co], HashableT] = key
80108
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
81109
self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
82110
self._task: asyncio.Task[None] = asyncio.create_task(
@@ -106,12 +134,12 @@ def values(self) -> ValuesView[ValueT_co]:
106134
"""Return an iterator over the latest values received."""
107135
return self._latest_value_by_key.values()
108136

109-
@typing.overload
137+
@overload
110138
def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
111139
"""Return the latest value that has been received for a specific key."""
112140

113141
# MyPy passes this overload as a valid signature, but pylint does not like it.
114-
@typing.overload
142+
@overload
115143
def get( # pylint: disable=signature-differs
116144
self, key: HashableT, default: DefaultT
117145
) -> ValueT_co | DefaultT:
@@ -153,12 +181,7 @@ def __getitem__(self, key: HashableT) -> ValueT_co:
153181
154182
Returns:
155183
The latest value that has been received for that key.
156-
157-
Raises:
158-
KeyError: If no value has been received yet for that key.
159184
"""
160-
if key not in self._latest_value_by_key:
161-
raise KeyError(f"No value received for key: {key!r}")
162185
return self._latest_value_by_key[key]
163186

164187
@override
@@ -185,9 +208,13 @@ def __eq__(self, other: object, /) -> bool:
185208
Returns:
186209
`True` if the caches are equal, `False` otherwise.
187210
"""
188-
if not isinstance(other, GroupingLatestValueCache):
189-
return NotImplemented
190-
return self._latest_value_by_key == other._latest_value_by_key
211+
match other:
212+
case GroupingLatestValueCache():
213+
return self._latest_value_by_key == other._latest_value_by_key
214+
case Mapping():
215+
return self._latest_value_by_key == other
216+
case _:
217+
return NotImplemented
191218

192219
@override
193220
def __ne__(self, value: object, /) -> bool:
@@ -199,17 +226,59 @@ def __ne__(self, value: object, /) -> bool:
199226
Returns:
200227
`True` if the caches are not equal, `False` otherwise.
201228
"""
202-
if not isinstance(value, GroupingLatestValueCache):
203-
return NotImplemented
204-
return self._latest_value_by_key != value._latest_value_by_key
229+
return not self.__eq__(value)
205230

206-
def clear(self, key: HashableT) -> None:
231+
def __delitem__(self, key: HashableT) -> None:
207232
"""Clear the latest value for a specific key.
208233
209234
Args:
210235
key: The key for which to clear the latest value.
211236
"""
212-
_ = self._latest_value_by_key.pop(key, None)
237+
del self._latest_value_by_key[key]
238+
239+
@overload
240+
def pop(self, key: HashableT, /) -> ValueT_co | None:
241+
"""Remove the latest value for a specific key and return it."""
242+
243+
@overload
244+
def pop(self, key: HashableT, /, default: DefaultT) -> ValueT_co | DefaultT:
245+
"""Remove the latest value for a specific key and return it."""
246+
247+
def pop(
248+
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
249+
) -> ValueT_co | DefaultT | None:
250+
"""Remove the latest value for a specific key and return it.
251+
252+
If no value has been received yet for that key, it returns the default value or
253+
raises a `KeyError` if no default value is provided.
254+
255+
Args:
256+
key: The key for which to remove the latest value.
257+
default: The default value to return if no value has been received yet for
258+
the specified key.
259+
260+
Returns:
261+
The latest value that has been received for that key, or the default value if
262+
no value has been received yet and a default value is provided.
263+
"""
264+
if isinstance(default, _NotSpecified):
265+
return self._latest_value_by_key.pop(key)
266+
return self._latest_value_by_key.pop(key, default)
267+
268+
def popitem(self) -> tuple[HashableT, ValueT_co]:
269+
"""Remove and return a (key, value) pair from the cache.
270+
271+
Pairs are returned in LIFO (last-in, first-out) order.
272+
273+
Returns:
274+
A tuple containing the key and the latest value that has been received for
275+
that key.
276+
"""
277+
return self._latest_value_by_key.popitem()
278+
279+
def clear(self) -> None:
280+
"""Clear all entries from the cache."""
281+
self._latest_value_by_key.clear()
213282

214283
async def stop(self) -> None:
215284
"""Stop the cache."""

src/frequenz/channels/experimental/_with_previous.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"""Composable predicate to cache and compare with the previous message."""
55

66

7-
from typing import Callable, Final, Generic, TypeGuard
7+
from collections.abc import Callable
8+
from typing import Final, Generic, TypeGuard
89

910
from frequenz.channels._generic import ChannelMessageT
1011

tests/test_grouping_latest_value_cache_integration.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313

1414
@pytest.mark.integration
15-
async def test_latest_value_cache_key() -> None:
15+
async def test_latest_value_cache_key() -> None: # pylint: disable=too-many-statements
1616
"""Ensure LatestValueCache works with keys."""
1717
channel = Broadcast[tuple[int, str]](name="lvc_test")
1818

@@ -36,7 +36,9 @@ async def test_latest_value_cache_key() -> None:
3636
assert 7 not in cache
3737

3838
assert cache.get(5) == (5, "c")
39+
assert cache[5] == (5, "c")
3940
assert cache.get(6) == (6, "b")
41+
assert cache[6] == (6, "b")
4042

4143
assert cache.keys() == {5, 6}
4244

@@ -59,9 +61,64 @@ async def test_latest_value_cache_key() -> None:
5961

6062
assert cache.keys() == {5, 6, 12}
6163

62-
cache.clear(5)
64+
del cache[5]
6365
assert 5 not in cache
6466
assert 6 in cache
6567

6668
assert cache.get(5) is None
6769
assert cache.keys() == {6, 12}
70+
71+
assert cache.pop(6) == (6, "g")
72+
assert 6 not in cache
73+
assert cache.keys() == {12}
74+
75+
assert cache.pop(8, default=True) is True
76+
with pytest.raises(KeyError):
77+
cache.pop(8)
78+
79+
assert cache.popitem() == (12, (12, "d"))
80+
assert 12 not in cache
81+
assert not cache
82+
83+
await sender.send((1, "h"))
84+
await sender.send((2, "i"))
85+
await asyncio.sleep(0)
86+
87+
expected = {1: (1, "h"), 2: (2, "i")}
88+
assert cache.keys() == expected.keys()
89+
assert list(cache.values()) == list(expected.values())
90+
assert list(cache.items()) == list(expected.items())
91+
assert cache == expected
92+
assert list(cache) == list(expected)
93+
94+
cache.clear()
95+
assert not cache
96+
assert cache.keys() == set()
97+
98+
await cache.stop()
99+
100+
101+
@pytest.mark.integration
102+
async def test_equality() -> None:
103+
"""Test that two caches with the same content are equal."""
104+
channel = Broadcast[tuple[int, str]](name="lvc_test")
105+
106+
cache1: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
107+
channel.new_receiver(), key=lambda x: x[0]
108+
)
109+
cache2: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
110+
channel.new_receiver(), key=lambda x: x[0]
111+
)
112+
113+
sender = channel.new_sender()
114+
await sender.send((1, "one"))
115+
await sender.send((2, "two"))
116+
await asyncio.sleep(0)
117+
118+
assert cache1 == cache2
119+
120+
del cache1[1]
121+
assert cache1 != cache2
122+
123+
await cache1.stop()
124+
await cache2.stop()

0 commit comments

Comments
 (0)