11# License: MIT
22# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
33
4- """The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
4+ """The GroupingLatestValueCache caches the latest values in a receiver grouped by key."""
55
6- It provides a way to look up on demand, the latest value in a stream for any key, as
7- long as there has been at least one value received for that key.
86
9- [GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
10- takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
11- stores the latest value received by that receiver for each key separately.
7+ import asyncio
8+ from collections . abc import Callable , ItemsView , Iterator , KeysView , Mapping , ValuesView
9+ from typing import Hashable , TypeVar , overload
1210
13- The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
14- interface, so it can be used like a dictionary. In addition, it provides a
15- [has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
16- check if a value has been received for a specific key, and a
17- [clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
18- the cached value for a specific key.
11+ from typing_extensions import override
1912
20- Example:
21- ```python
22- from frequenz.channels import Broadcast
23- from frequenz.channels.experimental import GroupingLatestValueCache
13+ from .._receiver import Receiver
2414
25- channel = Broadcast[tuple[int, str]](name="lvc_test")
15+ ValueT_co = TypeVar ("ValueT_co" , covariant = True )
16+ """Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
2617
27- cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0] )
28- sender = channel.new_sender()
18+ DefaultT = TypeVar ( "DefaultT" )
19+ """Type variable for the default value returned by `GroupingLatestValueCache.get`."""
2920
30- assert not cache.has_value(6)
21+ HashableT = TypeVar ("HashableT" , bound = Hashable )
22+ """Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
3123
32- await sender.send((6, "twenty-six"))
3324
34- assert cache.has_value(6)
35- assert cache.get(6) == (6, "twenty-six")
36- ```
37- """
25+ class _NotSpecified :
26+ """A sentinel value to indicate that no default value was provided."""
3827
28+ def __repr__ (self ) -> str :
29+ """Return a string representation of this sentinel."""
30+ return "<_NotSpecified>"
3931
40- import asyncio
41- import typing
42- from collections .abc import ItemsView , Iterator , KeysView , Mapping , ValuesView
4332
44- from typing_extensions import override
33+ class GroupingLatestValueCache (Mapping [HashableT , ValueT_co ]):
34+ """A cache that stores the latest value in a receiver, grouped by key.
4535
46- from .._receiver import Receiver
36+ It provides a way to look up on demand, the latest value in a stream for any key, as
37+ long as there has been at least one value received for that key.
4738
48- ValueT_co = typing .TypeVar ("ValueT_co" , covariant = True )
49- """Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
39+ [GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
40+ takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
41+ stores the latest value received by that receiver for each key separately.
5042
51- DefaultT = typing .TypeVar ("DefaultT" )
52- """Type variable for the default value returned by `GroupingLatestValueCache.get`."""
43+ The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
44+ interface, so it can be used like a dictionary. Additionally other methods from
45+ [`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
46+ methods removing items from the cache are allowed, such as
47+ [`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
48+ [`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
49+ [`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
50+ [`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
51+ Other update methods are not provided because the user should not update the
52+ cache values directly.
5353
54- HashableT = typing .TypeVar ("HashableT" , bound = typing .Hashable )
55- """Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
54+ Example:
55+ ```python
56+ from frequenz.channels import Broadcast
57+ from frequenz.channels.experimental import GroupingLatestValueCache
5658
59+ channel = Broadcast[tuple[int, str]](name="lvc_test")
5760
58- class GroupingLatestValueCache (Mapping [HashableT , ValueT_co ]):
59- """A cache that stores the latest value in a receiver, grouped by key."""
61+ cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
62+ sender = channel.new_sender()
63+
64+ assert cache.get(6) is None
65+ assert 6 not in cache
66+
67+ await sender.send((6, "twenty-six"))
68+
69+ assert 6 in cache
70+ assert cache.get(6) == (6, "twenty-six")
71+
72+ del cache[6]
73+
74+ assert cache.get(6) is None
75+ assert 6 not in cache
76+
77+ await cache.stop()
78+ ```
79+ """
6080
6181 def __init__ (
6282 self ,
6383 receiver : Receiver [ValueT_co ],
6484 * ,
65- key : typing . Callable [[ValueT_co ], HashableT ],
85+ key : Callable [[ValueT_co ], HashableT ],
6686 unique_id : str | None = None ,
6787 ) -> None :
6888 """Create a new cache.
@@ -76,7 +96,7 @@ def __init__(
7696 [`id()`][id]. It is used mostly for debugging purposes.
7797 """
7898 self ._receiver : Receiver [ValueT_co ] = receiver
79- self ._key : typing . Callable [[ValueT_co ], HashableT ] = key
99+ self ._key : Callable [[ValueT_co ], HashableT ] = key
80100 self ._unique_id : str = hex (id (self )) if unique_id is None else unique_id
81101 self ._latest_value_by_key : dict [HashableT , ValueT_co ] = {}
82102 self ._task : asyncio .Task [None ] = asyncio .create_task (
@@ -106,12 +126,12 @@ def values(self) -> ValuesView[ValueT_co]:
106126 """Return an iterator over the latest values received."""
107127 return self ._latest_value_by_key .values ()
108128
109- @typing . overload
129+ @overload
110130 def get (self , key : HashableT , default : None = None ) -> ValueT_co | None :
111131 """Return the latest value that has been received for a specific key."""
112132
113133 # MyPy passes this overload as a valid signature, but pylint does not like it.
114- @typing . overload
134+ @overload
115135 def get ( # pylint: disable=signature-differs
116136 self , key : HashableT , default : DefaultT
117137 ) -> ValueT_co | DefaultT :
@@ -153,12 +173,7 @@ def __getitem__(self, key: HashableT) -> ValueT_co:
153173
154174 Returns:
155175 The latest value that has been received for that key.
156-
157- Raises:
158- KeyError: If no value has been received yet for that key.
159176 """
160- if key not in self ._latest_value_by_key :
161- raise KeyError (f"No value received for key: { key !r} " )
162177 return self ._latest_value_by_key [key ]
163178
164179 @override
@@ -185,9 +200,13 @@ def __eq__(self, other: object, /) -> bool:
185200 Returns:
186201 `True` if the caches are equal, `False` otherwise.
187202 """
188- if not isinstance (other , GroupingLatestValueCache ):
189- return NotImplemented
190- return self ._latest_value_by_key == other ._latest_value_by_key
203+ match other :
204+ case GroupingLatestValueCache ():
205+ return self ._latest_value_by_key == other ._latest_value_by_key
206+ case Mapping ():
207+ return self ._latest_value_by_key == other
208+ case _:
209+ return NotImplemented
191210
192211 @override
193212 def __ne__ (self , value : object , / ) -> bool :
@@ -199,17 +218,59 @@ def __ne__(self, value: object, /) -> bool:
199218 Returns:
200219 `True` if the caches are not equal, `False` otherwise.
201220 """
202- if not isinstance (value , GroupingLatestValueCache ):
203- return NotImplemented
204- return self ._latest_value_by_key != value ._latest_value_by_key
221+ return not self .__eq__ (value )
205222
206- def clear (self , key : HashableT ) -> None :
223+ def __delitem__ (self , key : HashableT ) -> None :
207224 """Clear the latest value for a specific key.
208225
209226 Args:
210227 key: The key for which to clear the latest value.
211228 """
212- _ = self ._latest_value_by_key .pop (key , None )
229+ del self ._latest_value_by_key [key ]
230+
231+ @overload
232+ def pop (self , key : HashableT , / ) -> ValueT_co | None :
233+ """Remove the latest value for a specific key and return it."""
234+
235+ @overload
236+ def pop (self , key : HashableT , / , default : DefaultT ) -> ValueT_co | DefaultT :
237+ """Remove the latest value for a specific key and return it."""
238+
239+ def pop (
240+ self , key : HashableT , / , default : DefaultT | _NotSpecified = _NotSpecified ()
241+ ) -> ValueT_co | DefaultT | None :
242+ """Remove the latest value for a specific key and return it.
243+
244+ If no value has been received yet for that key, it returns the default value or
245+ raises a `KeyError` if no default value is provided.
246+
247+ Args:
248+ key: The key for which to remove the latest value.
249+ default: The default value to return if no value has been received yet for
250+ the specified key.
251+
252+ Returns:
253+ The latest value that has been received for that key, or the default value if
254+ no value has been received yet and a default value is provided.
255+ """
256+ if isinstance (default , _NotSpecified ):
257+ return self ._latest_value_by_key .pop (key )
258+ return self ._latest_value_by_key .pop (key , default )
259+
260+ def popitem (self ) -> tuple [HashableT , ValueT_co ]:
261+ """Remove and return a (key, value) pair from the cache.
262+
263+ Pairs are returned in LIFO (last-in, first-out) order.
264+
265+ Returns:
266+ A tuple containing the key and the latest value that has been received for
267+ that key.
268+ """
269+ return self ._latest_value_by_key .popitem ()
270+
271+ def clear (self ) -> None :
272+ """Clear all entries from the cache."""
273+ self ._latest_value_by_key .clear ()
213274
214275 async def stop (self ) -> None :
215276 """Stop the cache."""
0 commit comments