Skip to content

Commit a90611c

Browse files
authored
Import the LatestValueCache implementation from the Frequenz SDK (#302)
This PR also adds an integration test for `LatestValueCache`.
2 parents 2c606e6 + 3e68eea commit a90611c

File tree

5 files changed

+191
-1
lines changed

5 files changed

+191
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
- The `LatestValueCache` class, which used to be internal to the Frequenz SDK, is now available through the channels package.
1414

1515
## Bug Fixes
1616

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Caching
2+
3+
::: frequenz.channels._latest_value_cache
4+
options:
5+
inherited_members: []
6+
members: []
7+
show_bases: false
8+
show_root_heading: false
9+
show_root_toc_entry: false
10+
show_source: false

src/frequenz/channels/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
* [select][frequenz.channels.select]: Iterate over the messages of all
3636
[receivers][frequenz.channels.Receiver] as new messages become available.
3737
38+
* [LatestValueCache][frequenz.channels.LatestValueCache]: A cache that stores
39+
the latest value in a receiver, providing a way to look up the latest value in
40+
a stream, without having to wait, as long as there has been one value
41+
received.
42+
3843
Exception classes:
3944
4045
* [Error][frequenz.channels.Error]: Base class for all errors in this
@@ -85,6 +90,7 @@
8590
SenderMessageT_co,
8691
SenderMessageT_contra,
8792
)
93+
from ._latest_value_cache import LatestValueCache
8894
from ._merge import Merger, merge
8995
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
9096
from ._select import (
@@ -104,6 +110,7 @@
104110
"ChannelMessageT",
105111
"Error",
106112
"ErroredChannelT_co",
113+
"LatestValueCache",
107114
"MappedMessageT_co",
108115
"Merger",
109116
"Receiver",
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""The LatestValueCache caches the latest value in a receiver.
5+
6+
It provides a way to look up the latest value in a stream whenever required, as
7+
long as there has been one value received.
8+
9+
[LatestValueCache][frequenz.channels.LatestValueCache] takes a
10+
[Receiver][frequenz.channels.Receiver] as an argument and stores the latest
11+
value received by that receiver. As soon as a value is received, its
12+
[`has_value`][frequenz.channels.LatestValueCache.has_value] method returns
13+
`True`, and its [`get`][frequenz.channels.LatestValueCache.get] method returns
14+
the latest value received. The `get` method will raise an exception if called
15+
before any messages have been received from the receiver.
16+
17+
Example:
18+
```python
19+
from frequenz.channels import Broadcast, LatestValueCache
20+
21+
channel = Broadcast[int](name="lvc_test")
22+
23+
cache = LatestValueCache(channel.new_receiver())
24+
sender = channel.new_sender()
25+
26+
assert not cache.has_value()
27+
28+
await sender.send(5)
29+
30+
assert cache.has_value()
31+
assert cache.get() == 5
32+
```
33+
"""
34+
35+
import asyncio
36+
import typing
37+
38+
from ._receiver import Receiver
39+
40+
T_co = typing.TypeVar("T_co", covariant=True)
41+
42+
43+
class _Sentinel:
44+
"""A sentinel to denote that no value has been received yet."""
45+
46+
def __str__(self) -> str:
47+
"""Return a string representation of this sentinel."""
48+
return "<no value received yet>"
49+
50+
51+
class LatestValueCache(typing.Generic[T_co]):
52+
"""A cache that stores the latest value in a receiver.
53+
54+
It provides a way to look up the latest value in a stream without any delay,
55+
as long as there has been one value received.
56+
"""
57+
58+
def __init__(
59+
self, receiver: Receiver[T_co], *, unique_id: str | None = None
60+
) -> None:
61+
"""Create a new cache.
62+
63+
Args:
64+
receiver: The receiver to cache.
65+
unique_id: A string to help uniquely identify this instance. If not
66+
provided, a unique identifier will be generated from the object's
67+
[`id()`][id]. It is used mostly for debugging purposes.
68+
"""
69+
self._receiver = receiver
70+
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
71+
self._latest_value: T_co | _Sentinel = _Sentinel()
72+
self._task = asyncio.create_task(
73+
self._run(), name=f"LatestValueCache«{self._unique_id}»"
74+
)
75+
76+
@property
77+
def unique_id(self) -> str:
78+
"""The unique identifier of this instance."""
79+
return self._unique_id
80+
81+
def get(self) -> T_co:
82+
"""Return the latest value that has been received.
83+
84+
This raises a `ValueError` if no value has been received yet. Use `has_value` to
85+
check whether a value has been received yet, before trying to access the value,
86+
to avoid the exception.
87+
88+
Returns:
89+
The latest value that has been received.
90+
91+
Raises:
92+
ValueError: If no value has been received yet.
93+
"""
94+
if isinstance(self._latest_value, _Sentinel):
95+
raise ValueError("No value has been received yet.")
96+
return self._latest_value
97+
98+
def has_value(self) -> bool:
99+
"""Check whether a value has been received yet.
100+
101+
Returns:
102+
`True` if a value has been received, `False` otherwise.
103+
"""
104+
return not isinstance(self._latest_value, _Sentinel)
105+
106+
async def _run(self) -> None:
107+
async for value in self._receiver:
108+
self._latest_value = value
109+
110+
async def stop(self) -> None:
111+
"""Stop the cache."""
112+
if not self._task.done():
113+
self._task.cancel()
114+
try:
115+
await self._task
116+
except asyncio.CancelledError:
117+
pass
118+
119+
def __repr__(self) -> str:
120+
"""Return a string representation of this cache."""
121+
return (
122+
f"<LatestValueCache latest_value={self._latest_value!r}, "
123+
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
124+
)
125+
126+
def __str__(self) -> str:
127+
"""Return the last value seen by this cache."""
128+
return str(self._latest_value)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the LatestValueCache implementation."""
5+
6+
import asyncio
7+
8+
import pytest
9+
10+
from frequenz.channels import Broadcast, LatestValueCache
11+
12+
13+
@pytest.mark.integration
14+
async def test_latest_value_cache() -> None:
15+
"""Ensure LatestValueCache always gives out the latest value."""
16+
channel = Broadcast[int](name="lvc_test")
17+
18+
cache = LatestValueCache(channel.new_receiver())
19+
sender = channel.new_sender()
20+
21+
assert not cache.has_value()
22+
with pytest.raises(ValueError, match="No value has been received yet."):
23+
cache.get()
24+
25+
await sender.send(5)
26+
await sender.send(6)
27+
await asyncio.sleep(0)
28+
29+
assert cache.has_value()
30+
assert cache.get() == 6
31+
assert cache.get() == 6
32+
33+
await sender.send(12)
34+
await asyncio.sleep(0)
35+
36+
assert cache.get() == 12
37+
assert cache.get() == 12
38+
assert cache.get() == 12
39+
40+
await sender.send(15)
41+
await sender.send(18)
42+
await sender.send(19)
43+
await asyncio.sleep(0)
44+
45+
assert cache.get() == 19

0 commit comments

Comments
 (0)