Skip to content
179 changes: 124 additions & 55 deletions src/frequenz/channels/experimental/_grouping_latest_value_cache.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,96 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.
"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key."""

It provides a way to look up on demand, the latest value in a stream for any key, as
long as there has been at least one value received for that key.

[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.
import asyncio
from collections.abc import (
Callable,
Hashable,
ItemsView,
Iterator,
KeysView,
Mapping,
ValuesView,
)
from typing import TypeVar, overload

The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. In addition, it provides a
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
check if a value has been received for a specific key, and a
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
the cached value for a specific key.
from typing_extensions import override

Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache
from .._receiver import Receiver

channel = Broadcast[tuple[int, str]](name="lvc_test")
ValueT_co = TypeVar("ValueT_co", covariant=True)
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""

cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()
DefaultT = TypeVar("DefaultT")
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""

assert not cache.has_value(6)
HashableT = TypeVar("HashableT", bound=Hashable)
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""

await sender.send((6, "twenty-six"))

assert cache.has_value(6)
assert cache.get(6) == (6, "twenty-six")
```
"""
class _NotSpecified:
"""A sentinel value to indicate that no default value was provided."""

def __repr__(self) -> str:
"""Return a string representation of this sentinel."""
return "<_NotSpecified>"

import asyncio
import typing
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView

from typing_extensions import override
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
"""A cache that stores the latest value in a receiver, grouped by key.

from .._receiver import Receiver
It provides a way to look up on demand, the latest value in a stream for any key, as
long as there has been at least one value received for that key.

ValueT_co = typing.TypeVar("ValueT_co", covariant=True)
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""
[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.

DefaultT = typing.TypeVar("DefaultT")
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. Additionally other methods from
[`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
methods removing items from the cache are allowed, such as
[`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
[`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
[`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
[`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
Other update methods are not provided because the user should not update the
cache values directly.

HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""
Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache

channel = Broadcast[tuple[int, str]](name="lvc_test")

class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
"""A cache that stores the latest value in a receiver, grouped by key."""
cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()

assert cache.get(6) is None
assert 6 not in cache

await sender.send((6, "twenty-six"))

assert 6 in cache
assert cache.get(6) == (6, "twenty-six")

del cache[6]

assert cache.get(6) is None
assert 6 not in cache

await cache.stop()
```
"""

def __init__(
self,
receiver: Receiver[ValueT_co],
*,
key: typing.Callable[[ValueT_co], HashableT],
key: Callable[[ValueT_co], HashableT],
unique_id: str | None = None,
) -> None:
"""Create a new cache.
Expand All @@ -76,7 +104,7 @@ def __init__(
[`id()`][id]. It is used mostly for debugging purposes.
"""
self._receiver: Receiver[ValueT_co] = receiver
self._key: typing.Callable[[ValueT_co], HashableT] = key
self._key: Callable[[ValueT_co], HashableT] = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
self._task: asyncio.Task[None] = asyncio.create_task(
Expand Down Expand Up @@ -106,12 +134,12 @@ def values(self) -> ValuesView[ValueT_co]:
"""Return an iterator over the latest values received."""
return self._latest_value_by_key.values()

@typing.overload
@overload
def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
"""Return the latest value that has been received for a specific key."""

# MyPy passes this overload as a valid signature, but pylint does not like it.
@typing.overload
@overload
def get( # pylint: disable=signature-differs
self, key: HashableT, default: DefaultT
) -> ValueT_co | DefaultT:
Expand Down Expand Up @@ -153,12 +181,7 @@ def __getitem__(self, key: HashableT) -> ValueT_co:

Returns:
The latest value that has been received for that key.

Raises:
KeyError: If no value has been received yet for that key.
"""
if key not in self._latest_value_by_key:
raise KeyError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]

@override
Expand All @@ -185,9 +208,13 @@ def __eq__(self, other: object, /) -> bool:
Returns:
`True` if the caches are equal, `False` otherwise.
"""
if not isinstance(other, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key == other._latest_value_by_key
match other:
case GroupingLatestValueCache():
return self._latest_value_by_key == other._latest_value_by_key
case Mapping():
return self._latest_value_by_key == other
case _:
return NotImplemented

@override
def __ne__(self, value: object, /) -> bool:
Expand All @@ -199,17 +226,59 @@ def __ne__(self, value: object, /) -> bool:
Returns:
`True` if the caches are not equal, `False` otherwise.
"""
if not isinstance(value, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key != value._latest_value_by_key
return not self.__eq__(value)

def clear(self, key: HashableT) -> None:
def __delitem__(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.

Args:
key: The key for which to clear the latest value.
"""
_ = self._latest_value_by_key.pop(key, None)
del self._latest_value_by_key[key]

@overload
def pop(self, key: HashableT, /) -> ValueT_co | None:
"""Remove the latest value for a specific key and return it."""

@overload
def pop(self, key: HashableT, /, default: DefaultT) -> ValueT_co | DefaultT:
"""Remove the latest value for a specific key and return it."""

def pop(
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
) -> ValueT_co | DefaultT | None:
Comment on lines +247 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def pop(
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
) -> ValueT_co | DefaultT | None:
def pop(
self, key: HashableT, /, default: DefaultT | None = None
) -> ValueT_co | DefaultT | None:

Then you can use return self._latest_value_by_key.pop(key, default) as the only body, because that's the exact signature of dict.pop(), and no need for special cases.

And if a user wants to have None in ValueT_co, they can make their own sentinel that will be DefaultT. So we don't need to make a sentinel to be able to satisfy MutableMapping.

Copy link
Contributor Author

@llucax llucax Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the problem is pop() raises when the default is not specified, it doesn't return None (is not symmetric with get()), so we need to be able to tell between not passing the argument and passing None.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh damn, that's such a bad API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

"""Remove the latest value for a specific key and return it.

If no value has been received yet for that key, it returns the default value or
raises a `KeyError` if no default value is provided.

Args:
key: The key for which to remove the latest value.
default: The default value to return if no value has been received yet for
the specified key.

Returns:
The latest value that has been received for that key, or the default value if
no value has been received yet and a default value is provided.
"""
if isinstance(default, _NotSpecified):
return self._latest_value_by_key.pop(key)
return self._latest_value_by_key.pop(key, default)

def popitem(self) -> tuple[HashableT, ValueT_co]:
"""Remove and return a (key, value) pair from the cache.

Pairs are returned in LIFO (last-in, first-out) order.

Returns:
A tuple containing the key and the latest value that has been received for
that key.
"""
return self._latest_value_by_key.popitem()

def clear(self) -> None:
"""Clear all entries from the cache."""
self._latest_value_by_key.clear()

async def stop(self) -> None:
"""Stop the cache."""
Expand Down
3 changes: 2 additions & 1 deletion src/frequenz/channels/experimental/_with_previous.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"""Composable predicate to cache and compare with the previous message."""


from typing import Callable, Final, Generic, TypeGuard
from collections.abc import Callable
from typing import Final, Generic, TypeGuard

from frequenz.channels._generic import ChannelMessageT

Expand Down
61 changes: 59 additions & 2 deletions tests/test_grouping_latest_value_cache_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@pytest.mark.integration
async def test_latest_value_cache_key() -> None:
async def test_latest_value_cache_key() -> None: # pylint: disable=too-many-statements
"""Ensure LatestValueCache works with keys."""
channel = Broadcast[tuple[int, str]](name="lvc_test")

Expand All @@ -36,7 +36,9 @@ async def test_latest_value_cache_key() -> None:
assert 7 not in cache

assert cache.get(5) == (5, "c")
assert cache[5] == (5, "c")
assert cache.get(6) == (6, "b")
assert cache[6] == (6, "b")

assert cache.keys() == {5, 6}

Expand All @@ -59,9 +61,64 @@ async def test_latest_value_cache_key() -> None:

assert cache.keys() == {5, 6, 12}

cache.clear(5)
del cache[5]
assert 5 not in cache
assert 6 in cache

assert cache.get(5) is None
assert cache.keys() == {6, 12}

assert cache.pop(6) == (6, "g")
assert 6 not in cache
assert cache.keys() == {12}

assert cache.pop(8, default=True) is True
with pytest.raises(KeyError):
cache.pop(8)

assert cache.popitem() == (12, (12, "d"))
assert 12 not in cache
assert not cache

await sender.send((1, "h"))
await sender.send((2, "i"))
await asyncio.sleep(0)

expected = {1: (1, "h"), 2: (2, "i")}
assert cache.keys() == expected.keys()
assert list(cache.values()) == list(expected.values())
assert list(cache.items()) == list(expected.items())
assert cache == expected
assert list(cache) == list(expected)

cache.clear()
assert not cache
assert cache.keys() == set()

await cache.stop()


@pytest.mark.integration
async def test_equality() -> None:
"""Test that two caches with the same content are equal."""
channel = Broadcast[tuple[int, str]](name="lvc_test")

cache1: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
channel.new_receiver(), key=lambda x: x[0]
)
cache2: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
channel.new_receiver(), key=lambda x: x[0]
)

sender = channel.new_sender()
await sender.send((1, "one"))
await sender.send((2, "two"))
await asyncio.sleep(0)

assert cache1 == cache2

del cache1[1]
assert cache1 != cache2

await cache1.stop()
await cache2.stop()
Loading