Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions src/frequenz/sdk/_internal/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,29 @@ class _Sentinel:
class LatestValueCache(typing.Generic[T_co]):
"""A cache that stores the latest value in a receiver."""

def __init__(self, receiver: Receiver[T_co]) -> None:
def __init__(
self, receiver: Receiver[T_co], *, unique_id: str | None = None
) -> None:
"""Create a new cache.

Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][]. It is used mostly for debugging purposes.
"""
self._receiver = receiver
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value: T_co | _Sentinel = _Sentinel()
self._task = asyncio.create_task(self._run())
self._task = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)
self._value_received_event = asyncio.Event()

@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def get(self) -> T_co:
"""Return the latest value that has been received.
Expand All @@ -71,10 +85,34 @@ def has_value(self) -> bool:
"""
return not isinstance(self._latest_value, _Sentinel)

async def wait_for_value(self) -> None:
"""Wait for a value to be received."""
if self.has_value():
return
await self._value_received_event.wait()

async def _run(self) -> None:
async for value in self._receiver:
had_value = self.has_value()
self._latest_value = value
if not had_value:
self._value_received_event.set()

async def stop(self) -> None:
"""Stop the cache."""
await cancel_and_await(self._task)

def __del__(self) -> None:
"""Stop the cache when it is deleted."""
self._task.cancel()

def __repr__(self) -> str:
"""Return a string representation of the cache."""
return (
f"<LatestValueCache latest_value={self._latest_value!r}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)

def __str__(self) -> str:
"""Return a string representation of the cache."""
return str(self._latest_value)
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ async def start(self) -> None:
"""Start the battery data manager."""
await self._create_channels()

@override
async def wait_for_data(self) -> None:
"""Wait until this manager receiver data for all components it manages.

Before this happens, the manager could misbehave, as it would not have all the
data it needs to make the appropriate decisions.
"""
await asyncio.gather(
*[
*[cache.wait_for_value() for cache in self._battery_caches.values()],
*[cache.wait_for_value() for cache in self._inverter_caches.values()],
]
)

@override
async def stop(self) -> None:
"""Stop the battery data manager."""
Expand Down Expand Up @@ -315,13 +329,19 @@ async def _distribute_power(
async def _create_channels(self) -> None:
"""Create channels to get data of components in microgrid."""
api = connection_manager.get().api_client
manager_id = f"{type(self).__name__}»{hex(id(self))}»"
for battery_id, inverter_ids in self._bat_invs_map.items():
bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id)
self._battery_caches[battery_id] = LatestValueCache(bat_recv)
self._battery_caches[battery_id] = LatestValueCache(
bat_recv,
unique_id=f"{manager_id}:battery«{battery_id}»",
)

for inverter_id in inverter_ids:
inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id)
self._inverter_caches[inverter_id] = LatestValueCache(inv_recv)
self._inverter_caches[inverter_id] = LatestValueCache(
inv_recv, unique_id=f"{manager_id}:inverter«{inverter_id}»"
)

def _get_bounds(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ def component_ids(self) -> collections.abc.Set[int]:
async def start(self) -> None:
"""Start the component data manager."""

@abc.abstractmethod
async def wait_for_data(self) -> None:
"""Wait until this manager receiver data for all components it manages.

Before this happens, the manager could misbehave, as it would not have all the
data it needs to make the appropriate decisions.
"""

@abc.abstractmethod
async def distribute_power(self, request: Request) -> None:
"""Distribute the requested power to the components.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def __init__(
self._ev_charger_ids = self._get_ev_charger_ids()
self._evc_states = EvcStates()
self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache(
microgrid.voltage().new_receiver()
microgrid.voltage().new_receiver(),
unique_id=f"{type(self).__name__}«{hex(id(self))}»:voltage_cache",
)
self._config = EVDistributionConfig(component_ids=self._ev_charger_ids)
self._component_pool_status_tracker = ComponentPoolStatusTracker(
Expand Down Expand Up @@ -83,6 +84,14 @@ async def start(self) -> None:
if self._ev_charger_ids:
self._task = asyncio.create_task(self._run_forever())

@override
async def wait_for_data(self) -> None:
"""Wait until this manager receiver data for all components it manages.

Before this happens, the manager could misbehave, as it would not have all the
data it needs to make the appropriate decisions.
"""

@override
async def distribute_power(self, request: Request) -> None:
"""Distribute the requested power to the ev chargers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,23 @@ async def start(self) -> None:
"""Start the PV inverter manager."""
self._component_data_caches = {
inv_id: LatestValueCache(
await connection_manager.get().api_client.inverter_data(inv_id)
await connection_manager.get().api_client.inverter_data(inv_id),
unique_id=f"{type(self).__name__}«{hex(id(self))}»:inverter«{inv_id}»",
)
for inv_id in self._pv_inverter_ids
}

@override
async def wait_for_data(self) -> None:
"""Wait until this manager receiver data for all components it manages.

Before this happens, the manager could misbehave, as it would not have all the
data it needs to make the appropriate decisions.
"""
await asyncio.gather(
*[cache.wait_for_value() for cache in self._component_data_caches.values()],
)

@override
async def stop(self) -> None:
"""Stop the PV inverter manager."""
Expand Down
17 changes: 15 additions & 2 deletions src/frequenz/sdk/actor/power_distributing/power_distributing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@


import asyncio
import logging

from frequenz.channels import Receiver, Sender
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
from typing_extensions import override

from ...actor._actor import Actor
from ._component_managers import (
Expand All @@ -28,6 +30,8 @@
from .request import Request
from .result import Result

_logger = logging.getLogger(__name__)


class PowerDistributingActor(Actor):
# pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -119,6 +123,7 @@ def __init__( # pylint: disable=too-many-arguments
f"PowerDistributor doesn't support controlling: {component_category}"
)

@override
async def _run(self) -> None: # pylint: disable=too-many-locals
"""Run actor main function.

Expand All @@ -130,12 +135,20 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
"""
await self._component_manager.start()

# Wait few seconds to get data from the channels created above.
await asyncio.sleep(self._wait_for_data_sec)
try:
async with asyncio.timeout(self._wait_for_data_sec):
await self._component_manager.wait_for_data()
except asyncio.TimeoutError:
_logger.warning(
"%s timeout while waiting for data after %s seconds",
self,
self._wait_for_data_sec,
)

async for request in self._requests_receiver:
await self._component_manager.distribute_power(request)

@override
async def stop(self, msg: str | None = None) -> None:
"""Stop this actor.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ async def _run(self) -> None:
_logger.error(err)
raise RuntimeError(err)

meter_id = next(iter(meters)).component_id
self._meter_data_cache = LatestValueCache(
await api_client.meter_data(next(iter(meters)).component_id)
await api_client.meter_data(meter_id),
unique_id=f"{type(self).__name__}«{hex(id(self))}»:meter«{meter_id}»",
)
latest_bound: dict[int, ComponentCurrentLimit] = {}

Expand Down