diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 898bfce34..1300fbf2e 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -36,15 +36,29 @@ class _Sentinel: class LatestValueCache(typing.Generic[T_co]): """A cache that stores the latest value in a receiver.""" - def __init__(self, receiver: Receiver[T_co]) -> None: + def __init__( + self, receiver: Receiver[T_co], *, unique_id: str | None = None + ) -> None: """Create a new cache. Args: receiver: The receiver to cache. + unique_id: A string to help uniquely identify this instance. If not + provided, a unique identifier will be generated from the object's + [`id()`][]. It is used mostly for debugging purposes. """ self._receiver = receiver + self._unique_id: str = hex(id(self)) if unique_id is None else unique_id self._latest_value: T_co | _Sentinel = _Sentinel() - self._task = asyncio.create_task(self._run()) + self._task = asyncio.create_task( + self._run(), name=f"LatestValueCache«{self._unique_id}»" + ) + self._value_received_event = asyncio.Event() + + @property + def unique_id(self) -> str: + """The unique identifier of this instance.""" + return self._unique_id def get(self) -> T_co: """Return the latest value that has been received. @@ -71,10 +85,34 @@ def has_value(self) -> bool: """ return not isinstance(self._latest_value, _Sentinel) + async def wait_for_value(self) -> None: + """Wait for a value to be received.""" + if self.has_value(): + return + await self._value_received_event.wait() + async def _run(self) -> None: async for value in self._receiver: + had_value = self.has_value() self._latest_value = value + if not had_value: + self._value_received_event.set() async def stop(self) -> None: """Stop the cache.""" await cancel_and_await(self._task) + + def __del__(self) -> None: + """Stop the cache when it is deleted.""" + self._task.cancel() + + def __repr__(self) -> str: + """Return a string representation of the cache.""" + return ( + f"" + ) + + def __str__(self) -> str: + """Return a string representation of the cache.""" + return str(self._latest_value) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index f0c32a7ca..bc7a95874 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -190,6 +190,20 @@ async def start(self) -> None: """Start the battery data manager.""" await self._create_channels() + @override + async def wait_for_data(self) -> None: + """Wait until this manager receiver data for all components it manages. + + Before this happens, the manager could misbehave, as it would not have all the + data it needs to make the appropriate decisions. + """ + await asyncio.gather( + *[ + *[cache.wait_for_value() for cache in self._battery_caches.values()], + *[cache.wait_for_value() for cache in self._inverter_caches.values()], + ] + ) + @override async def stop(self) -> None: """Stop the battery data manager.""" @@ -315,13 +329,19 @@ async def _distribute_power( async def _create_channels(self) -> None: """Create channels to get data of components in microgrid.""" api = connection_manager.get().api_client + manager_id = f"{type(self).__name__}»{hex(id(self))}»" for battery_id, inverter_ids in self._bat_invs_map.items(): bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id) - self._battery_caches[battery_id] = LatestValueCache(bat_recv) + self._battery_caches[battery_id] = LatestValueCache( + bat_recv, + unique_id=f"{manager_id}:battery«{battery_id}»", + ) for inverter_id in inverter_ids: inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id) - self._inverter_caches[inverter_id] = LatestValueCache(inv_recv) + self._inverter_caches[inverter_id] = LatestValueCache( + inv_recv, unique_id=f"{manager_id}:inverter«{inverter_id}»" + ) def _get_bounds( self, diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py index f9eba111e..1b49776fc 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py @@ -38,6 +38,14 @@ def component_ids(self) -> collections.abc.Set[int]: async def start(self) -> None: """Start the component data manager.""" + @abc.abstractmethod + async def wait_for_data(self) -> None: + """Wait until this manager receiver data for all components it manages. + + Before this happens, the manager could misbehave, as it would not have all the + data it needs to make the appropriate decisions. + """ + @abc.abstractmethod async def distribute_power(self, request: Request) -> None: """Distribute the requested power to the components. diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index 7698cb51c..28493f1fa 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -55,7 +55,8 @@ def __init__( self._ev_charger_ids = self._get_ev_charger_ids() self._evc_states = EvcStates() self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache( - microgrid.voltage().new_receiver() + microgrid.voltage().new_receiver(), + unique_id=f"{type(self).__name__}«{hex(id(self))}»:voltage_cache", ) self._config = EVDistributionConfig(component_ids=self._ev_charger_ids) self._component_pool_status_tracker = ComponentPoolStatusTracker( @@ -83,6 +84,14 @@ async def start(self) -> None: if self._ev_charger_ids: self._task = asyncio.create_task(self._run_forever()) + @override + async def wait_for_data(self) -> None: + """Wait until this manager receiver data for all components it manages. + + Before this happens, the manager could misbehave, as it would not have all the + data it needs to make the appropriate decisions. + """ + @override async def distribute_power(self, request: Request) -> None: """Distribute the requested power to the ev chargers. diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py index b1642b43b..eea9f2f4f 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py @@ -76,11 +76,23 @@ async def start(self) -> None: """Start the PV inverter manager.""" self._component_data_caches = { inv_id: LatestValueCache( - await connection_manager.get().api_client.inverter_data(inv_id) + await connection_manager.get().api_client.inverter_data(inv_id), + unique_id=f"{type(self).__name__}«{hex(id(self))}»:inverter«{inv_id}»", ) for inv_id in self._pv_inverter_ids } + @override + async def wait_for_data(self) -> None: + """Wait until this manager receiver data for all components it manages. + + Before this happens, the manager could misbehave, as it would not have all the + data it needs to make the appropriate decisions. + """ + await asyncio.gather( + *[cache.wait_for_value() for cache in self._component_data_caches.values()], + ) + @override async def stop(self) -> None: """Stop the PV inverter manager.""" diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index c6473343b..63d671113 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -13,9 +13,11 @@ import asyncio +import logging from frequenz.channels import Receiver, Sender from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType +from typing_extensions import override from ...actor._actor import Actor from ._component_managers import ( @@ -28,6 +30,8 @@ from .request import Request from .result import Result +_logger = logging.getLogger(__name__) + class PowerDistributingActor(Actor): # pylint: disable=too-many-instance-attributes @@ -119,6 +123,7 @@ def __init__( # pylint: disable=too-many-arguments f"PowerDistributor doesn't support controlling: {component_category}" ) + @override async def _run(self) -> None: # pylint: disable=too-many-locals """Run actor main function. @@ -130,12 +135,20 @@ async def _run(self) -> None: # pylint: disable=too-many-locals """ await self._component_manager.start() - # Wait few seconds to get data from the channels created above. - await asyncio.sleep(self._wait_for_data_sec) + try: + async with asyncio.timeout(self._wait_for_data_sec): + await self._component_manager.wait_for_data() + except asyncio.TimeoutError: + _logger.warning( + "%s timeout while waiting for data after %s seconds", + self, + self._wait_for_data_sec, + ) async for request in self._requests_receiver: await self._component_manager.distribute_power(request) + @override async def stop(self, msg: str | None = None) -> None: """Stop this actor. diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py index faefff5ee..81bd54630 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py @@ -99,8 +99,10 @@ async def _run(self) -> None: _logger.error(err) raise RuntimeError(err) + meter_id = next(iter(meters)).component_id self._meter_data_cache = LatestValueCache( - await api_client.meter_data(next(iter(meters)).component_id) + await api_client.meter_data(meter_id), + unique_id=f"{type(self).__name__}«{hex(id(self))}»:meter«{meter_id}»", ) latest_bound: dict[int, ComponentCurrentLimit] = {}