diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5289fd4c..9a85ee6e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features -- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand. +- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand through a `collections.abc.Mapping` interface. ## Bug Fixes diff --git a/src/frequenz/channels/experimental/__init__.py b/src/frequenz/channels/experimental/__init__.py index 8ec5741d..4547f395 100644 --- a/src/frequenz/channels/experimental/__init__.py +++ b/src/frequenz/channels/experimental/__init__.py @@ -10,7 +10,12 @@ guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md). """ -from ._grouping_latest_value_cache import GroupingLatestValueCache +from ._grouping_latest_value_cache import ( + DefaultT, + GroupingLatestValueCache, + HashableT, + ValueT_co, +) from ._nop_receiver import NopReceiver from ._optional_receiver import OptionalReceiver from ._pipe import Pipe @@ -18,10 +23,13 @@ from ._with_previous import WithPrevious __all__ = [ + "DefaultT", "GroupingLatestValueCache", + "HashableT", "NopReceiver", "OptionalReceiver", "Pipe", "RelaySender", + "ValueT_co", "WithPrevious", ] diff --git a/src/frequenz/channels/experimental/_grouping_latest_value_cache.py b/src/frequenz/channels/experimental/_grouping_latest_value_cache.py index 89aa2923..fbe67ec7 100644 --- a/src/frequenz/channels/experimental/_grouping_latest_value_cache.py +++ b/src/frequenz/channels/experimental/_grouping_latest_value_cache.py @@ -10,12 +10,12 @@ takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and stores the latest value received by that receiver for each key separately. -As soon as a value is received for a `key`, the -[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method -returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get] -method for that `key` returns the latest value received. The `get` method will raise an -exception if called before any messages have been received from the receiver for a given -`key`. +The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping] +interface, so it can be used like a dictionary. In addition, it provides a +[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to +check if a value has been received for a specific key, and a +[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear +the cached value for a specific key. Example: ```python @@ -39,26 +39,30 @@ import asyncio import typing -from collections.abc import Set +from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView + +from typing_extensions import override from .._receiver import Receiver -T_co = typing.TypeVar("T_co", covariant=True) -HashableT = typing.TypeVar("HashableT", bound=typing.Hashable) +ValueT_co = typing.TypeVar("ValueT_co", covariant=True) +"""Covariant type variable for the values cached by the `GroupingLatestValueCache`.""" +DefaultT = typing.TypeVar("DefaultT") +"""Type variable for the default value returned by `GroupingLatestValueCache.get`.""" + +HashableT = typing.TypeVar("HashableT", bound=typing.Hashable) +"""Type variable for the keys used to group values in the `GroupingLatestValueCache`.""" -class GroupingLatestValueCache(typing.Generic[T_co, HashableT]): - """A cache that stores the latest value in a receiver. - It provides a way to look up the latest value in a stream without any delay, - as long as there has been one value received. - """ +class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]): + """A cache that stores the latest value in a receiver, grouped by key.""" def __init__( self, - receiver: Receiver[T_co], - key: typing.Callable[[T_co], typing.Any], + receiver: Receiver[ValueT_co], *, + key: typing.Callable[[ValueT_co], HashableT], unique_id: str | None = None, ) -> None: """Create a new cache. @@ -71,10 +75,10 @@ def __init__( provided, a unique identifier will be generated from the object's [`id()`][id]. It is used mostly for debugging purposes. """ - self._receiver: Receiver[T_co] = receiver - self._key: typing.Callable[[T_co], HashableT] = key + self._receiver: Receiver[ValueT_co] = receiver + self._key: typing.Callable[[ValueT_co], HashableT] = key self._unique_id: str = hex(id(self)) if unique_id is None else unique_id - self._latest_value_by_key: dict[HashableT, T_co] = {} + self._latest_value_by_key: dict[HashableT, ValueT_co] = {} self._task: asyncio.Task[None] = asyncio.create_task( self._run(), name=f"LatestValueCache«{self._unique_id}»" ) @@ -84,47 +88,121 @@ def unique_id(self) -> str: """The unique identifier of this instance.""" return self._unique_id - def keys(self) -> Set[HashableT]: + @override + def keys(self) -> KeysView[HashableT]: """Return the set of keys for which values have been received. If no key function is provided, this will return an empty set. """ return self._latest_value_by_key.keys() - def get(self, key: HashableT) -> T_co: + @override + def items(self) -> ItemsView[HashableT, ValueT_co]: + """Return an iterator over the key-value pairs of the latest values received.""" + return self._latest_value_by_key.items() + + @override + def values(self) -> ValuesView[ValueT_co]: + """Return an iterator over the latest values received.""" + return self._latest_value_by_key.values() + + @typing.overload + def get(self, key: HashableT, default: None = None) -> ValueT_co | None: + """Return the latest value that has been received for a specific key.""" + + # MyPy passes this overload as a valid signature, but pylint does not like it. + @typing.overload + def get( # pylint: disable=signature-differs + self, key: HashableT, default: DefaultT + ) -> ValueT_co | DefaultT: + """Return the latest value that has been received for a specific key.""" + + @override + def get( + self, key: HashableT, default: DefaultT | None = None + ) -> ValueT_co | DefaultT | None: """Return the latest value that has been received. - This raises a `ValueError` if no value has been received yet. Use `has_value` to - check whether a value has been received yet, before trying to access the value, - to avoid the exception. - Args: key: An optional key to retrieve the latest value for that key. If not provided, it retrieves the latest value received overall. + default: The default value to return if no value has been received yet for + the specified key. If not provided, it defaults to `None`. Returns: The latest value that has been received. + """ + return self._latest_value_by_key.get(key, default) + + @override + def __iter__(self) -> Iterator[HashableT]: + """Return an iterator over the keys for which values have been received.""" + return iter(self._latest_value_by_key) + + @override + def __len__(self) -> int: + """Return the number of keys for which values have been received.""" + return len(self._latest_value_by_key) + + @override + def __getitem__(self, key: HashableT) -> ValueT_co: + """Return the latest value that has been received for a specific key. + + Args: + key: The key to retrieve the latest value for. + + Returns: + The latest value that has been received for that key. Raises: - ValueError: If no value has been received yet. + KeyError: If no value has been received yet for that key. """ if key not in self._latest_value_by_key: - raise ValueError(f"No value received for key: {key!r}") + raise KeyError(f"No value received for key: {key!r}") return self._latest_value_by_key[key] - def has_value(self, key: HashableT) -> bool: - """Check whether a value has been received yet. - - If `key` is provided, it checks whether a value has been received for that key. + @override + def __contains__(self, key: object, /) -> bool: + """Check if a value has been received for a specific key. Args: - key: An optional key to check if a value has been received for that key. + key: The key to check for. Returns: - `True` if a value has been received, `False` otherwise. + `True` if a value has been received for that key, `False` otherwise. """ return key in self._latest_value_by_key + @override + def __eq__(self, other: object, /) -> bool: + """Check if this cache is equal to another object. + + Two caches are considered equal if they have the same keys and values. + + Args: + other: The object to compare with. + + Returns: + `True` if the caches are equal, `False` otherwise. + """ + if not isinstance(other, GroupingLatestValueCache): + return NotImplemented + return self._latest_value_by_key == other._latest_value_by_key + + @override + def __ne__(self, value: object, /) -> bool: + """Check if this cache is not equal to another object. + + Args: + value: The object to compare with. + + Returns: + `True` if the caches are not equal, `False` otherwise. + """ + if not isinstance(value, GroupingLatestValueCache): + return NotImplemented + return self._latest_value_by_key != value._latest_value_by_key + def clear(self, key: HashableT) -> None: """Clear the latest value for a specific key. diff --git a/tests/test_grouping_latest_value_cache_integration.py b/tests/test_grouping_latest_value_cache_integration.py index 635096e4..0bd6dced 100644 --- a/tests/test_grouping_latest_value_cache_integration.py +++ b/tests/test_grouping_latest_value_cache_integration.py @@ -16,14 +16,13 @@ async def test_latest_value_cache_key() -> None: """Ensure LatestValueCache works with keys.""" channel = Broadcast[tuple[int, str]](name="lvc_test") - cache: GroupingLatestValueCache[tuple[int, str], int] = GroupingLatestValueCache( + cache: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache( channel.new_receiver(), key=lambda x: x[0] ) sender = channel.new_sender() - assert not cache.has_value(5) - with pytest.raises(ValueError, match="No value received for key: 0"): - cache.get(0) + assert 5 not in cache + assert cache.get(0) is None assert cache.keys() == set() @@ -32,17 +31,16 @@ async def test_latest_value_cache_key() -> None: await sender.send((5, "c")) await asyncio.sleep(0) - assert cache.has_value(5) - assert cache.has_value(6) - assert not cache.has_value(7) + assert 5 in cache + assert 6 in cache + assert 7 not in cache assert cache.get(5) == (5, "c") assert cache.get(6) == (6, "b") assert cache.keys() == {5, 6} - with pytest.raises(ValueError, match="No value received for key: 7"): - cache.get(7) + assert cache.get(7, default=(7, "default")) == (7, "default") await sender.send((12, "d")) await asyncio.sleep(0) @@ -62,9 +60,8 @@ async def test_latest_value_cache_key() -> None: assert cache.keys() == {5, 6, 12} cache.clear(5) - assert not cache.has_value(5) - assert cache.has_value(6) + assert 5 not in cache + assert 6 in cache - with pytest.raises(ValueError, match="No value received for key: 5"): - assert cache.get(5) + assert cache.get(5) is None assert cache.keys() == {6, 12}