Skip to content

Commit 3edcb49

Browse files
authored
Support grouping by keys in LatestValueCache (#424)
`LatestValueCache` now takes an optional `key` function. When specified, it is used to get the key for each incoming message, and the latest value for each key is cached and can be retrieved separately.
2 parents f7fb341 + 9db1a85 commit 3edcb49

File tree

3 files changed

+146
-11
lines changed

3 files changed

+146
-11
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
- `LatestValueCache` now takes an optional `key` function, which returns the key for each incoming message, and the latest value for each key is cached and can be retrieved separately.
1414

1515
## Bug Fixes
1616

src/frequenz/channels/_latest_value_cache.py

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,20 @@
88
99
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
1010
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
11-
value received by that receiver. As soon as a value is received, its
11+
value received by that receiver. It also takes an optional `key` function
12+
that allows you to group the values by a specific key. If the `key` is
13+
provided, the cache will store the latest value for each key separately,
14+
otherwise it will store only the latest value received overall.
15+
16+
As soon as a value is received, its
1217
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
1318
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
1419
the latest value received. The `get` method will raise an exception if called
1520
before any messages have been received from the receiver.
1621
22+
Both `has_value` and `get` methods can take an optional `key` argument to
23+
check or retrieve the latest value for that specific key.
24+
1725
Example:
1826
```python
1927
from frequenz.channels import Broadcast, LatestValueCache
@@ -32,31 +40,84 @@
3240
```
3341
"""
3442

43+
from __future__ import annotations
44+
3545
import asyncio
3646
import typing
3747

3848
from ._receiver import Receiver
3949

4050
T_co = typing.TypeVar("T_co", covariant=True)
51+
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
4152

4253

43-
class _Sentinel:
54+
class Sentinel:
4455
"""A sentinel to denote that no value has been received yet."""
4556

57+
def __init__(self, desc: str) -> None:
58+
"""Initialize the sentinel."""
59+
self._desc = desc
60+
4661
def __str__(self) -> str:
4762
"""Return a string representation of this sentinel."""
48-
return "<no value received yet>"
63+
return f"<Sentinel: {self._desc}>"
4964

5065

51-
class LatestValueCache(typing.Generic[T_co]):
66+
NO_KEY: typing.Final[Sentinel] = Sentinel("no key provided")
67+
NO_KEY_FUNCTION: typing.Final[Sentinel] = Sentinel("no key function provided")
68+
NO_VALUE_RECEIVED: typing.Final[Sentinel] = Sentinel("no value received yet")
69+
70+
71+
class LatestValueCache(typing.Generic[T_co, HashableT]):
5272
"""A cache that stores the latest value in a receiver.
5373
5474
It provides a way to look up the latest value in a stream without any delay,
5575
as long as there has been one value received.
5676
"""
5777

78+
@typing.overload
79+
def __init__(
80+
self: LatestValueCache[T_co, Sentinel],
81+
receiver: Receiver[T_co],
82+
*,
83+
unique_id: str | None = None,
84+
key: Sentinel = NO_KEY_FUNCTION,
85+
) -> None:
86+
"""Create a new cache that does not use keys.
87+
88+
Args:
89+
receiver: The receiver to cache.
90+
unique_id: A string to help uniquely identify this instance. If not
91+
provided, a unique identifier will be generated from the object's
92+
[`id()`][id]. It is used mostly for debugging purposes.
93+
key: This parameter is ignored when set to `None`.
94+
"""
95+
96+
@typing.overload
5897
def __init__(
59-
self, receiver: Receiver[T_co], *, unique_id: str | None = None
98+
self: LatestValueCache[T_co, HashableT],
99+
receiver: Receiver[T_co],
100+
*,
101+
unique_id: str | None = None,
102+
key: typing.Callable[[T_co], HashableT],
103+
) -> None:
104+
"""Create a new cache that uses keys.
105+
106+
Args:
107+
receiver: The receiver to cache.
108+
unique_id: A string to help uniquely identify this instance. If not
109+
provided, a unique identifier will be generated from the object's
110+
[`id()`][id]. It is used mostly for debugging purposes.
111+
key: A function that takes a value and returns a key to group the values by.
112+
If provided, the cache will store the latest value for each key separately.
113+
"""
114+
115+
def __init__(
116+
self,
117+
receiver: Receiver[T_co],
118+
*,
119+
unique_id: str | None = None,
120+
key: typing.Callable[[T_co], typing.Any] | Sentinel = NO_KEY_FUNCTION,
60121
) -> None:
61122
"""Create a new cache.
62123
@@ -65,10 +126,16 @@ def __init__(
65126
unique_id: A string to help uniquely identify this instance. If not
66127
provided, a unique identifier will be generated from the object's
67128
[`id()`][id]. It is used mostly for debugging purposes.
129+
key: An optional function that takes a value and returns a key to group the
130+
values by. If provided, the cache will store the latest value for each
131+
key separately. If not provided, it will store only the latest value
132+
received overall.
68133
"""
69134
self._receiver = receiver
135+
self._key: typing.Callable[[T_co], HashableT] | Sentinel = key
70136
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
71-
self._latest_value: T_co | _Sentinel = _Sentinel()
137+
self._latest_value: T_co | Sentinel = NO_VALUE_RECEIVED
138+
self._latest_value_by_key: dict[HashableT, T_co] = {}
72139
self._task = asyncio.create_task(
73140
self._run(), name=f"LatestValueCache«{self._unique_id}»"
74141
)
@@ -78,34 +145,53 @@ def unique_id(self) -> str:
78145
"""The unique identifier of this instance."""
79146
return self._unique_id
80147

81-
def get(self) -> T_co:
148+
def get(self, key: HashableT | Sentinel = NO_KEY) -> T_co:
82149
"""Return the latest value that has been received.
83150
84151
This raises a `ValueError` if no value has been received yet. Use `has_value` to
85152
check whether a value has been received yet, before trying to access the value,
86153
to avoid the exception.
87154
155+
Args:
156+
key: An optional key to retrieve the latest value for that key. If not
157+
provided, it retrieves the latest value received overall.
158+
88159
Returns:
89160
The latest value that has been received.
90161
91162
Raises:
92163
ValueError: If no value has been received yet.
93164
"""
94-
if isinstance(self._latest_value, _Sentinel):
165+
if not isinstance(key, Sentinel):
166+
if key not in self._latest_value_by_key:
167+
raise ValueError(f"No value received for key: {key!r}")
168+
return self._latest_value_by_key[key]
169+
170+
if isinstance(self._latest_value, Sentinel):
95171
raise ValueError("No value has been received yet.")
96172
return self._latest_value
97173

98-
def has_value(self) -> bool:
174+
def has_value(self, key: HashableT | Sentinel = NO_KEY) -> bool:
99175
"""Check whether a value has been received yet.
100176
177+
If `key` is provided, it checks whether a value has been received for that key.
178+
179+
Args:
180+
key: An optional key to check if a value has been received for that key.
181+
101182
Returns:
102183
`True` if a value has been received, `False` otherwise.
103184
"""
104-
return not isinstance(self._latest_value, _Sentinel)
185+
if not isinstance(key, Sentinel):
186+
return key in self._latest_value_by_key
187+
return not isinstance(self._latest_value, Sentinel)
105188

106189
async def _run(self) -> None:
107190
async for value in self._receiver:
108191
self._latest_value = value
192+
if not isinstance(self._key, Sentinel):
193+
key = self._key(value)
194+
self._latest_value_by_key[key] = value
109195

110196
async def stop(self) -> None:
111197
"""Stop the cache."""

tests/test_latest_value_cache_integration.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,52 @@ async def test_latest_value_cache() -> None:
4343
await asyncio.sleep(0)
4444

4545
assert cache.get() == 19
46+
47+
48+
@pytest.mark.integration
49+
async def test_latest_value_cache_key() -> None:
50+
"""Ensure LatestValueCache works with keys."""
51+
channel = Broadcast[tuple[int, str]](name="lvc_test")
52+
53+
cache = LatestValueCache(channel.new_receiver(), key=lambda x: x[0])
54+
sender = channel.new_sender()
55+
56+
assert not cache.has_value()
57+
with pytest.raises(ValueError, match="No value has been received yet."):
58+
cache.get()
59+
with pytest.raises(ValueError, match="No value received for key: 0"):
60+
cache.get(0)
61+
62+
await sender.send((5, "a"))
63+
await sender.send((6, "b"))
64+
await sender.send((5, "c"))
65+
await asyncio.sleep(0)
66+
67+
assert cache.has_value()
68+
assert cache.has_value(5)
69+
assert cache.has_value(6)
70+
assert not cache.has_value(7)
71+
72+
assert cache.get() == (5, "c")
73+
assert cache.get(5) == (5, "c")
74+
assert cache.get(6) == (6, "b")
75+
76+
with pytest.raises(ValueError, match="No value received for key: 7"):
77+
cache.get(7)
78+
79+
await sender.send((12, "d"))
80+
await asyncio.sleep(0)
81+
82+
assert cache.get() == (12, "d")
83+
assert cache.get() == (12, "d")
84+
assert cache.get(12) == (12, "d")
85+
assert cache.get(12) == (12, "d")
86+
assert cache.get(5) == (5, "c")
87+
assert cache.get(6) == (6, "b")
88+
89+
await sender.send((6, "e"))
90+
await sender.send((6, "f"))
91+
await sender.send((6, "g"))
92+
await asyncio.sleep(0)
93+
94+
assert cache.get(6) == (6, "g")

0 commit comments

Comments
 (0)