Skip to content

Commit a8bb343

Browse files
authored
Make some minor improvements for LastValueCache (#974)
- **Add a `unique_id` to `LatestValueCache`** - **Add `__str__` and `__repr__` to `LatestValueCache`** - **Cancel `LatestValueCache`'s task if deleted** - **Add `@override` to `PowerDistributingActor`**
2 parents 6a54a0b + e9b2e05 commit a8bb343

File tree

6 files changed

+48
-7
lines changed

6 files changed

+48
-7
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,36 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
3232
class _Sentinel:
3333
"""A sentinel to denote that no value has been received yet."""
3434

35+
def __str__(self) -> str:
36+
"""Return a string representation of this sentinel."""
37+
return "<no value received yet>"
38+
3539

3640
class LatestValueCache(typing.Generic[T_co]):
3741
"""A cache that stores the latest value in a receiver."""
3842

39-
def __init__(self, receiver: Receiver[T_co]) -> None:
43+
def __init__(
44+
self, receiver: Receiver[T_co], *, unique_id: str | None = None
45+
) -> None:
4046
"""Create a new cache.
4147
4248
Args:
4349
receiver: The receiver to cache.
50+
unique_id: A string to help uniquely identify this instance. If not
51+
provided, a unique identifier will be generated from the object's
52+
[`id()`][]. It is used mostly for debugging purposes.
4453
"""
4554
self._receiver = receiver
55+
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
4656
self._latest_value: T_co | _Sentinel = _Sentinel()
47-
self._task = asyncio.create_task(self._run())
57+
self._task = asyncio.create_task(
58+
self._run(), name=f"LatestValueCache«{self._unique_id}»"
59+
)
60+
61+
@property
62+
def unique_id(self) -> str:
63+
"""The unique identifier of this instance."""
64+
return self._unique_id
4865

4966
def get(self) -> T_co:
5067
"""Return the latest value that has been received.
@@ -78,3 +95,14 @@ async def _run(self) -> None:
7895
async def stop(self) -> None:
7996
"""Stop the cache."""
8097
await cancel_and_await(self._task)
98+
99+
def __repr__(self) -> str:
100+
"""Return a string representation of this cache."""
101+
return (
102+
f"<LatestValueCache latest_value={self._latest_value!r}, "
103+
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
104+
)
105+
106+
def __str__(self) -> str:
107+
"""Return the last value seen by this cache."""
108+
return str(self._latest_value)

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,13 +315,19 @@ async def _distribute_power(
315315
async def _create_channels(self) -> None:
316316
"""Create channels to get data of components in microgrid."""
317317
api = connection_manager.get().api_client
318+
manager_id = f"{type(self).__name__}»{hex(id(self))}»"
318319
for battery_id, inverter_ids in self._bat_invs_map.items():
319320
bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id)
320-
self._battery_caches[battery_id] = LatestValueCache(bat_recv)
321+
self._battery_caches[battery_id] = LatestValueCache(
322+
bat_recv,
323+
unique_id=f"{manager_id}:battery«{battery_id}»",
324+
)
321325

322326
for inverter_id in inverter_ids:
323327
inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id)
324-
self._inverter_caches[inverter_id] = LatestValueCache(inv_recv)
328+
self._inverter_caches[inverter_id] = LatestValueCache(
329+
inv_recv, unique_id=f"{manager_id}:inverter«{inverter_id}»"
330+
)
325331

326332
def _get_bounds(
327333
self,

src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ def __init__(
5555
self._ev_charger_ids = self._get_ev_charger_ids()
5656
self._evc_states = EvcStates()
5757
self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache(
58-
microgrid.voltage().new_receiver()
58+
microgrid.voltage().new_receiver(),
59+
unique_id=f"{type(self).__name__}«{hex(id(self))}»:voltage_cache",
5960
)
6061
self._config = EVDistributionConfig(component_ids=self._ev_charger_ids)
6162
self._component_pool_status_tracker = ComponentPoolStatusTracker(

src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ async def start(self) -> None:
7676
"""Start the PV inverter manager."""
7777
self._component_data_caches = {
7878
inv_id: LatestValueCache(
79-
await connection_manager.get().api_client.inverter_data(inv_id)
79+
await connection_manager.get().api_client.inverter_data(inv_id),
80+
unique_id=f"{type(self).__name__}«{hex(id(self))}»:inverter«{inv_id}»",
8081
)
8182
for inv_id in self._pv_inverter_ids
8283
}

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from frequenz.channels import Receiver, Sender
1616
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
17+
from typing_extensions import override
1718

1819
from ...actor._actor import Actor
1920
from ._component_managers import (
@@ -113,6 +114,7 @@ def __init__( # pylint: disable=too-many-arguments
113114
f"PowerDistributor doesn't support controlling: {component_category}"
114115
)
115116

117+
@override
116118
async def _run(self) -> None: # pylint: disable=too-many-locals
117119
"""Run actor main function.
118120
@@ -127,6 +129,7 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
127129
async for request in self._requests_receiver:
128130
await self._component_manager.distribute_power(request)
129131

132+
@override
130133
async def stop(self, msg: str | None = None) -> None:
131134
"""Stop this actor.
132135

src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,10 @@ async def _run(self) -> None:
9999
_logger.error(err)
100100
raise RuntimeError(err)
101101

102+
meter_id = next(iter(meters)).component_id
102103
self._meter_data_cache = LatestValueCache(
103-
await api_client.meter_data(next(iter(meters)).component_id)
104+
await api_client.meter_data(meter_id),
105+
unique_id=f"{type(self).__name__}«{hex(id(self))}»:meter«{meter_id}»",
104106
)
105107
latest_bound: dict[int, ComponentCurrentLimit] = {}
106108

0 commit comments

Comments
 (0)