Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand.
- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand through a `collections.abc.Mapping` interface.

## Bug Fixes

Expand Down
10 changes: 9 additions & 1 deletion src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,26 @@
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
"""

from ._grouping_latest_value_cache import GroupingLatestValueCache
from ._grouping_latest_value_cache import (
DefaultT,
GroupingLatestValueCache,
HashableT,
ValueT_co,
)
from ._nop_receiver import NopReceiver
from ._optional_receiver import OptionalReceiver
from ._pipe import Pipe
from ._relay_sender import RelaySender
from ._with_previous import WithPrevious

__all__ = [
"DefaultT",
"GroupingLatestValueCache",
"HashableT",
"NopReceiver",
"OptionalReceiver",
"Pipe",
"RelaySender",
"ValueT_co",
"WithPrevious",
]
144 changes: 111 additions & 33 deletions src/frequenz/channels/experimental/_grouping_latest_value_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.

As soon as a value is received for a `key`, the
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
method for that `key` returns the latest value received. The `get` method will raise an
exception if called before any messages have been received from the receiver for a given
`key`.
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. In addition, it provides a
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
check if a value has been received for a specific key, and a
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
the cached value for a specific key.

Example:
```python
Expand All @@ -39,26 +39,30 @@

import asyncio
import typing
from collections.abc import Set
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView

from typing_extensions import override

from .._receiver import Receiver

T_co = typing.TypeVar("T_co", covariant=True)
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
ValueT_co = typing.TypeVar("ValueT_co", covariant=True)
"""Covariant type variable for the values cached by the `GroupingLatestValueCache`."""

DefaultT = typing.TypeVar("DefaultT")
"""Type variable for the default value returned by `GroupingLatestValueCache.get`."""

HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""

class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
"""A cache that stores the latest value in a receiver.

It provides a way to look up the latest value in a stream without any delay,
as long as there has been one value received.
"""
class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
"""A cache that stores the latest value in a receiver, grouped by key."""

def __init__(
self,
receiver: Receiver[T_co],
key: typing.Callable[[T_co], typing.Any],
receiver: Receiver[ValueT_co],
*,
key: typing.Callable[[ValueT_co], HashableT],
unique_id: str | None = None,
) -> None:
"""Create a new cache.
Expand All @@ -71,10 +75,10 @@ def __init__(
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
"""
self._receiver: Receiver[T_co] = receiver
self._key: typing.Callable[[T_co], HashableT] = key
self._receiver: Receiver[ValueT_co] = receiver
self._key: typing.Callable[[ValueT_co], HashableT] = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value_by_key: dict[HashableT, T_co] = {}
self._latest_value_by_key: dict[HashableT, ValueT_co] = {}
self._task: asyncio.Task[None] = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
Expand All @@ -84,47 +88,121 @@ def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def keys(self) -> Set[HashableT]:
@override
def keys(self) -> KeysView[HashableT]:
"""Return the set of keys for which values have been received.

If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()

def get(self, key: HashableT) -> T_co:
@override
def items(self) -> ItemsView[HashableT, ValueT_co]:
"""Return an iterator over the key-value pairs of the latest values received."""
return self._latest_value_by_key.items()

@override
def values(self) -> ValuesView[ValueT_co]:
"""Return an iterator over the latest values received."""
return self._latest_value_by_key.values()

@typing.overload
def get(self, key: HashableT, default: None = None) -> ValueT_co | None:
"""Return the latest value that has been received for a specific key."""

# MyPy passes this overload as a valid signature, but pylint does not like it.
@typing.overload
def get( # pylint: disable=signature-differs
self, key: HashableT, default: DefaultT
) -> ValueT_co | DefaultT:
"""Return the latest value that has been received for a specific key."""

@override
def get(
self, key: HashableT, default: DefaultT | None = None
) -> ValueT_co | DefaultT | None:
"""Return the latest value that has been received.

This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.

Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.
default: The default value to return if no value has been received yet for
the specified key. If not provided, it defaults to `None`.

Returns:
The latest value that has been received.
"""
return self._latest_value_by_key.get(key, default)

@override
def __iter__(self) -> Iterator[HashableT]:
"""Return an iterator over the keys for which values have been received."""
return iter(self._latest_value_by_key)

@override
def __len__(self) -> int:
"""Return the number of keys for which values have been received."""
return len(self._latest_value_by_key)

@override
def __getitem__(self, key: HashableT) -> ValueT_co:
"""Return the latest value that has been received for a specific key.

Args:
key: The key to retrieve the latest value for.

Returns:
The latest value that has been received for that key.

Raises:
ValueError: If no value has been received yet.
KeyError: If no value has been received yet for that key.
"""
if key not in self._latest_value_by_key:
raise ValueError(f"No value received for key: {key!r}")
raise KeyError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]

def has_value(self, key: HashableT) -> bool:
"""Check whether a value has been received yet.

If `key` is provided, it checks whether a value has been received for that key.
@override
def __contains__(self, key: object, /) -> bool:
"""Check if a value has been received for a specific key.

Args:
key: An optional key to check if a value has been received for that key.
key: The key to check for.

Returns:
`True` if a value has been received, `False` otherwise.
`True` if a value has been received for that key, `False` otherwise.
"""
return key in self._latest_value_by_key

@override
def __eq__(self, other: object, /) -> bool:
"""Check if this cache is equal to another object.

Two caches are considered equal if they have the same keys and values.

Args:
other: The object to compare with.

Returns:
`True` if the caches are equal, `False` otherwise.
"""
if not isinstance(other, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key == other._latest_value_by_key

@override
def __ne__(self, value: object, /) -> bool:
"""Check if this cache is not equal to another object.

Args:
value: The object to compare with.

Returns:
`True` if the caches are not equal, `False` otherwise.
"""
if not isinstance(value, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key != value._latest_value_by_key

def clear(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.

Expand Down
23 changes: 10 additions & 13 deletions tests/test_grouping_latest_value_cache_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ async def test_latest_value_cache_key() -> None:
"""Ensure LatestValueCache works with keys."""
channel = Broadcast[tuple[int, str]](name="lvc_test")

cache: GroupingLatestValueCache[tuple[int, str], int] = GroupingLatestValueCache(
cache: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
channel.new_receiver(), key=lambda x: x[0]
)
sender = channel.new_sender()

assert not cache.has_value(5)
with pytest.raises(ValueError, match="No value received for key: 0"):
cache.get(0)
assert 5 not in cache
assert cache.get(0) is None

assert cache.keys() == set()

Expand All @@ -32,17 +31,16 @@ async def test_latest_value_cache_key() -> None:
await sender.send((5, "c"))
await asyncio.sleep(0)

assert cache.has_value(5)
assert cache.has_value(6)
assert not cache.has_value(7)
assert 5 in cache
assert 6 in cache
assert 7 not in cache

assert cache.get(5) == (5, "c")
assert cache.get(6) == (6, "b")

assert cache.keys() == {5, 6}

with pytest.raises(ValueError, match="No value received for key: 7"):
cache.get(7)
assert cache.get(7, default=(7, "default")) == (7, "default")

await sender.send((12, "d"))
await asyncio.sleep(0)
Expand All @@ -62,9 +60,8 @@ async def test_latest_value_cache_key() -> None:
assert cache.keys() == {5, 6, 12}

cache.clear(5)
assert not cache.has_value(5)
assert cache.has_value(6)
assert 5 not in cache
assert 6 in cache

with pytest.raises(ValueError, match="No value received for key: 5"):
assert cache.get(5)
assert cache.get(5) is None
assert cache.keys() == {6, 12}
Loading