Skip to content

Commit ae49232

Browse files
committed
Add a LatestValueCache that can be used to replace Peekable
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent a710e05 commit ae49232

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""General purpose classes for use with channels."""
55

66
import abc
7+
import asyncio
78
import typing
89

910
from frequenz.channels import Receiver
@@ -24,3 +25,26 @@ def new_receiver(self, maxsize: int = 50) -> Receiver[T]:
2425
Returns:
2526
A receiver instance.
2627
"""
28+
29+
30+
class LatestValueCache(typing.Generic[T]):
31+
"""A cache that stores the latest value in a receiver."""
32+
33+
def __init__(self, receiver: Receiver[T]) -> None:
34+
"""Create a new cache.
35+
36+
Args:
37+
receiver: The receiver to cache.
38+
"""
39+
self._receiver = receiver
40+
self._latest_value: T | None = None
41+
self._task = asyncio.create_task(self._run())
42+
43+
@property
44+
def latest_value(self) -> T | None:
45+
"""Get the latest value in the cache."""
46+
return self._latest_value
47+
48+
async def _run(self) -> None:
49+
async for value in self._receiver:
50+
self._latest_value = value

src/frequenz/sdk/microgrid/client/_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ def _get_component_data_channel(
425425
if component_id in self._component_streams:
426426
return self._component_streams[component_id]
427427
task_name = f"raw-component-data-{component_id}"
428-
chan = Broadcast[_GenericComponentData](task_name)
428+
chan = Broadcast[_GenericComponentData](task_name, resend_latest=True)
429429
self._component_streams[component_id] = chan
430430

431431
self._streaming_tasks[component_id] = asyncio.create_task(

0 commit comments

Comments
 (0)