Skip to content

Commit 49110d7

Browse files
committed
Import the LatestValueCache implementation from the Frequenz SDK
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 2efdedc commit 49110d7

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

src/frequenz/channels/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
* [select][frequenz.channels.select]: Iterate over the messages of all
3636
[receivers][frequenz.channels.Receiver] as new messages become available.
3737
38+
* [LatestValueCache][frequenz.channels.LatestValueCache]: A cache that stores
39+
the latest value in a receiver, providing a way to look up the latest value in
40+
a stream, without having to wait, as long as there has been one value
41+
received.
42+
3843
Exception classes:
3944
4045
* [Error][frequenz.channels.Error]: Base class for all errors in this
@@ -85,6 +90,7 @@
8590
SenderMessageT_co,
8691
SenderMessageT_contra,
8792
)
93+
from ._latest_value_cache import LatestValueCache
8894
from ._merge import Merger, merge
8995
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
9096
from ._select import (
@@ -104,6 +110,7 @@
104110
"ChannelMessageT",
105111
"Error",
106112
"ErroredChannelT_co",
113+
"LatestValueCache",
107114
"MappedMessageT_co",
108115
"Merger",
109116
"Receiver",
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A cache that stores the latest value in a receiver.
5+
6+
It provides a way to look up the latest value in a stream without any delay,
7+
as long as there has been one value received.
8+
"""
9+
10+
import asyncio
11+
import typing
12+
13+
from ._receiver import Receiver
14+
15+
T_co = typing.TypeVar("T_co", covariant=True)
16+
17+
18+
class _Sentinel:
19+
"""A sentinel to denote that no value has been received yet."""
20+
21+
def __str__(self) -> str:
22+
"""Return a string representation of this sentinel."""
23+
return "<no value received yet>"
24+
25+
26+
class LatestValueCache(typing.Generic[T_co]):
27+
"""A cache that stores the latest value in a receiver.
28+
29+
It provides a way to look up the latest value in a stream without any delay,
30+
as long as there has been one value received.
31+
"""
32+
33+
def __init__(
34+
self, receiver: Receiver[T_co], *, unique_id: str | None = None
35+
) -> None:
36+
"""Create a new cache.
37+
38+
Args:
39+
receiver: The receiver to cache.
40+
unique_id: A string to help uniquely identify this instance. If not
41+
provided, a unique identifier will be generated from the object's
42+
[`id()`][id]. It is used mostly for debugging purposes.
43+
"""
44+
self._receiver = receiver
45+
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
46+
self._latest_value: T_co | _Sentinel = _Sentinel()
47+
self._task = asyncio.create_task(
48+
self._run(), name=f"LatestValueCache«{self._unique_id}»"
49+
)
50+
51+
@property
52+
def unique_id(self) -> str:
53+
"""The unique identifier of this instance."""
54+
return self._unique_id
55+
56+
def get(self) -> T_co:
57+
"""Return the latest value that has been received.
58+
59+
This raises a `ValueError` if no value has been received yet. Use `has_value` to
60+
check whether a value has been received yet, before trying to access the value,
61+
to avoid the exception.
62+
63+
Returns:
64+
The latest value that has been received.
65+
66+
Raises:
67+
ValueError: If no value has been received yet.
68+
"""
69+
if isinstance(self._latest_value, _Sentinel):
70+
raise ValueError("No value has been received yet.")
71+
return self._latest_value
72+
73+
def has_value(self) -> bool:
74+
"""Check whether a value has been received yet.
75+
76+
Returns:
77+
`True` if a value has been received, `False` otherwise.
78+
"""
79+
return not isinstance(self._latest_value, _Sentinel)
80+
81+
async def _run(self) -> None:
82+
async for value in self._receiver:
83+
self._latest_value = value
84+
85+
async def stop(self) -> None:
86+
"""Stop the cache."""
87+
if not self._task.done():
88+
self._task.cancel()
89+
try:
90+
await self._task
91+
except asyncio.CancelledError:
92+
pass
93+
94+
def __repr__(self) -> str:
95+
"""Return a string representation of this cache."""
96+
return (
97+
f"<LatestValueCache latest_value={self._latest_value!r}, "
98+
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
99+
)
100+
101+
def __str__(self) -> str:
102+
"""Return the last value seen by this cache."""
103+
return str(self._latest_value)

0 commit comments

Comments
 (0)