Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand.
- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand through a `collections.abc.Mapping` interface.

## Bug Fixes

Expand Down
127 changes: 99 additions & 28 deletions src/frequenz/channels/experimental/_grouping_latest_value_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.

As soon as a value is received for a `key`, the
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
method for that `key` returns the latest value received. The `get` method will raise an
exception if called before any messages have been received from the receiver for a given
`key`.
The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. In addition, it provides a
[has_value][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method to
check if a value has been received for a specific key, and a
[clear][frequenz.channels.experimental.GroupingLatestValueCache.clear] method to clear
the cached value for a specific key.

Example:
```python
Expand All @@ -39,26 +39,25 @@

import asyncio
import typing
from collections.abc import Set
from collections.abc import ItemsView, Iterator, KeysView, Mapping, ValuesView

from typing_extensions import override

from .._receiver import Receiver

T_co = typing.TypeVar("T_co", covariant=True)
T = typing.TypeVar("T")
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: Document these (adding a docstring below them), so they are shown in the docs index.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are internal, not exposed from the _grouping_latest_value_cache module. Not sure if they should be though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess both are fine. If they are not exported, the docs will show it as a "string" without any links, and it will not be present in the ToC. If the types are self explanatory we can keep it as it is. If they are not documented/exported, users will not know about the possible bound= arguments or constraints (or if it is co/contravariant, but for those we usually use the suffixes), so if it is important to the user to know any of that info, that could be a good reason to export and document.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a commit to do this. I'm somewhat doubtful about exposing them from the experimental package directly, but that seems to be the best option available.



class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
"""A cache that stores the latest value in a receiver.

It provides a way to look up the latest value in a stream without any delay,
as long as there has been one value received.
"""
class GroupingLatestValueCache(Mapping[HashableT, T_co]):
"""A cache that stores the latest value in a receiver, grouped by key."""

def __init__(
self,
receiver: Receiver[T_co],
key: typing.Callable[[T_co], typing.Any],
*,
key: typing.Callable[[T_co], HashableT],
unique_id: str | None = None,
) -> None:
"""Create a new cache.
Expand All @@ -84,47 +83,119 @@ def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def keys(self) -> Set[HashableT]:
@override
def keys(self) -> KeysView[HashableT]:
"""Return the set of keys for which values have been received.

If no values have been received yet, this will return an empty view.
"""
return self._latest_value_by_key.keys()

def get(self, key: HashableT) -> T_co:
@override
def items(self) -> ItemsView[HashableT, T_co]:
"""Return a view of the key-value pairs of the latest values received."""
return self._latest_value_by_key.items()

@override
def values(self) -> ValuesView[T_co]:
"""Return a view of the latest values received."""
return self._latest_value_by_key.values()

@typing.overload
def get(self, key: HashableT, default: None = None) -> T_co | None:
"""Return the latest value that has been received for a specific key."""

# MyPy passes this overload as a valid signature, but pylint does not like it.
@typing.overload
def get( # pylint: disable=signature-differs
self, key: HashableT, default: T
) -> T_co | T:
"""Return the latest value that has been received for a specific key."""

@override
def get(self, key: HashableT, default: T | None = None) -> T_co | T | None:
"""Return the latest value that has been received.

This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.

Args:
key: The key to retrieve the latest value for.
default: The default value to return if no value has been received yet for
the specified key. If not provided, it defaults to `None`.

Returns:
The latest value that has been received.
"""
return self._latest_value_by_key.get(key, default)

@override
def __iter__(self) -> Iterator[HashableT]:
"""Return an iterator over the keys for which values have been received."""
return iter(self._latest_value_by_key)

@override
def __len__(self) -> int:
"""Return the number of keys for which values have been received."""
return len(self._latest_value_by_key)

@override
def __getitem__(self, key: HashableT) -> T_co:
"""Return the latest value that has been received for a specific key.

Args:
key: The key to retrieve the latest value for.

Returns:
The latest value that has been received for that key.

Raises:
ValueError: If no value has been received yet.
KeyError: If no value has been received yet for that key.
"""
if key not in self._latest_value_by_key:
raise ValueError(f"No value received for key: {key!r}")
raise KeyError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]

def has_value(self, key: HashableT) -> bool:
"""Check whether a value has been received yet.

If `key` is provided, it checks whether a value has been received for that key.
@override
def __contains__(self, key: object, /) -> bool:
"""Check if a value has been received for a specific key.

Args:
key: An optional key to check if a value has been received for that key.
key: The key to check for.

Returns:
`True` if a value has been received, `False` otherwise.
`True` if a value has been received for that key, `False` otherwise.
"""
return key in self._latest_value_by_key

@override
def __eq__(self, other: object, /) -> bool:
"""Check if this cache is equal to another object.

Two caches are considered equal if they have the same keys and values.

Args:
other: The object to compare with.

Returns:
`True` if the caches are equal, `False` otherwise.
"""
if not isinstance(other, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key == other._latest_value_by_key

@override
def __ne__(self, value: object, /) -> bool:
"""Check if this cache is not equal to another object.

Args:
value: The object to compare with.

Returns:
`True` if the caches are not equal, `False` otherwise.
"""
if not isinstance(value, GroupingLatestValueCache):
return NotImplemented
return self._latest_value_by_key != value._latest_value_by_key

def clear(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.

Expand Down
23 changes: 10 additions & 13 deletions tests/test_grouping_latest_value_cache_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ async def test_latest_value_cache_key() -> None:
"""Ensure GroupingLatestValueCache works with keys."""
channel = Broadcast[tuple[int, str]](name="lvc_test")

cache: GroupingLatestValueCache[tuple[int, str], int] = GroupingLatestValueCache(
cache: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
channel.new_receiver(), key=lambda x: x[0]
)
sender = channel.new_sender()

assert not cache.has_value(5)
with pytest.raises(ValueError, match="No value received for key: 0"):
cache.get(0)
assert 5 not in cache
assert cache.get(0) is None

assert cache.keys() == set()

Expand All @@ -32,17 +31,16 @@ async def test_latest_value_cache_key() -> None:
await sender.send((5, "c"))
await asyncio.sleep(0)

assert cache.has_value(5)
assert cache.has_value(6)
assert not cache.has_value(7)
assert 5 in cache
assert 6 in cache
assert 7 not in cache

assert cache.get(5) == (5, "c")
assert cache.get(6) == (6, "b")

assert cache.keys() == {5, 6}

with pytest.raises(ValueError, match="No value received for key: 7"):
cache.get(7)
assert cache.get(7, default=(7, "default")) == (7, "default")

await sender.send((12, "d"))
await asyncio.sleep(0)
Expand All @@ -62,9 +60,8 @@ async def test_latest_value_cache_key() -> None:
assert cache.keys() == {5, 6, 12}

cache.clear(5)
assert not cache.has_value(5)
assert cache.has_value(6)
assert 5 not in cache
assert 6 in cache

with pytest.raises(ValueError, match="No value received for key: 5"):
assert cache.get(5)
assert cache.get(5) is None
assert cache.keys() == {6, 12}
Loading