Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

- `LatestValueCache` now takes an optional `key` function, which returns the key for each incoming message, and the latest value for each key is cached and can be retrieved separately.
- This release introduces the experimental `GroupingLatestValueCache`. It is similar to the `LatestValueCache`, but accepts an additional key-function as an argument, which takes each incoming message and returns a key for that message. The latest value received for each unique key gets cached and is available to look up on-demand.

## Bug Fixes

Expand Down
114 changes: 10 additions & 104 deletions src/frequenz/channels/_latest_value_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,12 @@

[LatestValueCache][frequenz.channels.LatestValueCache] takes a
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
value received by that receiver. It also takes an optional `key` function
that allows you to group the values by a specific key. If the `key` is
provided, the cache will store the latest value for each key separately,
otherwise it will store only the latest value received overall.

As soon as a value is received, its
value received by that receiver. As soon as a value is received, its
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
the latest value received. The `get` method will raise an exception if called
before any messages have been received from the receiver.

Both `has_value` and `get` methods can take an optional `key` argument to
check or retrieve the latest value for that specific key.

Example:
```python
from frequenz.channels import Broadcast, LatestValueCache
Expand All @@ -40,85 +32,31 @@
```
"""

from __future__ import annotations

import asyncio
import typing
from collections.abc import Set

from ._receiver import Receiver

T_co = typing.TypeVar("T_co", covariant=True)
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)


class Sentinel:
class _Sentinel:
"""A sentinel to denote that no value has been received yet."""

def __init__(self, desc: str) -> None:
"""Initialize the sentinel."""
self._desc = desc

def __str__(self) -> str:
"""Return a string representation of this sentinel."""
return f"<Sentinel: {self._desc}>"

return "<no value received yet>"

NO_KEY: typing.Final[Sentinel] = Sentinel("no key provided")
NO_KEY_FUNCTION: typing.Final[Sentinel] = Sentinel("no key function provided")
NO_VALUE_RECEIVED: typing.Final[Sentinel] = Sentinel("no value received yet")


class LatestValueCache(typing.Generic[T_co, HashableT]):
class LatestValueCache(typing.Generic[T_co]):
"""A cache that stores the latest value in a receiver.

It provides a way to look up the latest value in a stream without any delay,
as long as there has been one value received.
"""

@typing.overload
def __init__(
self: LatestValueCache[T_co, Sentinel],
receiver: Receiver[T_co],
*,
unique_id: str | None = None,
key: Sentinel = NO_KEY_FUNCTION,
) -> None:
"""Create a new cache that does not use keys.

Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
key: This parameter is ignored when set to `None`.
"""

@typing.overload
def __init__(
self: LatestValueCache[T_co, HashableT],
receiver: Receiver[T_co],
*,
unique_id: str | None = None,
key: typing.Callable[[T_co], HashableT],
) -> None:
"""Create a new cache that uses keys.

Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
key: A function that takes a value and returns a key to group the values by.
If provided, the cache will store the latest value for each key separately.
"""

def __init__(
self,
receiver: Receiver[T_co],
*,
unique_id: str | None = None,
key: typing.Callable[[T_co], typing.Any] | Sentinel = NO_KEY_FUNCTION,
self, receiver: Receiver[T_co], *, unique_id: str | None = None
) -> None:
"""Create a new cache.

Expand All @@ -127,16 +65,10 @@ def __init__(
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
key: An optional function that takes a value and returns a key to group the
values by. If provided, the cache will store the latest value for each
key separately. If not provided, it will store only the latest value
received overall.
"""
self._receiver = receiver
self._key: typing.Callable[[T_co], HashableT] | Sentinel = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value: T_co | Sentinel = NO_VALUE_RECEIVED
self._latest_value_by_key: dict[HashableT, T_co] = {}
self._latest_value: T_co | _Sentinel = _Sentinel()
self._task = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
Expand All @@ -146,60 +78,34 @@ def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def keys(self) -> Set[HashableT]:
"""Return the set of keys for which values have been received.

If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()

def get(self, key: HashableT | Sentinel = NO_KEY) -> T_co:
def get(self) -> T_co:
"""Return the latest value that has been received.

This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.

Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.

Returns:
The latest value that has been received.

Raises:
ValueError: If no value has been received yet.
"""
if not isinstance(key, Sentinel):
if key not in self._latest_value_by_key:
raise ValueError(f"No value received for key: {key!r}")
return self._latest_value_by_key[key]

if isinstance(self._latest_value, Sentinel):
if isinstance(self._latest_value, _Sentinel):
raise ValueError("No value has been received yet.")
return self._latest_value

def has_value(self, key: HashableT | Sentinel = NO_KEY) -> bool:
def has_value(self) -> bool:
"""Check whether a value has been received yet.

If `key` is provided, it checks whether a value has been received for that key.

Args:
key: An optional key to check if a value has been received for that key.

Returns:
`True` if a value has been received, `False` otherwise.
"""
if not isinstance(key, Sentinel):
return key in self._latest_value_by_key
return not isinstance(self._latest_value, Sentinel)
return not isinstance(self._latest_value, _Sentinel)

async def _run(self) -> None:
async for value in self._receiver:
self._latest_value = value
if not isinstance(self._key, Sentinel):
key = self._key(value)
self._latest_value_by_key[key] = value

async def stop(self) -> None:
"""Stop the cache."""
Expand Down
2 changes: 2 additions & 0 deletions src/frequenz/channels/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
guidelines](https://github.com/frequenz-floss/docs/blob/v0.x.x/python/experimental-packages.md).
"""

from ._grouping_latest_value_cache import GroupingLatestValueCache
from ._nop_receiver import NopReceiver
from ._optional_receiver import OptionalReceiver
from ._pipe import Pipe
from ._relay_sender import RelaySender
from ._with_previous import WithPrevious

__all__ = [
"GroupingLatestValueCache",
"NopReceiver",
"OptionalReceiver",
"Pipe",
Expand Down
155 changes: 155 additions & 0 deletions src/frequenz/channels/experimental/_grouping_latest_value_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""The GroupingLatestValueCache caches the latest values in a receiver grouped by key.

It provides a way to look up on demand, the latest value in a stream for any key, as
long as there has been at least one value received for that key.

[GroupingLatestValueCache][frequenz.channels.experimental.GroupingLatestValueCache]
takes a [Receiver][frequenz.channels.Receiver] and a `key` function as arguments and
stores the latest value received by that receiver for each key separately.

As soon as a value is received for a `key`, the
[`has_value`][frequenz.channels.experimental.GroupingLatestValueCache.has_value] method
returns `True` for that `key`, and the [`get`][frequenz.channels.LatestValueCache.get]
method for that `key` returns the latest value received. The `get` method will raise an
exception if called before any messages have been received from the receiver for a given
`key`.

Example:
```python
from frequenz.channels import Broadcast
from frequenz.channels.experimental import GroupingLatestValueCache

channel = Broadcast[tuple[int, str]](name="lvc_test")

cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0])
sender = channel.new_sender()

assert not cache.has_value(6)

await sender.send((6, "twenty-six"))

assert cache.has_value(6)
assert cache.get(6) == (6, "twenty-six")
```
"""


import asyncio
import typing
from collections.abc import Set

from .._receiver import Receiver

T_co = typing.TypeVar("T_co", covariant=True)
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)


class GroupingLatestValueCache(typing.Generic[T_co, HashableT]):
"""A cache that stores the latest value in a receiver.

It provides a way to look up the latest value in a stream without any delay,
as long as there has been one value received.
"""

def __init__(
self,
receiver: Receiver[T_co],
key: typing.Callable[[T_co], typing.Any],
*,
unique_id: str | None = None,
) -> None:
"""Create a new cache.

Args:
receiver: The receiver to cache values from.
key: An function that takes a value and returns a key to group the values
by.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][id]. It is used mostly for debugging purposes.
"""
self._receiver: Receiver[T_co] = receiver
self._key: typing.Callable[[T_co], HashableT] = key
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value_by_key: dict[HashableT, T_co] = {}
self._task: asyncio.Task[None] = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)

@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def keys(self) -> Set[HashableT]:
"""Return the set of keys for which values have been received.

If no key function is provided, this will return an empty set.
"""
return self._latest_value_by_key.keys()

def get(self, key: HashableT) -> T_co:
"""Return the latest value that has been received.

This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.

Args:
key: An optional key to retrieve the latest value for that key. If not
provided, it retrieves the latest value received overall.

Returns:
The latest value that has been received.

Raises:
ValueError: If no value has been received yet.
"""
if key not in self._latest_value_by_key:
raise ValueError(f"No value received for key: {key!r}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why this is not a KeyError? 🤔

return self._latest_value_by_key[key]

def has_value(self, key: HashableT) -> bool:
"""Check whether a value has been received yet.

If `key` is provided, it checks whether a value has been received for that key.

Args:
key: An optional key to check if a value has been received for that key.

Returns:
`True` if a value has been received, `False` otherwise.
"""
return key in self._latest_value_by_key

def clear(self, key: HashableT) -> None:
"""Clear the latest value for a specific key.

Args:
key: The key for which to clear the latest value.
"""
_ = self._latest_value_by_key.pop(key, None)

async def stop(self) -> None:
"""Stop the cache."""
if not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass

def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<GroupingLatestValueCache num_keys={len(self._latest_value_by_key.keys())}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)

async def _run(self) -> None:
async for value in self._receiver:
key = self._key(value)
self._latest_value_by_key[key] = value
Loading
Loading