Skip to content

Commit ce3ad4e

Browse files
committed
Implement the experimental GroupingLatestValueCache
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 07198d7 commit ce3ad4e

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
1111
"""
1212

13+
from ._grouping_latest_value_cache import GroupingLatestValueCache
1314
from ._nop_receiver import NopReceiver
1415
from ._optional_receiver import OptionalReceiver
1516
from ._pipe import Pipe
1617
from ._relay_sender import RelaySender
1718
from ._with_previous import WithPrevious
1819

1920
__all__ = [
21+
"GroupingLatestValueCache",
2022
"NopReceiver",
2123
"OptionalReceiver",
2224
"Pipe",
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
5+
6+
It provides a way to look up on demand, the latest value in a stream for any key, as
7+
long as there has been at least one value received for that key.
8+
9+
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
10+
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
11+
stores the latest value received by that receiver for each key separately.
12+
13+
As soon as a value is received for a `key`, the
14+
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
15+
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
16+
method for that `key` returns the latest value received. The `get` method will raise an
17+
exception if called before any messages have been received from the receiver for a given
18+
`key`.
19+
20+
Example:
21+
```python
22+
from frequenz.channels import Broadcast
23+
from frequenz.channels.experimental import GroupingLatestValueCache
24+
25+
channel = Broadcast[tuple[int, str]](name="lvc_test")
26+
27+
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
28+
sender = channel.new_sender()
29+
30+
assert not cache.has_value(6)
31+
32+
await sender.send((6, "twenty-six"))
33+
34+
assert cache.has_value(6)
35+
assert cache.get(6) == (6, "twenty-six")
36+
```
37+
"""
38+
39+
40+
import asyncio
41+
import typing
42+
from collections.abc import Set
43+
44+
from .._receiver import Receiver
45+
46+
T_co = typing.TypeVar("T_co", covariant=True)
47+
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
48+
49+
50+
class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
51+
"""A cache that stores the latest value in a receiver.
52+
53+
It provides a way to look up the latest value in a stream without any delay,
54+
as long as there has been one value received.
55+
"""
56+
57+
def __init__(
58+
self,
59+
receiver: Receiver[T_co],
60+
key: typing.Callable[[T_co], typing.Any],
61+
*,
62+
unique_id: str | None = None,
63+
) -> None:
64+
"""Create a new cache.
65+
66+
Args:
67+
receiver: The receiver to cache values from.
68+
key: An function that takes a value and returns a key to group the values
69+
by.
70+
unique_id: A string to help uniquely identify this instance. If not
71+
provided, a unique identifier will be generated from the object's
72+
[`id()`][id]. It is used mostly for debugging purposes.
73+
"""
74+
self._receiver: Receiver[T_co] = receiver
75+
self._key: typing.Callable[[T_co], HashableT] = key
76+
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
77+
self._latest_value_by_key: dict[HashableT, T_co] = {}
78+
self._task: asyncio.Task[None] = asyncio.create_task(
79+
self._run(), name=f"LatestValueCache«{self._unique_id}»"
80+
)
81+
82+
@property
83+
def unique_id(self) -> str:
84+
"""The unique identifier of this instance."""
85+
return self._unique_id
86+
87+
def keys(self) -> Set[HashableT]:
88+
"""Return the set of keys for which values have been received.
89+
90+
If no key function is provided, this will return an empty set.
91+
"""
92+
return self._latest_value_by_key.keys()
93+
94+
def get(self, key: HashableT) -> T_co:
95+
"""Return the latest value that has been received.
96+
97+
This raises a `ValueError` if no value has been received yet. Use `has_value` to
98+
check whether a value has been received yet, before trying to access the value,
99+
to avoid the exception.
100+
101+
Args:
102+
key: An optional key to retrieve the latest value for that key. If not
103+
provided, it retrieves the latest value received overall.
104+
105+
Returns:
106+
The latest value that has been received.
107+
108+
Raises:
109+
ValueError: If no value has been received yet.
110+
"""
111+
if key not in self._latest_value_by_key:
112+
raise ValueError(f"No value received for key: {key!r}")
113+
return self._latest_value_by_key[key]
114+
115+
def has_value(self, key: HashableT) -> bool:
116+
"""Check whether a value has been received yet.
117+
118+
If `key` is provided, it checks whether a value has been received for that key.
119+
120+
Args:
121+
key: An optional key to check if a value has been received for that key.
122+
123+
Returns:
124+
`True` if a value has been received, `False` otherwise.
125+
"""
126+
return key in self._latest_value_by_key
127+
128+
def clear(self, key: HashableT) -> None:
129+
"""Clear the latest value or the latest value for a specific key.
130+
131+
Args:
132+
key: The key for which to clear the latest value.
133+
"""
134+
_ = self._latest_value_by_key.pop(key, None)
135+
136+
async def stop(self) -> None:
137+
"""Stop the cache."""
138+
if not self._task.done():
139+
self._task.cancel()
140+
try:
141+
await self._task
142+
except asyncio.CancelledError:
143+
pass
144+
145+
def __repr__(self) -> str:
146+
"""Return a string representation of this cache."""
147+
return (
148+
f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
149+
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
150+
)
151+
152+
async def _run(self) -> None:
153+
async for value in self._receiver:
154+
key = self._key(value)
155+
self._latest_value_by_key[key] = value

0 commit comments

Comments
 (0)