diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 52636b7c..ddf34da8 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features -- `LatestValueCache` now takes an optional `key` function, which returns the key for each incoming message, and the latest value for each key is cached and can be retrieved separately. +- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand. ## Bug Fixes diff --git a/src/frequenz/channels/_latest_value_cache.py b/src/frequenz/channels/_latest_value_cache.py index a9ddd364..a9af9436 100644 --- a/src/frequenz/channels/_latest_value_cache.py +++ b/src/frequenz/channels/_latest_value_cache.py @@ -8,20 +8,12 @@ [LatestValueCache][frequenz.channels.LatestValueCache] takes a [Receiver][frequenz.channels.Receiver] as an argument and stores the latest -value received by that receiver. It also takes an optional `key` function -that allows you to group the values by a specific key. If the `key` is -provided, the cache will store the latest value for each key separately, -otherwise it will store only the latest value received overall. - -As soon as a value is received, its +value received by that receiver. As soon as a value is received, its [`has_value`][frequenz.channels.LatestValueCache.has_value] method returns `True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns the latest value received. The `get` method will raise an exception if called before any messages have been received from the receiver. -Both `has_value` and `get` methods can take an optional `key` argument to -check or retrieve the latest value for that specific key. 
- Example: ```python from frequenz.channels import Broadcast, LatestValueCache @@ -40,85 +32,31 @@ ``` """ -from __future__ import annotations - import asyncio import typing -from collections.abc import Set from ._receiver import Receiver T_co = typing.TypeVar("T_co", covariant=True) -HashableT = typing.TypeVar("HashableT", bound=typing.Hashable) -class Sentinel: +class _Sentinel: """A sentinel to denote that no value has been received yet.""" - def __init__(self, desc: str) -> None: - """Initialize the sentinel.""" - self._desc = desc - def __str__(self) -> str: """Return a string representation of this sentinel.""" - return f"" - + return "" -NO_KEY: typing.Final[Sentinel] = Sentinel("no key provided") -NO_KEY_FUNCTION: typing.Final[Sentinel] = Sentinel("no key function provided") -NO_VALUE_RECEIVED: typing.Final[Sentinel] = Sentinel("no value received yet") - -class LatestValueCache(typing.Generic[T_co, HashableT]): +class LatestValueCache(typing.Generic[T_co]): """A cache that stores the latest value in a receiver. It provides a way to look up the latest value in a stream without any delay, as long as there has been one value received. """ - @typing.overload - def __init__( - self: LatestValueCache[T_co, Sentinel], - receiver: Receiver[T_co], - *, - unique_id: str | None = None, - key: Sentinel = NO_KEY_FUNCTION, - ) -> None: - """Create a new cache that does not use keys. - - Args: - receiver: The receiver to cache. - unique_id: A string to help uniquely identify this instance. If not - provided, a unique identifier will be generated from the object's - [`id()`][id]. It is used mostly for debugging purposes. - key: This parameter is ignored when set to `None`. - """ - - @typing.overload def __init__( - self: LatestValueCache[T_co, HashableT], - receiver: Receiver[T_co], - *, - unique_id: str | None = None, - key: typing.Callable[[T_co], HashableT], - ) -> None: - """Create a new cache that uses keys. - - Args: - receiver: The receiver to cache. 
- unique_id: A string to help uniquely identify this instance. If not - provided, a unique identifier will be generated from the object's - [`id()`][id]. It is used mostly for debugging purposes. - key: A function that takes a value and returns a key to group the values by. - If provided, the cache will store the latest value for each key separately. - """ - - def __init__( - self, - receiver: Receiver[T_co], - *, - unique_id: str | None = None, - key: typing.Callable[[T_co], typing.Any] | Sentinel = NO_KEY_FUNCTION, + self, receiver: Receiver[T_co], *, unique_id: str | None = None ) -> None: """Create a new cache. @@ -127,16 +65,10 @@ def __init__( unique_id: A string to help uniquely identify this instance. If not provided, a unique identifier will be generated from the object's [`id()`][id]. It is used mostly for debugging purposes. - key: An optional function that takes a value and returns a key to group the - values by. If provided, the cache will store the latest value for each - key separately. If not provided, it will store only the latest value - received overall. """ self._receiver = receiver - self._key: typing.Callable[[T_co], HashableT] | Sentinel = key self._unique_id: str = hex(id(self)) if unique_id is None else unique_id - self._latest_value: T_co | Sentinel = NO_VALUE_RECEIVED - self._latest_value_by_key: dict[HashableT, T_co] = {} + self._latest_value: T_co | _Sentinel = _Sentinel() self._task = asyncio.create_task( self._run(), name=f"LatestValueCache«{self._unique_id}»" ) @@ -146,60 +78,34 @@ def unique_id(self) -> str: """The unique identifier of this instance.""" return self._unique_id - def keys(self) -> Set[HashableT]: - """Return the set of keys for which values have been received. - - If no key function is provided, this will return an empty set. - """ - return self._latest_value_by_key.keys() - - def get(self, key: HashableT | Sentinel = NO_KEY) -> T_co: + def get(self) -> T_co: """Return the latest value that has been received. 
This raises a `ValueError` if no value has been received yet. Use `has_value` to check whether a value has been received yet, before trying to access the value, to avoid the exception. - Args: - key: An optional key to retrieve the latest value for that key. If not - provided, it retrieves the latest value received overall. - Returns: The latest value that has been received. Raises: ValueError: If no value has been received yet. """ - if not isinstance(key, Sentinel): - if key not in self._latest_value_by_key: - raise ValueError(f"No value received for key: {key!r}") - return self._latest_value_by_key[key] - - if isinstance(self._latest_value, Sentinel): + if isinstance(self._latest_value, _Sentinel): raise ValueError("No value has been received yet.") return self._latest_value - def has_value(self, key: HashableT | Sentinel = NO_KEY) -> bool: + def has_value(self) -> bool: """Check whether a value has been received yet. - If `key` is provided, it checks whether a value has been received for that key. - - Args: - key: An optional key to check if a value has been received for that key. - Returns: `True` if a value has been received, `False` otherwise. """ - if not isinstance(key, Sentinel): - return key in self._latest_value_by_key - return not isinstance(self._latest_value, Sentinel) + return not isinstance(self._latest_value, _Sentinel) async def _run(self) -> None: async for value in self._receiver: self._latest_value = value - if not isinstance(self._key, Sentinel): - key = self._key(value) - self._latest_value_by_key[key] = value async def stop(self) -> None: """Stop the cache.""" diff --git a/src/frequenz/channels/experimental/__init__.py b/src/frequenz/channels/experimental/__init__.py index 9876e8e3..8ec5741d 100644 --- a/src/frequenz/channels/experimental/__init__.py +++ b/src/frequenz/channels/experimental/__init__.py @@ -10,6 +10,7 @@ guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md). 
""" +from ._grouping_latest_value_cache import GroupingLatestValueCache from ._nop_receiver import NopReceiver from ._optional_receiver import OptionalReceiver from ._pipe import Pipe @@ -17,6 +18,7 @@ from ._with_previous import WithPrevious __all__ = [ + "GroupingLatestValueCache", "NopReceiver", "OptionalReceiver", "Pipe", diff --git a/src/frequenz/channels/experimental/_grouping_latest_value_cache.py b/src/frequenz/channels/experimental/_grouping_latest_value_cache.py new file mode 100644 index 00000000..89aa2923 --- /dev/null +++ b/src/frequenz/channels/experimental/_grouping_latest_value_cache.py @@ -0,0 +1,155 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key. + +It provides a way to look up on demand, the latest value in a stream for any key, as +long as there has been at least one value received for that key. + +[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache] +takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and +stores the latest value received by that receiver for each key separately. + +As soon as a value is received for a `key`, the +[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method +returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get] +method for that `key` returns the latest value received. The `get` method will raise an +exception if called before any messages have been received from the receiver for a given +`key`. 
+ +Example: +```python +from frequenz.channels import Broadcast +from frequenz.channels.experimental import GroupingLatestValueCache + +channel = Broadcast[tuple[int, str]](name="lvc_test") + +cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0]) +sender = channel.new_sender() + +assert not cache.has_value(6) + +await sender.send((6, "twenty-six")) + +assert cache.has_value(6) +assert cache.get(6) == (6, "twenty-six") +``` +""" + + +import asyncio +import typing +from collections.abc import Set + +from .._receiver import Receiver + +T_co = typing.TypeVar("T_co", covariant=True) +HashableT = typing.TypeVar("HashableT", bound=typing.Hashable) + + +class GroupingLatestValueCache(typing.Generic[T_co, HashableT]): + """A cache that stores the latest value in a receiver for each key. + + It provides a way to look up the latest value in a stream for any key without + any delay, as long as at least one value has been received for that key. + """ + + def __init__( + self, + receiver: Receiver[T_co], + key: typing.Callable[[T_co], typing.Any], + *, + unique_id: str | None = None, + ) -> None: + """Create a new cache. + + Args: + receiver: The receiver to cache values from. + key: A function that takes a value and returns a key to group the values + by. + unique_id: A string to help uniquely identify this instance. If not + provided, a unique identifier will be generated from the object's + [`id()`][id]. It is used mostly for debugging purposes. 
+ """ + self._receiver: Receiver[T_co] = receiver + self._key: typing.Callable[[T_co], HashableT] = key + self._unique_id: str = hex(id(self)) if unique_id is None else unique_id + self._latest_value_by_key: dict[HashableT, T_co] = {} + self._task: asyncio.Task[None] = asyncio.create_task( + self._run(), name=f"GroupingLatestValueCache«{self._unique_id}»" + ) + + @property + def unique_id(self) -> str: + """The unique identifier of this instance.""" + return self._unique_id + + def keys(self) -> Set[HashableT]: + """Return the set of keys for which values have been received. + + This is empty until the first value has been received. + """ + return self._latest_value_by_key.keys() + + def get(self, key: HashableT) -> T_co: + """Return the latest value that has been received for a specific key. + + This raises a `ValueError` if no value has been received yet for that key. Use + `has_value` to check whether a value has been received yet, before trying to + access the value, to avoid the exception. + + Args: + key: The key for which to retrieve the latest value, as returned by the + key function for the corresponding message. + + Returns: + The latest value that has been received for that key. + + Raises: + ValueError: If no value has been received yet for that key. + """ + if key not in self._latest_value_by_key: + raise ValueError(f"No value received for key: {key!r}") + return self._latest_value_by_key[key] + + def has_value(self, key: HashableT) -> bool: + """Check whether a value has been received yet for a specific key. + + Use this before calling `get` to avoid a `ValueError` for unseen keys. + + Args: + key: The key for which to check whether a value has been received. + + Returns: + `True` if a value has been received for that key, `False` otherwise. + """ + return key in self._latest_value_by_key + + def clear(self, key: HashableT) -> None: + """Clear the latest value for a specific key. + + Args: + key: The key for which to clear the latest value. 
+ """ + _ = self._latest_value_by_key.pop(key, None) + + async def stop(self) -> None: + """Stop the cache.""" + if not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + def __repr__(self) -> str: + """Return a string representation of this cache.""" + return ( + f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key)}, " + f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>" + ) + + async def _run(self) -> None: + async for value in self._receiver: + key = self._key(value) + self._latest_value_by_key[key] = value diff --git a/tests/test_grouping_latest_value_cache_integration.py b/tests/test_grouping_latest_value_cache_integration.py new file mode 100644 index 00000000..635096e4 --- /dev/null +++ b/tests/test_grouping_latest_value_cache_integration.py @@ -0,0 +1,70 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Tests for the GroupingLatestValueCache implementation.""" + +import asyncio + +import pytest + +from frequenz.channels import Broadcast +from frequenz.channels.experimental import GroupingLatestValueCache + + +@pytest.mark.integration +async def test_latest_value_cache_key() -> None: + """Ensure GroupingLatestValueCache caches the latest value for each key.""" + channel = Broadcast[tuple[int, str]](name="lvc_test") + + cache: GroupingLatestValueCache[tuple[int, str], int] = GroupingLatestValueCache( + channel.new_receiver(), key=lambda x: x[0] + ) + sender = channel.new_sender() + + assert not cache.has_value(5) + with pytest.raises(ValueError, match="No value received for key: 0"): + cache.get(0) + + assert cache.keys() == set() + + await sender.send((5, "a")) + await sender.send((6, "b")) + await sender.send((5, "c")) + await asyncio.sleep(0) + + assert cache.has_value(5) + assert cache.has_value(6) + assert not cache.has_value(7) + + assert cache.get(5) == (5, "c") + assert cache.get(6) == (6, "b") + + assert cache.keys() == {5, 6} + + with 
pytest.raises(ValueError, match="No value received for key: 7"): + cache.get(7) + + await sender.send((12, "d")) + await asyncio.sleep(0) + + assert cache.get(12) == (12, 
+ "d") + assert cache.get(12) == (12, "d") + assert cache.get(5) == (5, "c") + assert cache.get(6) == (6, "b") + + await sender.send((6, "e")) + await sender.send((6, "f")) + await sender.send((6, "g")) + await asyncio.sleep(0) + + assert cache.get(6) == (6, "g") + + assert cache.keys() == {5, 6, 12} + + cache.clear(5) + assert not cache.has_value(5) + assert cache.has_value(6) + + with pytest.raises(ValueError, match="No value received for key: 5"): + cache.get(5) + assert cache.keys() == {6, 12} diff --git a/tests/test_latest_value_cache_integration.py b/tests/test_latest_value_cache_integration.py index 46cef2c4..0c39b21e 100644 --- a/tests/test_latest_value_cache_integration.py +++ b/tests/test_latest_value_cache_integration.py @@ -43,60 +43,3 @@ async def test_latest_value_cache() -> None: await asyncio.sleep(0) assert cache.get() == 19 - - assert cache.keys() == set() - - -@pytest.mark.integration -async def test_latest_value_cache_key() -> None: - """Ensure LatestValueCache works with keys.""" - channel = Broadcast[tuple[int, str]](name="lvc_test") - - cache = LatestValueCache(channel.new_receiver(), key=lambda x: x[0]) - sender = channel.new_sender() - - assert not cache.has_value() - with pytest.raises(ValueError, match="No value has been received yet."): - cache.get() - with pytest.raises(ValueError, match="No value received for key: 0"): - cache.get(0) - - assert cache.keys() == set() - - await sender.send((5, "a")) - await sender.send((6, "b")) - await sender.send((5, "c")) - await asyncio.sleep(0) - - assert cache.has_value() - assert cache.has_value(5) - assert cache.has_value(6) - assert not cache.has_value(7) - - assert cache.get() == (5, "c") - assert cache.get(5) == (5, "c") - assert cache.get(6) == (6, "b") - - assert cache.keys() == {5, 6} - - with pytest.raises(ValueError, match="No value received for key: 7"): - cache.get(7) - - await sender.send((12, "d")) - await asyncio.sleep(0) - - assert cache.get() == (12, "d") - assert cache.get() 
== (12, "d") - assert cache.get(12) == (12, "d") - assert cache.get(12) == (12, "d") - assert cache.get(5) == (5, "c") - assert cache.get(6) == (6, "b") - - await sender.send((6, "e")) - await sender.send((6, "f")) - await sender.send((6, "g")) - await asyncio.sleep(0) - - assert cache.get(6) == (6, "g") - - assert cache.keys() == {5, 6, 12}