Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
"""Type variable for the keys used to group values in the `GroupingLatestValueCache`."""


class _NotSpecified:
"""A sentinel value to indicate that no default value was provided."""

def __repr__(self) -> str:
"""Return a string representation of this sentinel."""
return "<_NotSpecified>"


class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
"""A cache that stores the latest value in a receiver, grouped by key.

Expand All @@ -33,12 +41,15 @@ class GroupingLatestValueCache(Mapping[HashableT, ValueT_co]):
stores the latest value received by that receiver for each key separately.

The `GroupingLatestValueCache` implements the [`Mapping`][collections.abc.Mapping]
interface, so it can be used like a dictionary. It is not
a [`MutableMapping`][collections.abc.MutableMapping] because users can't mutate the
cache directly, it is only mutated by the underlying receiver. There is one exception
though, users can clear individual keys from the cache using the
[__delitem__][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__]
method.
interface, so it can be used like a dictionary. Additionally other methods from
[`MutableMapping`][collections.abc.MutableMapping] are implemented, but only
methods removing items from the cache are allowed, such as
[`pop()`][frequenz.channels.experimental.GroupingLatestValueCache.pop],
[`popitem()`][frequenz.channels.experimental.GroupingLatestValueCache.popitem],
[`clear()`][frequenz.channels.experimental.GroupingLatestValueCache.clear], and
[`__delitem__()`][frequenz.channels.experimental.GroupingLatestValueCache.__delitem__].
Other update methods are not provided because the user should not update the
cache values directly.

Example:
```python
Expand Down Expand Up @@ -220,6 +231,50 @@ def __delitem__(self, key: HashableT) -> None:
"""
del self._latest_value_by_key[key]

@typing.overload
def pop(self, key: HashableT, /) -> ValueT_co | None:
"""Remove the latest value for a specific key and return it."""

@typing.overload
def pop(self, key: HashableT, /, default: DefaultT) -> ValueT_co | DefaultT:
"""Remove the latest value for a specific key and return it."""

def pop(
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
) -> ValueT_co | DefaultT | None:
Comment on lines +247 to +249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def pop(
self, key: HashableT, /, default: DefaultT | _NotSpecified = _NotSpecified()
) -> ValueT_co | DefaultT | None:
def pop(
self, key: HashableT, /, default: DefaultT | None = None
) -> ValueT_co | DefaultT | None:

Then you can use return self._latest_value_by_key.pop(key, default) as the only body, because that's the exact signature of dict.pop(), and no need for special cases.

And if a user wants to have None in ValueT_co, they can make their own sentinel that will be DefaultT. So we don't need to make a sentinel to be able to satisfy MutableMapping.

Copy link
Contributor Author

@llucax llucax Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the problem is pop() raises when the default is not specified, it doesn't return None (is not symmetric with get()), so we need to be able to tell between not passing the argument and passing None.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh damn, that's such a bad API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

"""Remove the latest value for a specific key and return it.

If no value has been received yet for that key, it returns the default value or
raises a `KeyError` if no default value is provided.

Args:
key: The key for which to remove the latest value.
default: The default value to return if no value has been received yet for
the specified key.

Returns:
The latest value that has been received for that key, or the default value if
no value has been received yet and a default value is provided.
"""
if isinstance(default, _NotSpecified):
return self._latest_value_by_key.pop(key)
return self._latest_value_by_key.pop(key, default)

def popitem(self) -> tuple[HashableT, ValueT_co]:
"""Remove and return a (key, value) pair from the cache.

Pairs are returned in LIFO (last-in, first-out) order.

Returns:
A tuple containing the key and the latest value that has been received for
that key.
"""
return self._latest_value_by_key.popitem()

def clear(self) -> None:
"""Clear all entries from the cache."""
self._latest_value_by_key.clear()

async def stop(self) -> None:
"""Stop the cache."""
if not self._task.done():
Expand Down
59 changes: 58 additions & 1 deletion tests/test_grouping_latest_value_cache_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@pytest.mark.integration
async def test_latest_value_cache_key() -> None:
async def test_latest_value_cache_key() -> None: # pylint: disable=too-many-statements
"""Ensure LatestValueCache works with keys."""
channel = Broadcast[tuple[int, str]](name="lvc_test")

Expand All @@ -36,7 +36,9 @@ async def test_latest_value_cache_key() -> None:
assert 7 not in cache

assert cache.get(5) == (5, "c")
assert cache[5] == (5, "c")
assert cache.get(6) == (6, "b")
assert cache[6] == (6, "b")

assert cache.keys() == {5, 6}

Expand Down Expand Up @@ -65,3 +67,58 @@ async def test_latest_value_cache_key() -> None:

assert cache.get(5) is None
assert cache.keys() == {6, 12}

assert cache.pop(6) == (6, "g")
assert 6 not in cache
assert cache.keys() == {12}

assert cache.pop(8, default=True) is True
with pytest.raises(KeyError):
cache.pop(8)

assert cache.popitem() == (12, (12, "d"))
assert 12 not in cache
assert not cache

await sender.send((1, "h"))
await sender.send((2, "i"))
await asyncio.sleep(0)

expected = {1: (1, "h"), 2: (2, "i")}
assert cache.keys() == expected.keys()
assert list(cache.values()) == list(expected.values())
assert list(cache.items()) == list(expected.items())
# assert cache == expected
assert list(cache) == list(expected)

cache.clear()
assert not cache
assert cache.keys() == set()

await cache.stop()


@pytest.mark.integration
async def test_equality() -> None:
"""Test that two caches with the same content are equal."""
channel = Broadcast[tuple[int, str]](name="lvc_test")

cache1: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
channel.new_receiver(), key=lambda x: x[0]
)
cache2: GroupingLatestValueCache[int, tuple[int, str]] = GroupingLatestValueCache(
channel.new_receiver(), key=lambda x: x[0]
)

sender = channel.new_sender()
await sender.send((1, "one"))
await sender.send((2, "two"))
await asyncio.sleep(0)

assert cache1 == cache2

del cache1[1]
assert cache1 != cache2

await cache1.stop()
await cache2.stop()