Skip to content

Commit 07198d7

Browse files
committed
Revert "Support grouping by keys in LatestValueCache (frequenz-floss#424)"
This reverts commit 3edcb49, reversing changes made to f7fb341. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 0aae91f commit 07198d7

File tree

3 files changed

+11
-146
lines changed

3 files changed

+11
-146
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
- `LatestValueCache` now takes an optional `key` function, which returns the key for each incoming message, and the latest value for each key is cached and can be retrieved separately.
13+
<!-- Here goes the main new features and examples or instructions on how to use them -->
1414

1515
## Bug Fixes
1616

src/frequenz/channels/_latest_value_cache.py

Lines changed: 10 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,12 @@
88
99
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
1010
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
11-
value received by that receiver. It also takes an optional `key` function
12-
that allows you to group the values by a specific key. If the `key` is
13-
provided, the cache will store the latest value for each key separately,
14-
otherwise it will store only the latest value received overall.
15-
16-
As soon as a value is received, its
11+
value received by that receiver. As soon as a value is received, its
1712
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
1813
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
1914
the latest value received. The `get` method will raise an exception if called
2015
before any messages have been received from the receiver.
2116
22-
Both `has_value` and `get` methods can take an optional `key` argument to
23-
check or retrieve the latest value for that specific key.
24-
2517
Example:
2618
```python
2719
from frequenz.channels import Broadcast, LatestValueCache
@@ -40,84 +32,31 @@
4032
```
4133
"""
4234

43-
from __future__ import annotations
44-
4535
import asyncio
4636
import typing
4737

4838
from ._receiver import Receiver
4939

5040
T_co = typing.TypeVar("T_co", covariant=True)
51-
HashableT = typing.TypeVar("HashableT", bound=typing.Hashable)
5241

5342

54-
class Sentinel:
43+
class _Sentinel:
5544
"""A sentinel to denote that no value has been received yet."""
5645

57-
def __init__(self, desc: str) -> None:
58-
"""Initialize the sentinel."""
59-
self._desc = desc
60-
6146
def __str__(self) -> str:
6247
"""Return a string representation of this sentinel."""
63-
return f"<Sentinel: {self._desc}>"
48+
return "<no value received yet>"
6449

6550

66-
NO_KEY: typing.Final[Sentinel] = Sentinel("no key provided")
67-
NO_KEY_FUNCTION: typing.Final[Sentinel] = Sentinel("no key function provided")
68-
NO_VALUE_RECEIVED: typing.Final[Sentinel] = Sentinel("no value received yet")
69-
70-
71-
class LatestValueCache(typing.Generic[T_co, HashableT]):
51+
class LatestValueCache(typing.Generic[T_co]):
7252
"""A cache that stores the latest value in a receiver.
7353
7454
It provides a way to look up the latest value in a stream without any delay,
7555
as long as there has been one value received.
7656
"""
7757

78-
@typing.overload
79-
def __init__(
80-
self: LatestValueCache[T_co, Sentinel],
81-
receiver: Receiver[T_co],
82-
*,
83-
unique_id: str | None = None,
84-
key: Sentinel = NO_KEY_FUNCTION,
85-
) -> None:
86-
"""Create a new cache that does not use keys.
87-
88-
Args:
89-
receiver: The receiver to cache.
90-
unique_id: A string to help uniquely identify this instance. If not
91-
provided, a unique identifier will be generated from the object's
92-
[`id()`][id]. It is used mostly for debugging purposes.
93-
key: This parameter is ignored when set to `None`.
94-
"""
95-
96-
@typing.overload
9758
def __init__(
98-
self: LatestValueCache[T_co, HashableT],
99-
receiver: Receiver[T_co],
100-
*,
101-
unique_id: str | None = None,
102-
key: typing.Callable[[T_co], HashableT],
103-
) -> None:
104-
"""Create a new cache that uses keys.
105-
106-
Args:
107-
receiver: The receiver to cache.
108-
unique_id: A string to help uniquely identify this instance. If not
109-
provided, a unique identifier will be generated from the object's
110-
[`id()`][id]. It is used mostly for debugging purposes.
111-
key: A function that takes a value and returns a key to group the values by.
112-
If provided, the cache will store the latest value for each key separately.
113-
"""
114-
115-
def __init__(
116-
self,
117-
receiver: Receiver[T_co],
118-
*,
119-
unique_id: str | None = None,
120-
key: typing.Callable[[T_co], typing.Any] | Sentinel = NO_KEY_FUNCTION,
59+
self, receiver: Receiver[T_co], *, unique_id: str | None = None
12160
) -> None:
12261
"""Create a new cache.
12362
@@ -126,16 +65,10 @@ def __init__(
12665
unique_id: A string to help uniquely identify this instance. If not
12766
provided, a unique identifier will be generated from the object's
12867
[`id()`][id]. It is used mostly for debugging purposes.
129-
key: An optional function that takes a value and returns a key to group the
130-
values by. If provided, the cache will store the latest value for each
131-
key separately. If not provided, it will store only the latest value
132-
received overall.
13368
"""
13469
self._receiver = receiver
135-
self._key: typing.Callable[[T_co], HashableT] | Sentinel = key
13670
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
137-
self._latest_value: T_co | Sentinel = NO_VALUE_RECEIVED
138-
self._latest_value_by_key: dict[HashableT, T_co] = {}
71+
self._latest_value: T_co | _Sentinel = _Sentinel()
13972
self._task = asyncio.create_task(
14073
self._run(), name=f"LatestValueCache«{self._unique_id}»"
14174
)
@@ -145,53 +78,34 @@ def unique_id(self) -> str:
14578
"""The unique identifier of this instance."""
14679
return self._unique_id
14780

148-
def get(self, key: HashableT | Sentinel = NO_KEY) -> T_co:
81+
def get(self) -> T_co:
14982
"""Return the latest value that has been received.
15083
15184
This raises a `ValueError` if no value has been received yet. Use `has_value` to
15285
check whether a value has been received yet, before trying to access the value,
15386
to avoid the exception.
15487
155-
Args:
156-
key: An optional key to retrieve the latest value for that key. If not
157-
provided, it retrieves the latest value received overall.
158-
15988
Returns:
16089
The latest value that has been received.
16190
16291
Raises:
16392
ValueError: If no value has been received yet.
16493
"""
165-
if not isinstance(key, Sentinel):
166-
if key not in self._latest_value_by_key:
167-
raise ValueError(f"No value received for key: {key!r}")
168-
return self._latest_value_by_key[key]
169-
170-
if isinstance(self._latest_value, Sentinel):
94+
if isinstance(self._latest_value, _Sentinel):
17195
raise ValueError("No value has been received yet.")
17296
return self._latest_value
17397

174-
def has_value(self, key: HashableT | Sentinel = NO_KEY) -> bool:
98+
def has_value(self) -> bool:
17599
"""Check whether a value has been received yet.
176100
177-
If `key` is provided, it checks whether a value has been received for that key.
178-
179-
Args:
180-
key: An optional key to check if a value has been received for that key.
181-
182101
Returns:
183102
`True` if a value has been received, `False` otherwise.
184103
"""
185-
if not isinstance(key, Sentinel):
186-
return key in self._latest_value_by_key
187-
return not isinstance(self._latest_value, Sentinel)
104+
return not isinstance(self._latest_value, _Sentinel)
188105

189106
async def _run(self) -> None:
190107
async for value in self._receiver:
191108
self._latest_value = value
192-
if not isinstance(self._key, Sentinel):
193-
key = self._key(value)
194-
self._latest_value_by_key[key] = value
195109

196110
async def stop(self) -> None:
197111
"""Stop the cache."""

tests/test_latest_value_cache_integration.py

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -43,52 +43,3 @@ async def test_latest_value_cache() -> None:
4343
await asyncio.sleep(0)
4444

4545
assert cache.get() == 19
46-
47-
48-
@pytest.mark.integration
49-
async def test_latest_value_cache_key() -> None:
50-
"""Ensure LatestValueCache works with keys."""
51-
channel = Broadcast[tuple[int, str]](name="lvc_test")
52-
53-
cache = LatestValueCache(channel.new_receiver(), key=lambda x: x[0])
54-
sender = channel.new_sender()
55-
56-
assert not cache.has_value()
57-
with pytest.raises(ValueError, match="No value has been received yet."):
58-
cache.get()
59-
with pytest.raises(ValueError, match="No value received for key: 0"):
60-
cache.get(0)
61-
62-
await sender.send((5, "a"))
63-
await sender.send((6, "b"))
64-
await sender.send((5, "c"))
65-
await asyncio.sleep(0)
66-
67-
assert cache.has_value()
68-
assert cache.has_value(5)
69-
assert cache.has_value(6)
70-
assert not cache.has_value(7)
71-
72-
assert cache.get() == (5, "c")
73-
assert cache.get(5) == (5, "c")
74-
assert cache.get(6) == (6, "b")
75-
76-
with pytest.raises(ValueError, match="No value received for key: 7"):
77-
cache.get(7)
78-
79-
await sender.send((12, "d"))
80-
await asyncio.sleep(0)
81-
82-
assert cache.get() == (12, "d")
83-
assert cache.get() == (12, "d")
84-
assert cache.get(12) == (12, "d")
85-
assert cache.get(12) == (12, "d")
86-
assert cache.get(5) == (5, "c")
87-
assert cache.get(6) == (6, "b")
88-
89-
await sender.send((6, "e"))
90-
await sender.send((6, "f"))
91-
await sender.send((6, "g"))
92-
await asyncio.sleep(0)
93-
94-
assert cache.get(6) == (6, "g")

0 commit comments

Comments
 (0)