Skip to content

Commit 41de546

Browse files
authored
Implement the experimental GroupingLatestValueCache (#428)
It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on demand. This also reverts the (unreleased) group-by-key extensions made to `LatestValueCache`, so that there is more time to finalise the API before making it stable.
2 parents 6f24b25 + 7597e34 commit 41de546

File tree

6 files changed

+238
-162
lines changed

6 files changed

+238
-162
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
- `LatestValueCache` now takes an optional `key` function, which returns the key for each incoming message, and the latest value for each key is cached and can be retrieved separately.
13+
- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand.
1414

1515
## Bug Fixes
1616

src/frequenz/channels/_latest_value_cache.py

Lines changed: 10 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,12 @@
88
99
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
1010
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
11-
value received by that receiver. It also takes an optional `key` function
12-
that allows you to group the values by a specific key. If the `key` is
13-
provided, the cache will store the latest value for each key separately,
14-
otherwise it will store only the latest value received overall.
15-
16-
As soon as a value is received, its
11+
value received by that receiver. As soon as a value is received, its
1712
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
1813
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
1914
the latest value received. The `get` method will raise an exception if called
2015
before any messages have been received from the receiver.
2116
22-
Both `has_value` and `get` methods can take an optional `key` argument to
23-
check or retrieve the latest value for that specific key.
24-
2517
Example:
2618
```python
2719
from frequenz.channels import Broadcast, LatestValueCache
@@ -40,85 +32,31 @@
4032
```
4133
"""
4234

43-
from __future__ import annotations
44-
4535
import asyncio
4636
import typing
47-
from collections.abc import Set
4837

4938
from ._receiver import Receiver
5039

5140
T_co = typing.TypeVar("T_co", covariant=True)
52-
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
5341

5442

55-
class Sentinel:
43+
class _Sentinel:
5644
"""A sentinel to denote that no value has been received yet."""
5745

58-
def __init__(self, desc: str) -> None:
59-
"""Initialize the sentinel."""
60-
self._desc = desc
61-
6246
def __str__(self) -> str:
6347
"""Return a string representation of this sentinel."""
64-
return f"<Sentinel: {self._desc}>"
65-
48+
return "<no value received yet>"
6649

67-
NO_KEY: typing.Final[Sentinel] = Sentinel("no key provided")
68-
NO_KEY_FUNCTION: typing.Final[Sentinel] = Sentinel("no key function provided")
69-
NO_VALUE_RECEIVED: typing.Final[Sentinel] = Sentinel("no value received yet")
7050

71-
72-
class LatestValueCache(typing.Generic[T_co, HashableT]):
51+
class LatestValueCache(typing.Generic[T_co]):
7352
"""A cache that stores the latest value in a receiver.
7453
7554
It provides a way to look up the latest value in a stream without any delay,
7655
as long as there has been one value received.
7756
"""
7857

79-
@typing.overload
80-
def __init__(
81-
self: LatestValueCache[T_co, Sentinel],
82-
receiver: Receiver[T_co],
83-
*,
84-
unique_id: str | None = None,
85-
key: Sentinel = NO_KEY_FUNCTION,
86-
) -> None:
87-
"""Create a new cache that does not use keys.
88-
89-
Args:
90-
receiver: The receiver to cache.
91-
unique_id: A string to help uniquely identify this instance. If not
92-
provided, a unique identifier will be generated from the object's
93-
[`id()`][id]. It is used mostly for debugging purposes.
94-
key: This parameter is ignored when set to `None`.
95-
"""
96-
97-
@typing.overload
9858
def __init__(
99-
self: LatestValueCache[T_co, HashableT],
100-
receiver: Receiver[T_co],
101-
*,
102-
unique_id: str | None = None,
103-
key: typing.Callable[[T_co], HashableT],
104-
) -> None:
105-
"""Create a new cache that uses keys.
106-
107-
Args:
108-
receiver: The receiver to cache.
109-
unique_id: A string to help uniquely identify this instance. If not
110-
provided, a unique identifier will be generated from the object's
111-
[`id()`][id]. It is used mostly for debugging purposes.
112-
key: A function that takes a value and returns a key to group the values by.
113-
If provided, the cache will store the latest value for each key separately.
114-
"""
115-
116-
def __init__(
117-
self,
118-
receiver: Receiver[T_co],
119-
*,
120-
unique_id: str | None = None,
121-
key: typing.Callable[[T_co], typing.Any] | Sentinel = NO_KEY_FUNCTION,
59+
self, receiver: Receiver[T_co], *, unique_id: str | None = None
12260
) -> None:
12361
"""Create a new cache.
12462
@@ -127,16 +65,10 @@ def __init__(
12765
unique_id: A string to help uniquely identify this instance. If not
12866
provided, a unique identifier will be generated from the object's
12967
[`id()`][id]. It is used mostly for debugging purposes.
130-
key: An optional function that takes a value and returns a key to group the
131-
values by. If provided, the cache will store the latest value for each
132-
key separately. If not provided, it will store only the latest value
133-
received overall.
13468
"""
13569
self._receiver = receiver
136-
self._key: typing.Callable[[T_co], HashableT] | Sentinel = key
13770
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
138-
self._latest_value: T_co | Sentinel = NO_VALUE_RECEIVED
139-
self._latest_value_by_key: dict[HashableT, T_co] = {}
71+
self._latest_value: T_co | _Sentinel = _Sentinel()
14072
self._task = asyncio.create_task(
14173
self._run(), name=f"LatestValueCache«{self._unique_id}»"
14274
)
@@ -146,60 +78,34 @@ def unique_id(self) -> str:
14678
"""The unique identifier of this instance."""
14779
return self._unique_id
14880

149-
def keys(self) -> Set[HashableT]:
150-
"""Return the set of keys for which values have been received.
151-
152-
If no key function is provided, this will return an empty set.
153-
"""
154-
return self._latest_value_by_key.keys()
155-
156-
def get(self, key: HashableT | Sentinel = NO_KEY) -> T_co:
81+
def get(self) -> T_co:
15782
"""Return the latest value that has been received.
15883
15984
This raises a `ValueError` if no value has been received yet. Use `has_value` to
16085
check whether a value has been received yet, before trying to access the value,
16186
to avoid the exception.
16287
163-
Args:
164-
key: An optional key to retrieve the latest value for that key. If not
165-
provided, it retrieves the latest value received overall.
166-
16788
Returns:
16889
The latest value that has been received.
16990
17091
Raises:
17192
ValueError: If no value has been received yet.
17293
"""
173-
if not isinstance(key, Sentinel):
174-
if key not in self._latest_value_by_key:
175-
raise ValueError(f"No value received for key: {key!r}")
176-
return self._latest_value_by_key[key]
177-
178-
if isinstance(self._latest_value, Sentinel):
94+
if isinstance(self._latest_value, _Sentinel):
17995
raise ValueError("No value has been received yet.")
18096
return self._latest_value
18197

182-
def has_value(self, key: HashableT | Sentinel = NO_KEY) -> bool:
98+
def has_value(self) -> bool:
18399
"""Check whether a value has been received yet.
184100
185-
If `key` is provided, it checks whether a value has been received for that key.
186-
187-
Args:
188-
key: An optional key to check if a value has been received for that key.
189-
190101
Returns:
191102
`True` if a value has been received, `False` otherwise.
192103
"""
193-
if not isinstance(key, Sentinel):
194-
return key in self._latest_value_by_key
195-
return not isinstance(self._latest_value, Sentinel)
104+
return not isinstance(self._latest_value, _Sentinel)
196105

197106
async def _run(self) -> None:
198107
async for value in self._receiver:
199108
self._latest_value = value
200-
if not isinstance(self._key, Sentinel):
201-
key = self._key(value)
202-
self._latest_value_by_key[key] = value
203109

204110
async def stop(self) -> None:
205111
"""Stop the cache."""

src/frequenz/channels/experimental/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
1111
"""
1212

13+
from ._grouping_latest_value_cache import GroupingLatestValueCache
1314
from ._nop_receiver import NopReceiver
1415
from ._optional_receiver import OptionalReceiver
1516
from ._pipe import Pipe
1617
from ._relay_sender import RelaySender
1718
from ._with_previous import WithPrevious
1819

1920
__all__ = [
21+
"GroupingLatestValueCache",
2022
"NopReceiver",
2123
"OptionalReceiver",
2224
"Pipe",
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
5+
6+
It provides a way to look up on demand, the latest value in a stream for any key, as
7+
long as there has been at least one value received for that key.
8+
9+
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
10+
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
11+
stores the latest value received by that receiver for each key separately.
12+
13+
As soon as a value is received for a `key`, the
14+
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
15+
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
16+
method for that `key` returns the latest value received. The `get` method will raise an
17+
exception if called before any messages have been received from the receiver for a given
18+
`key`.
19+
20+
Example:
21+
```python
22+
from frequenz.channels import Broadcast
23+
from frequenz.channels.experimental import GroupingLatestValueCache
24+
25+
channel = Broadcast[tuple[int, str]](name="lvc_test")
26+
27+
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
28+
sender = channel.new_sender()
29+
30+
assert not cache.has_value(6)
31+
32+
await sender.send((6, "twenty-six"))
33+
34+
assert cache.has_value(6)
35+
assert cache.get(6) == (6, "twenty-six")
36+
```
37+
"""
38+
39+
40+
import asyncio
41+
import typing
42+
from collections.abc import Set
43+
44+
from .._receiver import Receiver
45+
46+
T_co = typing.TypeVar("T_co", covariant=True)
47+
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
48+
49+
50+
class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
51+
"""A cache that stores the latest value in a receiver.
52+
53+
It provides a way to look up the latest value in a stream without any delay,
54+
as long as there has been one value received.
55+
"""
56+
57+
def __init__(
58+
self,
59+
receiver: Receiver[T_co],
60+
key: typing.Callable[[T_co], typing.Any],
61+
*,
62+
unique_id: str | None = None,
63+
) -> None:
64+
"""Create a new cache.
65+
66+
Args:
67+
receiver: The receiver to cache values from.
68+
key: An function that takes a value and returns a key to group the values
69+
by.
70+
unique_id: A string to help uniquely identify this instance. If not
71+
provided, a unique identifier will be generated from the object's
72+
[`id()`][id]. It is used mostly for debugging purposes.
73+
"""
74+
self._receiver: Receiver[T_co] = receiver
75+
self._key: typing.Callable[[T_co], HashableT] = key
76+
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
77+
self._latest_value_by_key: dict[HashableT, T_co] = {}
78+
self._task: asyncio.Task[None] = asyncio.create_task(
79+
self._run(), name=f"LatestValueCache«{self._unique_id}»"
80+
)
81+
82+
@property
83+
def unique_id(self) -> str:
84+
"""The unique identifier of this instance."""
85+
return self._unique_id
86+
87+
def keys(self) -> Set[HashableT]:
88+
"""Return the set of keys for which values have been received.
89+
90+
If no key function is provided, this will return an empty set.
91+
"""
92+
return self._latest_value_by_key.keys()
93+
94+
def get(self, key: HashableT) -> T_co:
95+
"""Return the latest value that has been received.
96+
97+
This raises a `ValueError` if no value has been received yet. Use `has_value` to
98+
check whether a value has been received yet, before trying to access the value,
99+
to avoid the exception.
100+
101+
Args:
102+
key: An optional key to retrieve the latest value for that key. If not
103+
provided, it retrieves the latest value received overall.
104+
105+
Returns:
106+
The latest value that has been received.
107+
108+
Raises:
109+
ValueError: If no value has been received yet.
110+
"""
111+
if key not in self._latest_value_by_key:
112+
raise ValueError(f"No value received for key: {key!r}")
113+
return self._latest_value_by_key[key]
114+
115+
def has_value(self, key: HashableT) -> bool:
116+
"""Check whether a value has been received yet.
117+
118+
If `key` is provided, it checks whether a value has been received for that key.
119+
120+
Args:
121+
key: An optional key to check if a value has been received for that key.
122+
123+
Returns:
124+
`True` if a value has been received, `False` otherwise.
125+
"""
126+
return key in self._latest_value_by_key
127+
128+
def clear(self, key: HashableT) -> None:
129+
"""Clear the latest value for a specific key.
130+
131+
Args:
132+
key: The key for which to clear the latest value.
133+
"""
134+
_ = self._latest_value_by_key.pop(key, None)
135+
136+
async def stop(self) -> None:
137+
"""Stop the cache."""
138+
if not self._task.done():
139+
self._task.cancel()
140+
try:
141+
await self._task
142+
except asyncio.CancelledError:
143+
pass
144+
145+
def __repr__(self) -> str:
146+
"""Return a string representation of this cache."""
147+
return (
148+
f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
149+
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
150+
)
151+
152+
async def _run(self) -> None:
153+
async for value in self._receiver:
154+
key = self._key(value)
155+
self._latest_value_by_key[key] = value

0 commit comments

Comments
 (0)