Skip to content

Commit 7757126

Browse files
committed
Support grouping by keys in LatestValueCache
`LatestValueCache` now takes an optional `key` function. When specified, it is used to get the key for each incoming message, and the latest value for each key is cached and can be retrieved separately. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent f7fb341 commit 7757126

File tree

1 file changed

+86
-6
lines changed

1 file changed

+86
-6
lines changed

src/frequenz/channels/_latest_value_cache.py

Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,20 @@
88
99
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
1010
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
11-
value received by that receiver. As soon as a value is received, its
11+
value received by that receiver. It also takes an optional `key` function
12+
that allows you to group the values by a specific key. If the `key` is
13+
provided, the cache will store the latest value for each key separately,
14+
otherwise it will store only the latest value received overall.
15+
16+
As soon as a value is received, its
1217
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
1318
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
1419
the latest value received. The `get` method will raise an exception if called
1520
before any messages have been received from the receiver.
1621
22+
Both `has_value` and `get` methods can take an optional `key` argument to
23+
check or retrieve the latest value for that specific key.
24+
1725
Example:
1826
```python
1927
from frequenz.channels import Broadcast, LatestValueCache
@@ -32,12 +40,15 @@
3240
```
3341
"""
3442

43+
from __future__ import annotations
44+
3545
import asyncio
3646
import typing
3747

3848
from ._receiver import Receiver
3949

4050
T_co = typing.TypeVar("T_co", covariant=True)
51+
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
4152

4253

4354
class _Sentinel:
@@ -48,15 +59,59 @@ def __str__(self) -> str:
4859
return "<no value received yet>"
4960

5061

51-
class LatestValueCache(typing.Generic[T_co]):
62+
_SENTINEL = _Sentinel()
63+
64+
65+
class LatestValueCache(typing.Generic[T_co, HashableT]):
5266
"""A cache that stores the latest value in a receiver.
5367
5468
It provides a way to look up the latest value in a stream without any delay,
5569
as long as there has been one value received.
5670
"""
5771

72+
@typing.overload
73+
def __init__(
74+
self: LatestValueCache[T_co, _Sentinel],
75+
receiver: Receiver[T_co],
76+
*,
77+
unique_id: str | None = None,
78+
key: _Sentinel = _SENTINEL,
79+
) -> None:
80+
"""Create a new cache that does not use keys.
81+
82+
Args:
83+
receiver: The receiver to cache.
84+
unique_id: A string to help uniquely identify this instance. If not
85+
provided, a unique identifier will be generated from the object's
86+
[`id()`][id]. It is used mostly for debugging purposes.
87+
key: This parameter is ignored when set to `None`.
88+
"""
89+
90+
@typing.overload
5891
def __init__(
59-
self, receiver: Receiver[T_co], *, unique_id: str | None = None
92+
self: LatestValueCache[T_co, HashableT],
93+
receiver: Receiver[T_co],
94+
*,
95+
unique_id: str | None = None,
96+
key: typing.Callable[[T_co], HashableT],
97+
) -> None:
98+
"""Create a new cache that uses keys.
99+
100+
Args:
101+
receiver: The receiver to cache.
102+
unique_id: A string to help uniquely identify this instance. If not
103+
provided, a unique identifier will be generated from the object's
104+
[`id()`][id]. It is used mostly for debugging purposes.
105+
key: A function that takes a value and returns a key to group the values by.
106+
If provided, the cache will store the latest value for each key separately.
107+
"""
108+
109+
def __init__(
110+
self,
111+
receiver: Receiver[T_co],
112+
*,
113+
unique_id: str | None = None,
114+
key: typing.Callable[[T_co], typing.Any] | _Sentinel = _SENTINEL,
60115
) -> None:
61116
"""Create a new cache.
62117
@@ -65,10 +120,16 @@ def __init__(
65120
unique_id: A string to help uniquely identify this instance. If not
66121
provided, a unique identifier will be generated from the object's
67122
[`id()`][id]. It is used mostly for debugging purposes.
123+
key: An optional function that takes a value and returns a key to group the
124+
values by. If provided, the cache will store the latest value for each
125+
key separately. If not provided, it will store only the latest value
126+
received overall.
68127
"""
69128
self._receiver = receiver
129+
self._key: typing.Callable[[T_co], HashableT] | _Sentinel = key
70130
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
71-
self._latest_value: T_co | _Sentinel = _Sentinel()
131+
self._latest_value: T_co | _Sentinel = _SENTINEL
132+
self._latest_value_by_key: dict[HashableT, T_co] = {}
72133
self._task = asyncio.create_task(
73134
self._run(), name=f"LatestValueCache«{self._unique_id}»"
74135
)
@@ -78,34 +139,53 @@ def unique_id(self) -> str:
78139
"""The unique identifier of this instance."""
79140
return self._unique_id
80141

81-
def get(self) -> T_co:
142+
def get(self, key: HashableT | _Sentinel = _SENTINEL) -> T_co:
82143
"""Return the latest value that has been received.
83144
84145
This raises a `ValueError` if no value has been received yet. Use `has_value` to
85146
check whether a value has been received yet, before trying to access the value,
86147
to avoid the exception.
87148
149+
Args:
150+
key: An optional key to retrieve the latest value for that key. If not
151+
provided, it retrieves the latest value received overall.
152+
88153
Returns:
89154
The latest value that has been received.
90155
91156
Raises:
92157
ValueError: If no value has been received yet.
93158
"""
159+
if not isinstance(key, _Sentinel):
160+
if key not in self._latest_value_by_key:
161+
raise ValueError(f"No value received for key: {key!r}")
162+
return self._latest_value_by_key[key]
163+
94164
if isinstance(self._latest_value, _Sentinel):
95165
raise ValueError("No value has been received yet.")
96166
return self._latest_value
97167

98-
def has_value(self) -> bool:
168+
def has_value(self, key: HashableT | _Sentinel = _SENTINEL) -> bool:
99169
"""Check whether a value has been received yet.
100170
171+
If `key` is provided, it checks whether a value has been received for that key.
172+
173+
Args:
174+
key: An optional key to check if a value has been received for that key.
175+
101176
Returns:
102177
`True` if a value has been received, `False` otherwise.
103178
"""
179+
if not isinstance(key, _Sentinel):
180+
return key in self._latest_value_by_key
104181
return not isinstance(self._latest_value, _Sentinel)
105182

106183
async def _run(self) -> None:
107184
async for value in self._receiver:
108185
self._latest_value = value
186+
if not isinstance(self._key, _Sentinel):
187+
key = self._key(value)
188+
self._latest_value_by_key[key] = value
109189

110190
async def stop(self) -> None:
111191
"""Stop the cache."""

0 commit comments

Comments
 (0)