From 8b2e546b1ba63a37a2f5668ca6c1a97b8435f2b2 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 10 Jun 2024 11:29:31 +0200 Subject: [PATCH 1/3] Add a `unique_id` to `LatestValueCache` When debugging and having some task in the stack coming from a `LastValueCache` it is very difficult to identify which one is it. This commit add a new `unique_id` that's used to create the task name. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/_internal/_channels.py | 17 +++++++++++++++-- .../_component_managers/_battery_manager.py | 10 ++++++++-- .../_ev_charger_manager/_ev_charger_manager.py | 3 ++- .../_pv_inverter_manager.py | 3 ++- .../ev_charger_pool/_set_current_bounds.py | 4 +++- 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 898bfce34..e17a58910 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -36,15 +36,28 @@ class _Sentinel: class LatestValueCache(typing.Generic[T_co]): """A cache that stores the latest value in a receiver.""" - def __init__(self, receiver: Receiver[T_co]) -> None: + def __init__( + self, receiver: Receiver[T_co], *, unique_id: str | None = None + ) -> None: """Create a new cache. Args: receiver: The receiver to cache. + unique_id: A string to help uniquely identify this instance. If not + provided, a unique identifier will be generated from the object's + [`id()`][]. It is used mostly for debugging purposes. """ self._receiver = receiver + self._unique_id: str = hex(id(self)) if unique_id is None else unique_id self._latest_value: T_co | _Sentinel = _Sentinel() - self._task = asyncio.create_task(self._run()) + self._task = asyncio.create_task( + self._run(), name=f"LatestValueCache«{self._unique_id}»" + ) + + @property + def unique_id(self) -> str: + """The unique identifier of this instance.""" + return self._unique_id def get(self) -> T_co: """Return the latest value that has been received. diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index 8c0b14986..eed32e27c 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -315,13 +315,19 @@ async def _distribute_power( async def _create_channels(self) -> None: """Create channels to get data of components in microgrid.""" api = connection_manager.get().api_client + manager_id = f"{type(self).__name__}»{hex(id(self))}»" for battery_id, inverter_ids in self._bat_invs_map.items(): bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id) - self._battery_caches[battery_id] = LatestValueCache(bat_recv) + self._battery_caches[battery_id] = LatestValueCache( + bat_recv, + unique_id=f"{manager_id}:battery«{battery_id}»", + ) for inverter_id in inverter_ids: inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id) - self._inverter_caches[inverter_id] = LatestValueCache(inv_recv) + self._inverter_caches[inverter_id] = LatestValueCache( + inv_recv, unique_id=f"{manager_id}:inverter«{inverter_id}»" + ) def _get_bounds( self, diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index 7698cb51c..6b9257502 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -55,7 +55,8 @@ def __init__( self._ev_charger_ids = self._get_ev_charger_ids() self._evc_states = EvcStates() self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache( - microgrid.voltage().new_receiver() + microgrid.voltage().new_receiver(), + unique_id=f"{type(self).__name__}«{hex(id(self))}»:voltage_cache", ) self._config = EVDistributionConfig(component_ids=self._ev_charger_ids) self._component_pool_status_tracker = ComponentPoolStatusTracker( diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py index b1642b43b..570c79e37 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py @@ -76,7 +76,8 @@ async def start(self) -> None: """Start the PV inverter manager.""" self._component_data_caches = { inv_id: LatestValueCache( - await connection_manager.get().api_client.inverter_data(inv_id) + await connection_manager.get().api_client.inverter_data(inv_id), + unique_id=f"{type(self).__name__}«{hex(id(self))}»:inverter«{inv_id}»", ) for inv_id in self._pv_inverter_ids } diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py index faefff5ee..81bd54630 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py @@ -99,8 +99,10 @@ async def _run(self) -> None: _logger.error(err) raise RuntimeError(err) + meter_id = next(iter(meters)).component_id self._meter_data_cache = LatestValueCache( - await api_client.meter_data(next(iter(meters)).component_id) + await api_client.meter_data(meter_id), + unique_id=f"{type(self).__name__}«{hex(id(self))}»:meter«{meter_id}»", ) latest_bound: dict[int, ComponentCurrentLimit] = {} From 8fbaa3a22f87f9f60ac8cf38a10bcf140109b936 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 10 Jun 2024 11:30:54 +0200 Subject: [PATCH 2/3] Add `__str__` and `__repr__` to `LatestValueCache` The `str` just returns the value, and `repr` gets all the info, including the new `unique_id`. We also implement `__str__` for `_Sentinel` to return a more meaningful string, since it will be shown in the `LatestValueCache` string representation when no value has been received yet. Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/_internal/_channels.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index e17a58910..f9dd3c34c 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -32,6 +32,10 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]: class _Sentinel: """A sentinel to denote that no value has been received yet.""" + def __str__(self) -> str: + """Return a string representation of this sentinel.""" + return "" + class LatestValueCache(typing.Generic[T_co]): """A cache that stores the latest value in a receiver.""" @@ -91,3 +95,14 @@ async def _run(self) -> None: async def stop(self) -> None: """Stop the cache.""" await cancel_and_await(self._task) + + def __repr__(self) -> str: + """Return a string representation of this cache.""" + return ( + f"" + ) + + def __str__(self) -> str: + """Return the last value seen by this cache.""" + return str(self._latest_value) From e9b2e05f339e0cbe2eb3cea2f2d7e2694ed61c14 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 10 Jun 2024 12:13:58 +0200 Subject: [PATCH 3/3] Add `@override` to `PowerDistributingActor` Signed-off-by: Leandro Lucarella --- .../sdk/actor/power_distributing/power_distributing.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index fb0378135..4c5d38152 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -14,6 +14,7 @@ from frequenz.channels import Receiver, Sender from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType +from typing_extensions import override from ...actor._actor import Actor from ._component_managers import ( @@ -113,6 +114,7 @@ def __init__( # pylint: disable=too-many-arguments f"PowerDistributor doesn't support controlling: {component_category}" ) + @override async def _run(self) -> None: # pylint: disable=too-many-locals """Run actor main function. @@ -127,6 +129,7 @@ async def _run(self) -> None: # pylint: disable=too-many-locals async for request in self._requests_receiver: await self._component_manager.distribute_power(request) + @override async def stop(self, msg: str | None = None) -> None: """Stop this actor.