diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6970d0947..69f45d4de 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -18,10 +18,14 @@ - New `propose_power` and `power_status` methods have been added to the `EVChargerPool` similar to the `BatteryPool`. These method interface with the `PowerManager` and `PowerDistributor`, which currently uses a first-come-first-serve algorithm to distribute power to EVs. +- PV Power is now available from `microgrid.pv_pool().power`, and no longer from `microgrid.logical_meter().pv_power`. + ## New Features - Warning messages are logged when multiple instances of `*Pool`s are created for the same set of batteries, with the same priority values. +- A PV Pool, with `propose_power`, `power_status` and `power` methods similar to Battery and EV Pools. + ## Bug Fixes - A bug was fixed where the grid fuse was not created properly and would end up with a `max_current` with type `float` instead of `Current`. diff --git a/benchmarks/power_distribution/power_distributor.py b/benchmarks/power_distribution/power_distributor.py index 6b188cb44..877df4026 100644 --- a/benchmarks/power_distribution/power_distributor.py +++ b/benchmarks/power_distribution/power_distributor.py @@ -116,6 +116,7 @@ async def run_test( # pylint: disable=too-many-locals power_result_channel = Broadcast[Result](name="power-result") async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=power_request_channel.new_receiver(), results_sender=power_result_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 71deb5d67..898bfce34 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -9,6 +9,8 @@ from frequenz.channels import Receiver +from ._asyncio import cancel_and_await + T_co = typing.TypeVar("T_co", covariant=True) @@ -72,3 +74,7 @@ def has_value(self) -> bool: async def _run(self) -> None: async for value in self._receiver: self._latest_value = value + + async def stop(self) -> None: + """Stop the cache.""" + await cancel_and_await(self._task) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index a5e9570e6..a6a4ce153 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -12,7 +12,7 @@ from frequenz.channels import Receiver, Sender, select, selected_from from frequenz.channels.timer import SkipMissedAndDrift, Timer -from frequenz.client.microgrid import ComponentCategory +from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType from typing_extensions import override from ...timeseries._base_types import SystemBounds @@ -32,12 +32,14 @@ class PowerManagingActor(Actor): def __init__( # pylint: disable=too-many-arguments self, - component_category: ComponentCategory, proposals_receiver: Receiver[Proposal], bounds_subscription_receiver: Receiver[ReportRequest], power_distributing_requests_sender: Sender[power_distributing.Request], power_distributing_results_receiver: Receiver[power_distributing.Result], channel_registry: ChannelRegistry, + *, + component_category: ComponentCategory, + component_type: ComponentType | None = None, # arguments to actors need to serializable, so we pass an enum for the algorithm # instead of an instance of the algorithm. algorithm: Algorithm = Algorithm.MATRYOSHKA, @@ -45,8 +47,6 @@ def __init__( # pylint: disable=too-many-arguments """Create a new instance of the power manager. Args: - component_category: The category of the component this power manager - instance is going to support. proposals_receiver: The receiver for proposals. bounds_subscription_receiver: The receiver for bounds subscriptions. power_distributing_requests_sender: The sender for power distribution @@ -54,6 +54,15 @@ def __init__( # pylint: disable=too-many-arguments power_distributing_results_receiver: The receiver for power distribution results. channel_registry: The channel registry. + component_category: The category of the component this power manager + instance is going to support. + component_type: The type of the component of the given category that this + actor is responsible for. This is used only when the component category + is not enough to uniquely identify the component. For example, when the + category is `ComponentCategory.INVERTER`, the type is needed to identify + the inverter as a solar inverter or a battery inverter. This can be + `None` when the component category is enough to uniquely identify the + component. algorithm: The power management algorithm to use. Raises: @@ -65,6 +74,7 @@ def __init__( # pylint: disable=too-many-arguments ) self._component_category = component_category + self._component_type = component_type self._bounds_subscription_receiver = bounds_subscription_receiver self._power_distributing_requests_sender = power_distributing_requests_sender self._power_distributing_results_receiver = power_distributing_results_receiver @@ -141,6 +151,12 @@ def _add_bounds_tracker(self, component_ids: frozenset[int]) -> None: elif self._component_category is ComponentCategory.EV_CHARGER: ev_charger_pool = microgrid.ev_charger_pool(component_ids) bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver() + elif ( + self._component_category is ComponentCategory.INVERTER + and self._component_type is InverterType.SOLAR + ): + pv_pool = microgrid.pv_pool(component_ids) + bounds_receiver = pv_pool._system_power_bounds.new_receiver() # pylint: enable=protected-access else: err = ( diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py index 0ec7d84be..606dfea99 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py @@ -6,9 +6,11 @@ from ._battery_manager import BatteryManager from ._component_manager import ComponentManager from ._ev_charger_manager import EVChargerManager +from ._pv_inverter_manager import PVManager __all__ = [ "BatteryManager", "ComponentManager", "EVChargerManager", + "PVManager", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index 8b5f1cba0..1359fa8ec 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -188,6 +188,10 @@ async def start(self) -> None: @override async def stop(self) -> None: """Stop the battery data manager.""" + for bat_cache in self._battery_caches.values(): + await bat_cache.stop() + for inv_cache in self._inverter_caches.values(): + await inv_cache.stop() await self._component_pool_status_tracker.stop() @override @@ -668,6 +672,13 @@ def _parse_result( battery_ids, request_timeout.total_seconds(), ) + except Exception: # pylint: disable=broad-except + failed_power += distribution[inverter_id] + failed_batteries = failed_batteries.union(battery_ids) + _logger.exception( + "Unknown error while setting power to batteries: %s", + battery_ids, + ) return failed_power, failed_batteries diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index 9fa1c75a0..6c077c62e 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -92,6 +92,7 @@ async def distribute_power(self, request: Request) -> None: @override async def stop(self) -> None: """Stop the ev charger manager.""" + await self._voltage_cache.stop() await self._component_pool_status_tracker.stop() def _get_ev_charger_ids(self) -> collections.abc.Set[int]: @@ -330,16 +331,21 @@ async def _set_api_power( succeeded_components.add(component_id) match task.exception(): - case asyncio.CancelledError: + case asyncio.CancelledError(): _logger.warning( "Timeout while setting power to EV charger %s", component_id ) - case grpc.aio.AioRpcError as err: + case grpc.aio.AioRpcError() as err: _logger.warning( "Error while setting power to EV charger %s: %s", component_id, err, ) + case Exception(): + _logger.exception( + "Unknown error while setting power to EV charger: %s", + component_id, + ) if failed_components: return PartialFailure( failed_components=failed_components, diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/__init__.py new file mode 100644 index 000000000..901b3fddb --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/__init__.py @@ -0,0 +1,8 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manage PV inverters for the power distributor.""" + +from ._pv_inverter_manager import PVManager + +__all__ = ["PVManager"] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py new file mode 100644 index 000000000..2a2f77852 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py @@ -0,0 +1,238 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manage PV inverters for the power distributor.""" + +import asyncio +import collections.abc +import logging +from datetime import timedelta + +import grpc +from frequenz.channels import Broadcast, Sender +from frequenz.client.microgrid import ComponentCategory, InverterData, InverterType +from typing_extensions import override + +from ....._internal._channels import LatestValueCache +from ....._internal._math import is_close_to_zero +from .....microgrid import connection_manager +from .....timeseries import Power +from ..._component_pool_status_tracker import ComponentPoolStatusTracker +from ..._component_status import ComponentPoolStatus, PVInverterStatusTracker +from ...request import Request +from ...result import PartialFailure, Result, Success +from .._component_manager import ComponentManager + +_logger = logging.getLogger(__name__) + + +class PVManager(ComponentManager): + """Manage PV inverters for the power distributor.""" + + @override + def __init__( + self, + component_pool_status_sender: Sender[ComponentPoolStatus], + results_sender: Sender[Result], + ) -> None: + """Initialize this instance. + + Args: + component_pool_status_sender: Channel for sending information about which + components are expected to be working. + results_sender: Channel for sending results of power distribution. + """ + self._results_sender = results_sender + self._pv_inverter_ids = self._get_pv_inverter_ids() + + self._component_pool_status_tracker = ( + ComponentPoolStatusTracker( + component_ids=self._pv_inverter_ids, + component_status_sender=component_pool_status_sender, + max_data_age=timedelta(seconds=10.0), + max_blocking_duration=timedelta(seconds=30.0), + component_status_tracker_type=PVInverterStatusTracker, + ) + if self._pv_inverter_ids + else None + ) + self._component_data_caches: dict[int, LatestValueCache[InverterData]] = {} + self._target_power = Power.zero() + self._target_power_channel = Broadcast[Request](name="target_power") + self._target_power_tx = self._target_power_channel.new_sender() + self._task: asyncio.Task[None] | None = None + + @override + def component_ids(self) -> collections.abc.Set[int]: + """Return the set of PV inverter ids.""" + return self._pv_inverter_ids + + @override + async def start(self) -> None: + """Start the PV inverter manager.""" + self._component_data_caches = { + inv_id: LatestValueCache( + await connection_manager.get().api_client.inverter_data(inv_id) + ) + for inv_id in self._pv_inverter_ids + } + + @override + async def stop(self) -> None: + """Stop the PV inverter manager.""" + await asyncio.gather( + *[tracker.stop() for tracker in self._component_data_caches.values()] + ) + if self._component_pool_status_tracker: + await self._component_pool_status_tracker.stop() + + @override + async def distribute_power(self, request: Request) -> None: + """Distribute the requested power to the PV inverters. + + Args: + request: Request to get the distribution for. + + Raises: + ValueError: If no PV inverters are present in the component graph, but + component_ids are provided in the request. + """ + remaining_power = request.power + allocations: dict[int, Power] = {} + if not self._component_pool_status_tracker: + if not request.component_ids: + await self._results_sender.send( + Success( + succeeded_components=set(), + succeeded_power=Power.zero(), + excess_power=remaining_power, + request=request, + ) + ) + return + raise ValueError( + "Cannot distribute power to PV inverters without any inverters" + ) + working_components = list( + self._component_pool_status_tracker.get_working_components( + request.component_ids + ) + ) + + # When sorting by lower bounds, which are negative for PV inverters, we have to + # reverse the order, so that the inverters with the higher bounds i.e., the + # least absolute value are first. + working_components.sort( + key=lambda inv_id: self._component_data_caches[inv_id] + .get() + .active_power_inclusion_lower_bound, + reverse=True, + ) + + num_components = len(working_components) + for idx, inv_id in enumerate(working_components): + # Request powers are negative for PV inverters. When remaining power is + # greater than 0.0, we can stop allocating further. + if remaining_power > Power.zero() or is_close_to_zero( + remaining_power.as_watts() + ): + break + distribution = remaining_power / float(num_components - idx) + inv_data = self._component_data_caches[inv_id] + if not inv_data.has_value(): + allocations[inv_id] = Power.zero() + # Can't get device bounds, so can't use inverter. + continue + discharge_bounds = Power.from_watts( + inv_data.get().active_power_inclusion_lower_bound + ) + # Because all 3 values are negative or 0, we use max, to get the value + # with the least absolute value. + allocated_power = max(remaining_power, discharge_bounds, distribution) + allocations[inv_id] = allocated_power + remaining_power -= allocated_power + + _logger.debug( + "Distributing %s to PV inverters %s", + request.power, + allocations, + ) + await self._set_api_power(request, allocations, remaining_power) + + async def _set_api_power( # pylint: disable=too-many-locals + self, request: Request, allocations: dict[int, Power], remaining_power: Power + ) -> None: + api_client = connection_manager.get().api_client + tasks: dict[int, asyncio.Task[None]] = {} + for component_id, power in allocations.items(): + tasks[component_id] = asyncio.create_task( + api_client.set_power(component_id, power.as_watts()) + ) + _, pending = await asyncio.wait( + tasks.values(), + timeout=request.request_timeout.total_seconds(), + return_when=asyncio.ALL_COMPLETED, + ) + # collect the timed out tasks and cancel them while keeping the + # exceptions, so that they can be processed later. + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + failed_components: set[int] = set() + succeeded_components: set[int] = set() + failed_power = Power.zero() + for component_id, task in tasks.items(): + exc = task.exception() + if exc is not None: + failed_components.add(component_id) + failed_power += allocations[component_id] + else: + succeeded_components.add(component_id) + + match task.exception(): + case asyncio.CancelledError(): + _logger.warning( + "Timeout while setting power to PV inverter %s", component_id + ) + case grpc.aio.AioRpcError() as err: + _logger.warning( + "Error while setting power to PV inverter %s: %s", + component_id, + err, + ) + case Exception(): + _logger.exception( + "Unknown error while setting power to PV inverter: %s", + component_id, + ) + if failed_components: + await self._results_sender.send( + PartialFailure( + failed_components=failed_components, + succeeded_components=succeeded_components, + failed_power=failed_power, + succeeded_power=self._target_power - failed_power, + excess_power=remaining_power, + request=request, + ) + ) + return + await self._results_sender.send( + Success( + succeeded_components=succeeded_components, + succeeded_power=self._target_power, + excess_power=remaining_power, + request=request, + ) + ) + + def _get_pv_inverter_ids(self) -> collections.abc.Set[int]: + """Return the IDs of all PV inverters present in the component graph.""" + return { + inv.component_id + for inv in connection_manager.get().component_graph.components( + component_categories={ComponentCategory.INVERTER} + ) + if inv.type == InverterType.SOLAR + } diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py index 929cbc726..4794cbf34 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/__init__.py @@ -12,6 +12,7 @@ SetPowerResult, ) from ._ev_charger_status_tracker import EVChargerStatusTracker +from ._pv_inverter_status_tracker import PVInverterStatusTracker __all__ = [ "BatteryStatusTracker", @@ -20,5 +21,6 @@ "ComponentStatusEnum", "ComponentStatusTracker", "EVChargerStatusTracker", + "PVInverterStatusTracker", "SetPowerResult", ] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_status/_pv_inverter_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_status/_pv_inverter_status_tracker.py new file mode 100644 index 000000000..abe80ef40 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_status/_pv_inverter_status_tracker.py @@ -0,0 +1,204 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Background service that tracks the status of a PV inverter.""" + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +# Component state for inverters and batteries is not wrapped by the +# microgrid client currently, so it needs to be imported directly from +# the api repo. +# pylint: disable=no-name-in-module +from frequenz.api.microgrid.inverter_pb2 import ( + ComponentState as PbInverterComponentState, +) + +# pylint: enable=no-name-in-module +from frequenz.channels import Receiver, Sender, select, selected_from +from frequenz.channels.timer import SkipMissedAndDrift, Timer +from frequenz.client.microgrid import InverterData +from typing_extensions import override + +from ....microgrid import connection_manager +from ..._background_service import BackgroundService +from ._blocking_status import BlockingStatus +from ._component_status import ( + ComponentStatus, + ComponentStatusEnum, + ComponentStatusTracker, + SetPowerResult, +) + +_logger = logging.getLogger(__name__) + + +class PVInverterStatusTracker(ComponentStatusTracker, BackgroundService): + """Status tracker for PV inverters. + + It reports a PV inverter as `WORKING` or `NOT_WORKING` based on + the status in the received component data from the microgrid API. + When no data is received for a specific duration, the component is + marked as `NOT_WORKING`. + + If it receives a power assignment failure from the PowerDistributor, + when the component is expected to be `WORKING`, it is marked as + `UNCERTAIN` for a specific interval, before being marked `WORKING` + again. + """ + + @override + def __init__( # pylint: disable=too-many-arguments + self, + component_id: int, + max_data_age: timedelta, + max_blocking_duration: timedelta, + status_sender: Sender[ComponentStatus], + set_power_result_receiver: Receiver[SetPowerResult], + ) -> None: + """Initialize this instance. + + Args: + component_id: ID of the PV inverter to monitor the status of. + max_data_age: max duration to wait for, before marking a component as + NOT_WORKING, unless new data arrives. + max_blocking_duration: max duration to wait for, before marking a component + as BLOCKING, unless new data arrives. + status_sender: Sender to send the status of the PV inverter. + set_power_result_receiver: Receiver for the power assignment result. + """ + BackgroundService.__init__( + self, name=f"PVInverterStatusTracker({component_id})" + ) + self._component_id = component_id + self._max_data_age = max_data_age + self._status_sender = status_sender + self._set_power_result_receiver = set_power_result_receiver + + self._last_status = ComponentStatusEnum.NOT_WORKING + self._blocking_status = BlockingStatus( + min_duration=timedelta(seconds=1.0), + max_duration=max_blocking_duration, + ) + + @override + def start(self) -> None: + """Start the status tracker.""" + self._tasks.add(asyncio.create_task(self._run_forever())) + + def _is_working(self, pv_data: InverterData) -> bool: + """Return whether the given data indicates that the PV inverter is working.""" + return pv_data._component_state in ( # pylint: disable=protected-access + PbInverterComponentState.COMPONENT_STATE_DISCHARGING, + PbInverterComponentState.COMPONENT_STATE_CHARGING, + PbInverterComponentState.COMPONENT_STATE_IDLE, + PbInverterComponentState.COMPONENT_STATE_STANDBY, + ) + + async def _run_forever(self) -> None: + while True: + try: + await self._run() + except Exception: # pylint: disable=broad-except + _logger.exception( + "Restarting after exception in PVInverterStatusTracker.run()" + ) + await asyncio.sleep(1.0) + + def _is_stale(self, pv_data: InverterData) -> bool: + """Return whether the given data is stale.""" + now = datetime.now(tz=timezone.utc) + stale = now - pv_data.timestamp > self._max_data_age + return stale + + def _handle_set_power_result( + self, set_power_result: SetPowerResult + ) -> ComponentStatusEnum: + """Handle a new set power result.""" + if self._component_id in set_power_result.succeeded: + return ComponentStatusEnum.WORKING + + self._blocking_status.block() + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "PV inverter %s is in UNCERTAIN state. Set power result: %s", + self._component_id, + set_power_result, + ) + return ComponentStatusEnum.UNCERTAIN + + def _handle_pv_inverter_data(self, pv_data: InverterData) -> ComponentStatusEnum: + """Handle new PV inverter data.""" + if self._is_stale(pv_data): + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "PV inverter %s data is stale. Last timestamp: %s", + self._component_id, + pv_data.timestamp, + ) + return ComponentStatusEnum.NOT_WORKING + + if self._is_working(pv_data): + if self._last_status == ComponentStatusEnum.NOT_WORKING: + _logger.warning( + "PV inverter %s is in WORKING state.", + self._component_id, + ) + return ComponentStatusEnum.WORKING + + if self._last_status == ComponentStatusEnum.WORKING: + _logger.warning( + "PV inverter %s is in NOT_WORKING state. Component state: %s", + self._component_id, + pv_data._component_state, # pylint: disable=protected-access + ) + return ComponentStatusEnum.NOT_WORKING + + async def _run(self) -> None: + """Run the status tracker.""" + api_client = connection_manager.get().api_client + pv_data_rx = await api_client.inverter_data(self._component_id) + set_power_result_rx = self._set_power_result_receiver + missing_data_timer = Timer(self._max_data_age, SkipMissedAndDrift()) + + # Send initial status + await self._status_sender.send( + ComponentStatus(self._component_id, self._last_status) + ) + + async for selected in select( + pv_data_rx, set_power_result_rx, missing_data_timer + ): + new_status = ComponentStatusEnum.NOT_WORKING + if selected_from(selected, pv_data_rx): + missing_data_timer.reset() + new_status = self._handle_pv_inverter_data(selected.message) + elif selected_from(selected, set_power_result_rx): + new_status = self._handle_set_power_result(selected.message) + elif selected_from(selected, missing_data_timer): + _logger.warning( + "No PV inverter %s data received for %s. " + "Setting status to NOT_WORKING.", + self._component_id, + self._max_data_age, + ) + + # Send status update if status changed + if ( + self._blocking_status.is_blocked() + and new_status != ComponentStatusEnum.NOT_WORKING + ): + new_status = ComponentStatusEnum.UNCERTAIN + + if new_status != self._last_status: + _logger.info( + "EV charger %s status changed from %s to %s", + self._component_id, + self._last_status, + new_status, + ) + self._last_status = new_status + await self._status_sender.send( + ComponentStatus(self._component_id, new_status) + ) diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 6af02716c..c6473343b 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -15,10 +15,15 @@ import asyncio from frequenz.channels import Receiver, Sender -from frequenz.client.microgrid import ComponentCategory +from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType from ...actor._actor import Actor -from ._component_managers import BatteryManager, ComponentManager, EVChargerManager +from ._component_managers import ( + BatteryManager, + ComponentManager, + EVChargerManager, + PVManager, +) from ._component_status import ComponentPoolStatus from .request import Request from .result import Result @@ -52,19 +57,18 @@ class PowerDistributingActor(Actor): def __init__( # pylint: disable=too-many-arguments self, - component_category: ComponentCategory, requests_receiver: Receiver[Request], results_sender: Sender[Result], component_pool_status_sender: Sender[ComponentPoolStatus], wait_for_data_sec: float, *, + component_category: ComponentCategory, + component_type: ComponentType | None = None, name: str | None = None, ) -> None: """Create class instance. Args: - component_category: The category of the components that this actor is - responsible for. requests_receiver: Receiver for receiving power requests from the power manager. results_sender: Sender for sending results to the power manager. @@ -72,6 +76,15 @@ def __init__( # pylint: disable=too-many-arguments components are expected to be working. wait_for_data_sec: How long actor should wait before processing first request. It is a time needed to collect first components data. + component_category: The category of the components that this actor is + responsible for. + component_type: The type of the component of the given category that this + actor is responsible for. This is used only when the component category + is not enough to uniquely identify the component. For example, when the + category is `ComponentCategory.INVERTER`, the type is needed to identify + the inverter as a solar inverter or a battery inverter. This can be + `None` when the component category is enough to uniquely identify the + component. name: The name of the actor. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. @@ -80,6 +93,7 @@ def __init__( # pylint: disable=too-many-arguments """ super().__init__(name=name) self._component_category = component_category + self._component_type = component_type self._requests_receiver = requests_receiver self._result_sender = results_sender self._wait_for_data_sec = wait_for_data_sec @@ -93,6 +107,13 @@ def __init__( # pylint: disable=too-many-arguments self._component_manager = EVChargerManager( component_pool_status_sender, results_sender ) + elif ( + component_category == ComponentCategory.INVERTER + and component_type == InverterType.SOLAR + ): + self._component_manager = PVManager( + component_pool_status_sender, results_sender + ) else: raise ValueError( f"PowerDistributor doesn't support controlling: {component_category}" diff --git a/src/frequenz/sdk/microgrid/__init__.py b/src/frequenz/sdk/microgrid/__init__.py index b68d5f5a5..34b26064a 100644 --- a/src/frequenz/sdk/microgrid/__init__.py +++ b/src/frequenz/sdk/microgrid/__init__.py @@ -74,16 +74,31 @@ ## Producers: PV Arrays, CHP -The total {{glossary("pv", "PV")}} power production in a microgrid can be streamed -through [`pv_power`][frequenz.sdk.timeseries.logical_meter.LogicalMeter.pv_power] , and -similarly the total CHP production in a site can be streamed through -[`chp_power`][frequenz.sdk.timeseries.logical_meter.LogicalMeter.chp_power]. And total -producer power is available through -[`producer_power`][frequenz.sdk.timeseries.producer.Producer.power]. +The total CHP production in a site can be streamed through +[`chp_power`][frequenz.sdk.timeseries.logical_meter.LogicalMeter.chp_power]. PV Power +is available through the PV pool described below. And total producer power is available +through [`microgrid.producer().power`][frequenz.sdk.timeseries.producer.Producer.power]. As is the case with the other methods, if PV Arrays or CHPs are not available in a microgrid, the corresponding methods stream zero values. +## PV Arrays + +The total PV power production is available through +[`pv_pool`][frequenz.sdk.microgrid.pv_pool]'s +[`power`][frequenz.sdk.timeseries.pv_pool.PVPool.power]. The PV pool by default uses +all PV inverters available at a location, but PV pool instances can be created for +subsets of PV inverters if necessary, by specifying the inverter ids. + +The `pv_pool` also provides available power bounds through the +[`power_status`][frequenz.sdk.timeseries.pv_pool.PVPool.power_status] method. + +The `pv_pool` also provides a control method +[`propose_power`][frequenz.sdk.timeseries.pv_pool.PVPool.propose_power], which accepts +values in the {{glossary("psc", "Passive Sign Convention")}} and supports only +production. + + ## Batteries The total Battery power is available through @@ -112,6 +127,16 @@ [`power`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.power] method that streams the total power measured for all the {{glossary("ev-charger", "EV Chargers")}} at a site. + +The `ev_charger_pool` also provides available power bounds through the +[`power_status`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.power_status] +method. + + +The `ev_charger_pool` also provides a control method +[`propose_power`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.propose_power], +which accepts values in the {{glossary("psc", "Passive Sign Convention")}} and supports +only charging. """ # noqa: D205, D400 from ..actor import ResamplerConfig @@ -124,6 +149,7 @@ grid, logical_meter, producer, + pv_pool, voltage, ) @@ -149,5 +175,6 @@ async def initialize(host: str, port: int, resampler_config: ResamplerConfig) -> "frequency", "logical_meter", "producer", + "pv_pool", "voltage", ] diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index 4e38764b5..815b3b5fe 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from frequenz.channels import Broadcast, Sender -from frequenz.client.microgrid import ComponentCategory +from frequenz.client.microgrid import ComponentCategory, InverterType from ..actor._actor import Actor from ..timeseries._grid_frequency import GridFrequency @@ -44,6 +44,8 @@ ) from ..timeseries.logical_meter import LogicalMeter from ..timeseries.producer import Producer + from ..timeseries.pv_pool import PVPool + from ..timeseries.pv_pool._pv_pool_reference_store import PVPoolReferenceStore _logger = logging.getLogger(__name__) @@ -96,10 +98,15 @@ def __init__( self._resampling_actor: _ActorInfo | None = None self._battery_power_wrapper = PowerWrapper( - ComponentCategory.BATTERY, self._channel_registry + self._channel_registry, component_category=ComponentCategory.BATTERY ) self._ev_power_wrapper = PowerWrapper( - ComponentCategory.EV_CHARGER, self._channel_registry + self._channel_registry, component_category=ComponentCategory.EV_CHARGER + ) + self._pv_power_wrapper = PowerWrapper( + self._channel_registry, + component_category=ComponentCategory.INVERTER, + component_type=InverterType.SOLAR, ) self._logical_meter: LogicalMeter | None = None @@ -112,6 +119,7 @@ def __init__( self._battery_pool_reference_stores: dict[ frozenset[int], BatteryPoolReferenceStore ] = {} + self._pv_pool_reference_stores: dict[frozenset[int], PVPoolReferenceStore] = {} self._frequency_instance: GridFrequency | None = None self._voltage_instance: VoltageStreamer | None = None @@ -245,6 +253,71 @@ def ev_charger_pool( self._ev_charger_pool_reference_stores[ref_store_key], name, priority ) + def pv_pool( + self, + pv_inverter_ids: abc.Set[int] | None = None, + name: str | None = None, + priority: int = -sys.maxsize - 1, + ) -> PVPool: + """Return a new `PVPool` instance for the given ids. + + If a `PVPoolReferenceStore` instance for the given PV inverter ids doesn't + exist, a new one is created and used for creating the `PVPool`. + + Args: + pv_inverter_ids: Optional set of IDs of PV inverters to be managed by the + `PVPool`. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor making the call. + + Returns: + A `PVPool` instance. + """ + from ..timeseries.pv_pool import PVPool + from ..timeseries.pv_pool._pv_pool_reference_store import PVPoolReferenceStore + + if not self._pv_power_wrapper.started: + self._pv_power_wrapper.start() + + # We use frozenset to make a hashable key from the input set. + ref_store_key: frozenset[int] = frozenset() + if pv_inverter_ids is not None: + ref_store_key = frozenset(pv_inverter_ids) + + pool_key = f"{ref_store_key}-{priority}" + if pool_key in self._known_pool_keys: + _logger.warning( + "A PVPool instance was already created for pv_inverter_ids=%s and " + "priority=%s using `microgrid.pv_pool(...)`." + "\n Hint: If the multiple instances are created from the same actor, " + "consider reusing the same instance." + "\n Hint: If the instances are created from different actors, " + "consider using different priorities to distinguish them.", + pv_inverter_ids, + priority, + ) + else: + self._known_pool_keys.add(pool_key) + + if ref_store_key not in self._pv_pool_reference_stores: + self._pv_pool_reference_stores[ref_store_key] = PVPoolReferenceStore( + channel_registry=self._channel_registry, + resampler_subscription_sender=self._resampling_request_sender(), + status_receiver=( + self._pv_power_wrapper.status_channel.new_receiver(limit=1) + ), + power_manager_requests_sender=( + self._pv_power_wrapper.proposal_channel.new_sender() + ), + power_manager_bounds_subs_sender=( + self._pv_power_wrapper.bounds_subscription_channel.new_sender() + ), + component_ids=pv_inverter_ids, + ) + + return PVPool(self._pv_pool_reference_stores[ref_store_key], name, priority) + def grid(self) -> Grid: """Return the grid measuring point.""" if self._grid is None: @@ -504,6 +577,43 @@ def battery_pool( return _get().battery_pool(battery_ids, name, priority) +def pv_pool( + pv_inverter_ids: abc.Set[int] | None = None, + name: str | None = None, + priority: int = -sys.maxsize - 1, +) -> PVPool: + """Return a new `PVPool` instance for the given parameters. + + The priority value is used to resolve conflicts when multiple actors are trying to + propose different power values for the same set of PV inverters. + + !!! note + When specifying priority, bigger values indicate higher priority. The default + priority is the lowest possible value. + + It is recommended to reuse the same instance of the `PVPool` within the same + actor, unless they are managing different sets of PV inverters. + + In deployments with multiple actors managing the same set of PV inverters, it is + recommended to use different priorities to distinguish between them. If not, + a random prioritization will be imposed on them to resolve conflicts, which may + lead to unexpected behavior like longer duration to converge on the desired + power. + + Args: + pv_inverter_ids: Optional set of IDs of PV inverters to be managed by the + `PVPool`. If not specified, all PV inverters available in the component + graph are used. + name: An optional name used to identify this instance of the pool or a + corresponding actor in the logs. + priority: The priority of the actor making the call. + + Returns: + A `PVPool` instance. + """ + return _get().pv_pool(pv_inverter_ids, name, priority) + + def grid() -> Grid: """Return the grid measuring point.""" return _get().grid() diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index d70fd351a..2749f5f50 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -13,7 +13,7 @@ # pylint seems to think this is a cyclic import, but it is not. # # pylint: disable=cyclic-import -from frequenz.client.microgrid import ComponentCategory +from frequenz.client.microgrid import ComponentCategory, ComponentType # A number of imports had to be done inside functions where they are used, to break # import cycles. @@ -37,16 +37,28 @@ class PowerWrapper: """Wrapper around the power managing and power distributing actors.""" def __init__( - self, component_category: ComponentCategory, channel_registry: ChannelRegistry + self, + channel_registry: ChannelRegistry, + *, + component_category: ComponentCategory, + component_type: ComponentType | None = None, ): """Initialize the power control. Args: + channel_registry: A channel registry for use in the actors. component_category: The category of the components that actors started by this instance of the PowerWrapper will be responsible for. - channel_registry: A channel registry for use in the actors. + component_type: The type of the component of the given category that this + actor is responsible for. This is used only when the component category + is not enough to uniquely identify the component. For example, when the + category is `ComponentCategory.INVERTER`, the type is needed to identify + the inverter as a solar inverter or a battery inverter. This can be + `None` when the component category is enough to uniquely identify the + component. """ self._component_category = component_category + self._component_type = component_type self._channel_registry = channel_registry self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast( @@ -95,6 +107,7 @@ def _start_power_managing_actor(self) -> None: self._power_managing_actor = PowerManagingActor( component_category=self._component_category, + component_type=self._component_type, proposals_receiver=self.proposal_channel.new_receiver(), bounds_subscription_receiver=( self.bounds_subscription_channel.new_receiver() @@ -134,6 +147,7 @@ def _start_power_distributing_actor(self) -> None: # will not be available in the high level interface. self._power_distributing_actor = PowerDistributingActor( component_category=self._component_category, + component_type=self._component_type, requests_receiver=self._power_distribution_requests_channel.new_receiver(), results_sender=self._power_distribution_results_channel.new_sender(), component_pool_status_sender=self.status_channel.new_sender(), diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index bc8ff0b74..bd9fc6dea 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -40,11 +40,6 @@ class BatteryPool: """An interface for interaction with pools of batteries. - !!! note - `BatteryPool` instances are not meant to be created directly by users. Use the - [`microgrid.battery_pool`][frequenz.sdk.microgrid.battery_pool] method for - creating `BatteryPool` instances. - Provides: - properties for fetching reporting streams of instantaneous [power][frequenz.sdk.timeseries.battery_pool.BatteryPool.power], @@ -429,3 +424,7 @@ def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: ) return self._battery_pool._active_methods[method_name] + + async def stop(self) -> None: + """Stop all tasks and channels owned by the BatteryPool.""" + await self._battery_pool.stop() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 3758f66a2..506983b23 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -32,11 +32,6 @@ class EVChargerPoolError(Exception): class EVChargerPool: """An interface for interaction with pools of EV Chargers. - !!! note - `EVChargerPool` instances are not meant to be created directly by users. Use the - [`microgrid.ev_charger_pool`][frequenz.sdk.microgrid.ev_charger_pool] method for - creating `EVChargerPool` instances. - Provides: - Aggregate [`power`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.power] and 3-phase diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py index 00365bdae..faefff5ee 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py @@ -10,7 +10,7 @@ from frequenz.channels import Broadcast, Sender, select, selected_from from frequenz.channels.timer import SkipMissedAndDrift, Timer -from frequenz.client.microgrid import ComponentCategory +from frequenz.client.microgrid import ComponentCategory, MeterData from ..._internal._asyncio import cancel_and_await from ..._internal._channels import LatestValueCache @@ -55,6 +55,7 @@ def __init__(self, repeat_interval: timedelta) -> None: ) self._bounds_rx = self._bounds_chan.new_receiver() self._bounds_tx = self._bounds_chan.new_sender() + self._meter_data_cache: LatestValueCache[MeterData] | None = None async def set(self, component_id: int, max_amps: float) -> None: """Send the given current limit to the microgrid for the given component id. @@ -75,6 +76,8 @@ def new_bounds_sender(self) -> Sender[ComponentCurrentLimit]: async def stop(self) -> None: """Stop the BoundsSetter.""" + if self._meter_data_cache is not None: + await self._meter_data_cache.stop() await self._bounds_chan.close() await cancel_and_await(self._task) @@ -96,7 +99,7 @@ async def _run(self) -> None: _logger.error(err) raise RuntimeError(err) - meter_data = LatestValueCache( + self._meter_data_cache = LatestValueCache( await api_client.meter_data(next(iter(meters)).component_id) ) latest_bound: dict[int, ComponentCurrentLimit] = {} @@ -107,7 +110,7 @@ async def _run(self) -> None: ) async for selected in select(bound_chan, timer): - meter = meter_data.get() + meter = self._meter_data_cache.get() if meter is None: raise ValueError("Meter channel closed.") diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_pv_power_formula.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_pv_power_formula.py index b859f21d6..c3e3a1004 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_pv_power_formula.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_pv_power_formula.py @@ -38,11 +38,15 @@ def generate( # noqa: DOC502 ) component_graph = connection_manager.get().component_graph - pv_components = component_graph.dfs( - self._get_grid_component(), - set(), - component_graph.is_pv_chain, - ) + component_ids = self._config.component_ids + if component_ids: + pv_components = component_graph.components(set(component_ids)) + else: + pv_components = component_graph.dfs( + self._get_grid_component(), + set(), + component_graph.is_pv_chain, + ) if not pv_components: _logger.warning( diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index ca511c108..4ea557323 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -13,7 +13,7 @@ from .._quantities import Power, Quantity from ..formula_engine import FormulaEngine from ..formula_engine._formula_engine_pool import FormulaEnginePool -from ..formula_engine._formula_generators import CHPPowerFormula, PVPowerFormula +from ..formula_engine._formula_generators import CHPPowerFormula class LogicalMeter: @@ -42,17 +42,18 @@ class LogicalMeter: ) logical_meter = microgrid.logical_meter() + pv_pool = microgrid.pv_pool() grid = microgrid.grid() # Get a receiver for a builtin formula - pv_power_recv = logical_meter.pv_power.new_receiver() + pv_power_recv = pv_pool.power.new_receiver() async for pv_power_sample in pv_power_recv: print(pv_power_sample) # or compose formulas to create a new formula net_power_recv = ( ( - grid.power - logical_meter.pv_power + grid.power - pv_pool.power ) .build("net_power") .new_receiver() @@ -123,28 +124,6 @@ def start_formula( formula, component_metric_id, nones_are_zeros=nones_are_zeros ) - @property - def pv_power(self) -> FormulaEngine[Power]: - """Fetch the PV power in the microgrid. - - This formula produces values that are in the Passive Sign Convention (PSC). - - If a formula engine to calculate PV power is not already running, it will be - started. - - A receiver from the formula engine can be created using the `new_receiver` - method. - - Returns: - A FormulaEngine that will calculate and stream PV total power. - """ - engine = self._formula_pool.from_power_formula_generator( - "pv_power", - PVPowerFormula, - ) - assert isinstance(engine, FormulaEngine) - return engine - @property def chp_power(self) -> FormulaEngine[Power]: """Fetch the CHP power production in the microgrid. diff --git a/src/frequenz/sdk/timeseries/pv_pool/__init__.py b/src/frequenz/sdk/timeseries/pv_pool/__init__.py new file mode 100644 index 000000000..334ffc311 --- /dev/null +++ b/src/frequenz/sdk/timeseries/pv_pool/__init__.py @@ -0,0 +1,13 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Interactions with PV inverters.""" + +from ._pv_pool import PVPool, PVPoolError +from ._result_types import PVPoolReport + +__all__ = [ + "PVPool", + "PVPoolError", + "PVPoolReport", +] diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py new file mode 100644 index 000000000..6b79748e5 --- /dev/null +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -0,0 +1,196 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Interactions with pools of PV inverters.""" + +import asyncio +import typing +import uuid +from collections import abc +from datetime import timedelta + +from ..._internal._channels import ReceiverFetcher +from ...actor import _power_managing +from ...timeseries import Bounds +from .._base_types import SystemBounds +from .._quantities import Power +from ..formula_engine import FormulaEngine +from ..formula_engine._formula_generators import FormulaGeneratorConfig, PVPowerFormula +from ._pv_pool_reference_store import PVPoolReferenceStore +from ._result_types import PVPoolReport + + +class PVPoolError(Exception): + """An error that occurred in any of the PVPool methods.""" + + +class PVPool: + """An interface for interaction with pools of PV inverters. + + Provides: + - Aggregate [`power`][frequenz.sdk.timeseries.pv_pool.PVPool.power] + measurements of the PV inverters in the pool. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + pv_pool_ref: PVPoolReferenceStore, + name: str | None, + priority: int, + ) -> None: + """Initialize the instance. + + !!! note + `PVPool` instances are not meant to be created directly by users. Use the + [`microgrid.pv_pool`][frequenz.sdk.microgrid.pv_pool] method for creating + `PVPool` instances. + + Args: + pv_pool_ref: The reference store for the PV pool. + name: The name of the PV pool. + priority: The priority of the PV pool. + """ + self._pv_pool_ref = pv_pool_ref + unique_id = uuid.uuid4() + self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" + self._priority = priority + + async def propose_power( + self, + power: Power | None, + *, + request_timeout: timedelta = timedelta(seconds=5.0), + bounds: Bounds[Power | None] = Bounds(None, None), + ) -> None: + """Send a proposal to the power manager for the pool's set of PV inverters. + + This proposal is for the maximum power that can be set for the PV inverters in + the pool. The actual production might be lower. + + Power values need to follow the Passive Sign Convention (PSC). That is, positive + values indicate charge power and negative values indicate discharge power. + Only discharge powers are allowed for PV inverters. + + If the same PV inverters are shared by multiple actors, the power manager will + consider the priority of the actors, the bounds they set, and their preferred + power, when calculating the target power for the PV inverters. + + The preferred power of lower priority actors will take precedence as long as + they respect the bounds set by higher priority actors. If lower priority actors + request power values outside of the bounds set by higher priority actors, the + target power will be the closest value to the preferred power that is within the + bounds. + + When there are no other actors trying to use the same PV inverters, the actor's + preferred power would be set as the target power, as long as it falls within the + system power bounds for the PV inverters. + + The result of the request can be accessed using the receiver returned from the + [`power_status`][frequenz.sdk.timeseries.pv_pool.PVPool.power_status] + method, which also streams the bounds that an actor should comply with, based on + its priority. + + Args: + power: The power to propose for the PV inverters in the pool. If `None`, + this proposal will not have any effect on the target power, unless + bounds are specified. If both are `None`, it is equivalent to not + having a proposal or withdrawing a previous one. + request_timeout: The timeout for the request. + bounds: The power bounds for the proposal. These bounds will apply to + actors with a lower priority, and can be overridden by bounds from + actors with a higher priority. If None, the power bounds will be set to + the maximum power of the batteries in the pool. This is currently and + experimental feature. + + Raises: + PVPoolError: If a charge power for PV inverters is requested. + """ + if power is not None and power > Power.zero(): + raise PVPoolError("Charge powers for PV inverters is not supported.") + await self._pv_pool_ref.power_manager_requests_sender.send( + _power_managing.Proposal( + source_id=self._source_id, + preferred_power=power, + bounds=bounds, + component_ids=self._pv_pool_ref.component_ids, + priority=self._priority, + creation_time=asyncio.get_running_loop().time(), + request_timeout=request_timeout, + ) + ) + + @property + def component_ids(self) -> abc.Set[int]: + """Return component IDs of all PV inverters managed by this PVPool. + + Returns: + Set of managed component IDs. + """ + return self._pv_pool_ref.component_ids + + @property + def power(self) -> FormulaEngine[Power]: + """Fetch the total power for the EV Chargers in the pool. + + This formula produces values that are in the Passive Sign Convention (PSC). + + If a formula engine to calculate EV Charger power is not already running, it + will be started. + + A receiver from the formula engine can be created using the `new_receiver` + method. + + Returns: + A FormulaEngine that will calculate and stream the total power of all EV + Chargers. + """ + engine = self._pv_pool_ref.formula_pool.from_power_formula_generator( + "pv_power", + PVPowerFormula, + FormulaGeneratorConfig( + component_ids=self._pv_pool_ref.component_ids, + ), + ) + assert isinstance(engine, FormulaEngine) + return engine + + @property + def power_status(self) -> ReceiverFetcher[PVPoolReport]: + """Get a receiver to receive new power status reports when they change. + + These include + - the current inclusion/exclusion bounds available for the pool's priority, + - the current target power for the pool's set of batteries, + - the result of the last distribution request for the pool's set of batteries. + + Returns: + A receiver that will stream power status reports for the pool's priority. + """ + sub = _power_managing.ReportRequest( + source_id=self._source_id, + priority=self._priority, + component_ids=self._pv_pool_ref.component_ids, + ) + self._pv_pool_ref.power_bounds_subs[sub.get_channel_name()] = ( + asyncio.create_task( + self._pv_pool_ref.power_manager_bounds_subs_sender.send(sub) + ) + ) + channel = self._pv_pool_ref.channel_registry.get_or_create( + _power_managing._Report, # pylint: disable=protected-access + sub.get_channel_name(), + ) + channel.resend_latest = True + + # More details on why the cast is needed here: + # https://github.com/frequenz-floss/frequenz-sdk-python/issues/823 + return typing.cast(ReceiverFetcher[PVPoolReport], channel) + + async def stop(self) -> None: + """Stop all tasks and channels owned by the PVPool.""" + await self._pv_pool_ref.stop() + + @property + def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: + """Return a receiver fetcher for the system power bounds.""" + return self._pv_pool_ref.bounds_channel diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py new file mode 100644 index 000000000..63d23a39c --- /dev/null +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py @@ -0,0 +1,103 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Manages shared state/tasks for a set of PV inverters.""" + + +import asyncio +import uuid +from collections import abc + +from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.microgrid import ComponentCategory, InverterType + +from ...actor import ChannelRegistry, ComponentMetricRequest +from ...actor._power_managing._base_classes import Proposal, ReportRequest +from ...actor.power_distributing import ComponentPoolStatus +from ...microgrid import connection_manager +from .._base_types import SystemBounds +from ..formula_engine._formula_engine_pool import FormulaEnginePool +from ._system_bounds_tracker import PVSystemBoundsTracker + + +class PVPoolReferenceStore: + """A class for maintaining the shared state/tasks for a set of pool of PV inverters. + + This includes ownership of + - the formula engine pool and metric calculators. + - the tasks for calculating system bounds for the PV inverters. + + These are independent of the priority of the actors and can be shared between + multiple users of the same set of PV inverters. + + They are exposed through the PVPool class. + """ + + def __init__( # pylint: disable=too-many-arguments + self, + channel_registry: ChannelRegistry, + resampler_subscription_sender: Sender[ComponentMetricRequest], + status_receiver: Receiver[ComponentPoolStatus], + power_manager_requests_sender: Sender[Proposal], + power_manager_bounds_subs_sender: Sender[ReportRequest], + component_ids: abc.Set[int] | None = None, + ): + """Initialize this instance. + + Args: + channel_registry: A channel registry instance shared with the resampling + actor. + resampler_subscription_sender: A sender for sending metric requests to the + resampling actor. + status_receiver: A receiver that streams the status of the PV inverters in + the pool. + power_manager_requests_sender: A Channel sender for sending power + requests to the power managing actor. + power_manager_bounds_subs_sender: A Channel sender for sending power bounds + subscription requests to the power managing actor. + component_ids: An optional list of component_ids belonging to this pool. If + not specified, IDs of all PV inverters in the microgrid will be fetched + from the component graph. + """ + self.channel_registry = channel_registry + self.resampler_subscription_sender = resampler_subscription_sender + self.status_receiver = status_receiver + self.power_manager_requests_sender = power_manager_requests_sender + self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender + + if component_ids is not None: + self.component_ids: frozenset[int] = frozenset(component_ids) + else: + graph = connection_manager.get().component_graph + self.component_ids = frozenset( + { + inv.component_id + for inv in graph.components( + component_categories={ComponentCategory.INVERTER} + ) + if inv.type == InverterType.SOLAR + } + ) + + self.power_bounds_subs: dict[str, asyncio.Task[None]] = {} + + self.namespace: str = f"pv-pool-{uuid.uuid4()}" + self.formula_pool = FormulaEnginePool( + self.namespace, + self.channel_registry, + self.resampler_subscription_sender, + ) + self.bounds_channel: Broadcast[SystemBounds] = Broadcast( + name=f"System Bounds for PV inverters: {component_ids}" + ) + self.bounds_tracker: PVSystemBoundsTracker = PVSystemBoundsTracker( + self.component_ids, + self.status_receiver, + self.bounds_channel.new_sender(), + ) + self.bounds_tracker.start() + + async def stop(self) -> None: + """Stop all tasks and channels owned by the EVChargerPool.""" + await self.formula_pool.stop() + await self.bounds_tracker.stop() diff --git a/src/frequenz/sdk/timeseries/pv_pool/_result_types.py b/src/frequenz/sdk/timeseries/pv_pool/_result_types.py new file mode 100644 index 000000000..7fcf2447b --- /dev/null +++ b/src/frequenz/sdk/timeseries/pv_pool/_result_types.py @@ -0,0 +1,31 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Types for exposing PV pool reports.""" + +import typing + +from ...actor import power_distributing +from .._base_types import Bounds +from .._quantities import Power + + +class PVPoolReport(typing.Protocol): + """A status report for a PV pool.""" + + target_power: Power | None + """The currently set power for the PV inverters.""" + + distribution_result: power_distributing.Result | None + """The result of the last power distribution. + + This is `None` if no power distribution has been performed yet. + """ + + @property + def bounds(self) -> Bounds[Power] | None: + """The usable bounds for the PV inverters. + + These bounds are adjusted to any restrictions placed by actors with higher + priorities. + """ diff --git a/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py new file mode 100644 index 000000000..3ab7c55ee --- /dev/null +++ b/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py @@ -0,0 +1,154 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""System bounds tracker for PV inverters.""" + +import asyncio +import logging +from collections import abc + +from frequenz.channels import Receiver, Sender, merge, select, selected_from +from frequenz.client.microgrid import InverterData + +from ...actor import BackgroundService +from ...actor.power_distributing._component_status import ComponentPoolStatus +from ...microgrid import connection_manager +from .._base_types import Bounds, SystemBounds +from .._quantities import Power + +_logger = logging.getLogger(__name__) + + +class PVSystemBoundsTracker(BackgroundService): + """Track the system bounds for PV inverters. + + System bounds are the aggregate bounds for the PV inverters in the pool that are in + a working state. They are calculated from the individual bounds received from the + microgrid API. + + The system bounds are sent to the `bounds_sender` whenever they change. + """ + + def __init__( + self, + component_ids: abc.Set[int], + status_receiver: Receiver[ComponentPoolStatus], + bounds_sender: Sender[SystemBounds], + ): + """Initialize the system bounds tracker. + + Args: + component_ids: The ids of the components to track. + status_receiver: A receiver that streams the status of the PV inverters in + the pool. + bounds_sender: A sender to send the system bounds to. + """ + super().__init__() + + self._component_ids = component_ids + self._status_receiver = status_receiver + self._bounds_sender = bounds_sender + self._latest_component_data: dict[int, InverterData] = {} + self._last_sent_bounds: SystemBounds | None = None + self._component_pool_status = ComponentPoolStatus(set(), set()) + + def start(self) -> None: + """Start the PV inverter system bounds tracker.""" + self._tasks.add(asyncio.create_task(self._run_forever())) + + async def _send_bounds(self) -> None: + """Calculate and send the aggregate system bounds if they have changed.""" + if not self._latest_component_data: + return + inclusion_bounds = Bounds( + lower=Power.from_watts( + sum( + data.active_power_inclusion_lower_bound + for data in self._latest_component_data.values() + ) + ), + upper=Power.from_watts( + sum( + data.active_power_inclusion_upper_bound + for data in self._latest_component_data.values() + ) + ), + ) + exclusion_bounds = Bounds( + lower=Power.from_watts( + sum( + data.active_power_exclusion_lower_bound + for data in self._latest_component_data.values() + ) + ), + upper=Power.from_watts( + sum( + data.active_power_exclusion_upper_bound + for data in self._latest_component_data.values() + ) + ), + ) + + if ( + self._last_sent_bounds is None + or self._last_sent_bounds.inclusion_bounds != inclusion_bounds + or self._last_sent_bounds.exclusion_bounds != exclusion_bounds + ): + self._last_sent_bounds = SystemBounds( + timestamp=max( + data.timestamp for data in self._latest_component_data.values() + ), + inclusion_bounds=inclusion_bounds, + exclusion_bounds=exclusion_bounds, + ) + await self._bounds_sender.send(self._last_sent_bounds) + + async def _run_forever(self) -> None: + """Run the system bounds tracker.""" + while True: + try: + await self._run() + except Exception: # pylint: disable=broad-except + _logger.exception( + "Restarting after exception in PVSystemBoundsTracker.run()" + ) + await asyncio.sleep(1.0) + + async def _run(self) -> None: + """Run the system bounds tracker.""" + api_client = connection_manager.get().api_client + status_rx = self._status_receiver + pv_data_rx = merge( + *( + await asyncio.gather( + *( + api_client.inverter_data(component_id) + for component_id in self._component_ids + ) + ) + ) + ) + + async for selected in select(status_rx, pv_data_rx): + if selected_from(selected, status_rx): + self._component_pool_status = selected.message + to_remove = [] + for comp_id in self._latest_component_data: + if ( + comp_id not in self._component_pool_status.working + and comp_id not in self._component_pool_status.uncertain + ): + to_remove.append(comp_id) + for comp_id in to_remove: + del self._latest_component_data[comp_id] + elif selected_from(selected, pv_data_rx): + data = selected.message + comp_id = data.component_id + if ( + comp_id not in self._component_pool_status.working + and comp_id not in self._component_pool_status.uncertain + ): + continue + self._latest_component_data[data.component_id] = data + + await self._send_bounds() diff --git a/tests/actor/power_distributing/_component_status/test_pv_inverter_status.py b/tests/actor/power_distributing/_component_status/test_pv_inverter_status.py new file mode 100644 index 000000000..5688719d4 --- /dev/null +++ b/tests/actor/power_distributing/_component_status/test_pv_inverter_status.py @@ -0,0 +1,164 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Tests for PVInverterStatusTracker.""" + + +import asyncio +from datetime import datetime, timedelta, timezone + +# pylint: disable=no-name-in-module +from frequenz.api.microgrid.inverter_pb2 import ( + ComponentState as PbInverterComponentState, +) + +# pylint: enable=no-name-in-module +from frequenz.channels import Broadcast +from pytest_mock import MockerFixture + +from frequenz.sdk._internal._asyncio import cancel_and_await +from frequenz.sdk.actor.power_distributing._component_status import ( + ComponentStatus, + ComponentStatusEnum, + PVInverterStatusTracker, + SetPowerResult, +) + +from ....timeseries.mock_microgrid import MockMicrogrid +from ....utils.component_data_wrapper import InverterDataWrapper +from ....utils.receive_timeout import Timeout, receive_timeout + +_PV_INVERTER_ID = 8 + + +class TestPVInverterStatusTracker: + """Tests for PVInverterStatusTracker.""" + + async def test_status_changes(self, mocker: MockerFixture) -> None: + """Test that the status changes as expected.""" + mock_microgrid = MockMicrogrid(grid_meter=True, mocker=mocker) + mock_microgrid.add_solar_inverters(1) + + status_channel = Broadcast[ComponentStatus](name="pv_inverter_status") + set_power_result_channel = Broadcast[SetPowerResult](name="set_power_result") + set_power_result_sender = set_power_result_channel.new_sender() + + async with ( + mock_microgrid, + PVInverterStatusTracker( + component_id=_PV_INVERTER_ID, + max_data_age=timedelta(seconds=0.2), + max_blocking_duration=timedelta(seconds=1), + status_sender=status_channel.new_sender(), + set_power_result_receiver=set_power_result_channel.new_receiver(), + ), + ): + status_receiver = status_channel.new_receiver() + # The status is initially not working. + assert ( + await status_receiver.receive() + ).value == ComponentStatusEnum.NOT_WORKING + + # When there's healthy inverter data, status should be working. + await mock_microgrid.mock_client.send( + InverterDataWrapper( + _PV_INVERTER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + _component_state=PbInverterComponentState.COMPONENT_STATE_IDLE, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.WORKING + ) + + # When it is discharging, there should be no change in status + await mock_microgrid.mock_client.send( + InverterDataWrapper( + _PV_INVERTER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + _component_state=PbInverterComponentState.COMPONENT_STATE_DISCHARGING, + ) + ) + assert await receive_timeout(status_receiver) is Timeout + + # When there an error message, status should be not working + await mock_microgrid.mock_client.send( + InverterDataWrapper( + _PV_INVERTER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + _component_state=PbInverterComponentState.COMPONENT_STATE_ERROR, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.NOT_WORKING + ) + + # Get it back to working again + await mock_microgrid.mock_client.send( + InverterDataWrapper( + _PV_INVERTER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + _component_state=PbInverterComponentState.COMPONENT_STATE_IDLE, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.WORKING + ) + + # When there's no new data, status should be not working + assert await receive_timeout(status_receiver, 0.1) is Timeout + assert await receive_timeout(status_receiver, 0.2) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.NOT_WORKING + ) + + # Get it back to working again + await mock_microgrid.mock_client.send( + InverterDataWrapper( + _PV_INVERTER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + _component_state=PbInverterComponentState.COMPONENT_STATE_IDLE, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.WORKING + ) + + async def keep_sending_healthy_message() -> None: + """Keep sending healthy messages.""" + while True: + await mock_microgrid.mock_client.send( + InverterDataWrapper( + _PV_INVERTER_ID, + datetime.now(tz=timezone.utc), + active_power=0.0, + _component_state=PbInverterComponentState.COMPONENT_STATE_IDLE, + ) + ) + await asyncio.sleep(0.1) + + _keep_sending_healthy_message_task = asyncio.create_task( + keep_sending_healthy_message() + ) + # when there's a PowerDistributor failure for the component, status should + # become uncertain. + await set_power_result_sender.send( + SetPowerResult( + succeeded=set(), + failed={_PV_INVERTER_ID}, + ) + ) + assert await receive_timeout(status_receiver) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.UNCERTAIN + ) + + # After the blocking duration, it should become working again. + assert await receive_timeout(status_receiver) is Timeout + assert await receive_timeout(status_receiver, 1.0) == ComponentStatus( + _PV_INVERTER_ID, ComponentStatusEnum.WORKING + ) + await cancel_and_await(_keep_sending_healthy_message_task) diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 498ebf8b4..761b9f5b1 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -108,6 +108,7 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None: ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -139,6 +140,7 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -204,6 +206,7 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None: battery_status_channel = Broadcast[ComponentPoolStatus](name="battery_status") async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -264,6 +267,7 @@ async def test_power_distributor_exclusion_bounds( ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -366,6 +370,7 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -445,6 +450,7 @@ async def test_two_batteries_one_broken_one_inverters( async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -500,6 +506,7 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -551,6 +558,7 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -636,6 +644,7 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2( async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -722,6 +731,7 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds( async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -787,6 +797,7 @@ async def test_connected_but_not_requested_batteries( async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), @@ -844,6 +855,7 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -898,6 +910,7 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -971,6 +984,7 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1016,6 +1030,7 @@ async def test_power_distributor_invalid_battery_id( ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1060,6 +1075,7 @@ async def test_power_distributor_one_user_adjust_power_consume( ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1106,6 +1122,7 @@ async def test_power_distributor_one_user_adjust_power_supply( ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1152,6 +1169,7 @@ async def test_power_distributor_one_user_adjust_power_success( ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1191,6 +1209,7 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), @@ -1244,6 +1263,7 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: ) async with PowerDistributingActor( component_category=ComponentCategory.BATTERY, + component_type=None, requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), component_pool_status_sender=battery_status_channel.new_sender(), diff --git a/tests/timeseries/_formula_engine/test_formula_composition.py b/tests/timeseries/_formula_engine/test_formula_composition.py index 5c0d13d63..9391154c7 100644 --- a/tests/timeseries/_formula_engine/test_formula_composition.py +++ b/tests/timeseries/_formula_engine/test_formula_composition.py @@ -36,9 +36,10 @@ async def test_formula_composition( # pylint: disable=too-many-locals stack.push_async_callback(logical_meter.stop) battery_pool = microgrid.battery_pool() - stack.push_async_callback( - battery_pool._battery_pool.stop # pylint: disable=protected-access - ) + stack.push_async_callback(battery_pool.stop) + + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) grid = microgrid.grid() stack.push_async_callback(grid.stop) @@ -51,9 +52,9 @@ async def test_formula_composition( # pylint: disable=too-many-locals ) grid_power_recv = grid.power.new_receiver() battery_power_recv = battery_pool.power.new_receiver() - pv_power_recv = logical_meter.pv_power.new_receiver() + pv_power_recv = pv_pool.power.new_receiver() - engine = (logical_meter.pv_power + battery_pool.power).build("inv_power") + engine = (pv_pool.power + battery_pool.power).build("inv_power") stack.push_async_callback(engine._stop) # pylint: disable=protected-access inv_calc_recv = engine.new_receiver() @@ -62,6 +63,7 @@ async def test_formula_composition( # pylint: disable=too-many-locals await mockgrid.mock_resampler.send_meter_power( [100.0, 10.0, 12.0, 14.0, -100.0, -200.0] ) + await mockgrid.mock_resampler.send_pv_inverter_power([-100.0, -200.0]) grid_pow = await grid_power_recv.receive() pv_pow = await pv_power_recv.receive() @@ -112,16 +114,17 @@ async def test_formula_composition_missing_pv(self, mocker: MockerFixture) -> No count = 0 async with mockgrid, AsyncExitStack() as stack: battery_pool = microgrid.battery_pool() - stack.push_async_callback( - battery_pool._battery_pool.stop # pylint: disable=protected-access - ) + stack.push_async_callback(battery_pool.stop) + + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) logical_meter = microgrid.logical_meter() stack.push_async_callback(logical_meter.stop) battery_power_recv = battery_pool.power.new_receiver() - pv_power_recv = logical_meter.pv_power.new_receiver() - engine = (logical_meter.pv_power + battery_pool.power).build("inv_power") + pv_power_recv = pv_pool.power.new_receiver() + engine = (pv_pool.power + battery_pool.power).build("inv_power") stack.push_async_callback(engine._stop) # pylint: disable=protected-access inv_calc_recv = engine.new_receiver() @@ -153,21 +156,23 @@ async def test_formula_composition_missing_bat(self, mocker: MockerFixture) -> N count = 0 async with mockgrid, AsyncExitStack() as stack: battery_pool = microgrid.battery_pool() - stack.push_async_callback( - battery_pool._battery_pool.stop # pylint: disable=protected-access - ) + stack.push_async_callback(battery_pool.stop) + + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) + logical_meter = microgrid.logical_meter() stack.push_async_callback(logical_meter.stop) battery_power_recv = battery_pool.power.new_receiver() - pv_power_recv = logical_meter.pv_power.new_receiver() - engine = (logical_meter.pv_power + battery_pool.power).build("inv_power") + pv_power_recv = pv_pool.power.new_receiver() + engine = (pv_pool.power + battery_pool.power).build("inv_power") stack.push_async_callback(engine._stop) # pylint: disable=protected-access inv_calc_recv = engine.new_receiver() for _ in range(10): - await mockgrid.mock_resampler.send_meter_power( + await mockgrid.mock_resampler.send_pv_inverter_power( [12.0 + count, 14.0 + count] ) await mockgrid.mock_resampler.send_non_existing_component_value() diff --git a/tests/timeseries/_pv_pool/__init__.py b/tests/timeseries/_pv_pool/__init__.py new file mode 100644 index 000000000..c6a581f17 --- /dev/null +++ b/tests/timeseries/_pv_pool/__init__.py @@ -0,0 +1,4 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Test the PV pool control methods.""" diff --git a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py new file mode 100644 index 000000000..8647c0028 --- /dev/null +++ b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py @@ -0,0 +1,262 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Test the PV pool control methods.""" + +import asyncio +import typing +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock + +import pytest + +# pylint: disable=no-name-in-module +from frequenz.api.microgrid.inverter_pb2 import ComponentState + +# pylint: enable=no-name-in-module +from frequenz.channels import Receiver +from pytest_mock import MockerFixture + +from frequenz.sdk import microgrid +from frequenz.sdk.actor import ResamplerConfig, power_distributing +from frequenz.sdk.microgrid._data_pipeline import _DataPipeline +from frequenz.sdk.timeseries import Power +from frequenz.sdk.timeseries.pv_pool import PVPoolReport + +from ...microgrid.fixtures import _Mocks +from ...utils.component_data_streamer import MockComponentDataStreamer +from ...utils.component_data_wrapper import InverterDataWrapper +from ..mock_microgrid import MockMicrogrid + + +@pytest.fixture +async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[_Mocks]: + """Create the mocks.""" + mockgrid = MockMicrogrid(grid_meter=True) + mockgrid.add_solar_inverters(4) + await mockgrid.start(mocker) + + # pylint: disable=protected-access + if microgrid._data_pipeline._DATA_PIPELINE is not None: + microgrid._data_pipeline._DATA_PIPELINE = None + await microgrid._data_pipeline.initialize( + ResamplerConfig(resampling_period=timedelta(seconds=0.1)) + ) + streamer = MockComponentDataStreamer(mockgrid.mock_client) + + dp = typing.cast(_DataPipeline, microgrid._data_pipeline._DATA_PIPELINE) + + yield _Mocks( + mockgrid, + streamer, + dp._ev_power_wrapper.status_channel.new_sender(), + ) + + +class TestPVPoolControl: + """Test control methods for the PVPool.""" + + async def _patch_data_pipeline(self, mocker: MockerFixture) -> None: + mocker.patch( + "frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._pv_power_wrapper" + "._pd_wait_for_data_sec", + 0.1, + ) + + async def _init_pv_inverters(self, mocks: _Mocks) -> None: + now = datetime.now(tz=timezone.utc) + for idx, comp_id in enumerate(mocks.microgrid.pv_inverter_ids): + mocks.streamer.start_streaming( + InverterDataWrapper( + comp_id, + now, + _component_state=ComponentState.COMPONENT_STATE_IDLE, + active_power=0.0, + active_power_inclusion_lower_bound=-10000.0 * (idx + 1), + active_power_inclusion_upper_bound=0.0, + ), + 0.05, + ) + + async def _fail_pv_inverters(self, fail_ids: list[int], mocks: _Mocks) -> None: + now = datetime.now(tz=timezone.utc) + for idx, comp_id in enumerate(mocks.microgrid.pv_inverter_ids): + mocks.streamer.update_stream( + InverterDataWrapper( + comp_id, + now, + _component_state=( + ComponentState.COMPONENT_STATE_ERROR + if comp_id in fail_ids + else ComponentState.COMPONENT_STATE_IDLE + ), + active_power=0.0, + active_power_inclusion_lower_bound=-10000.0 * (idx + 1), + active_power_inclusion_upper_bound=0.0, + ), + ) + + def _assert_report( # pylint: disable=too-many-arguments + self, + report: PVPoolReport, + *, + power: float | None, + lower: float, + upper: float, + expected_result_pred: ( + typing.Callable[[power_distributing.Result], bool] | None + ) = None, + ) -> None: + assert report.target_power == ( + Power.from_watts(power) if power is not None else None + ) + assert report.bounds is not None + assert report.bounds.lower == Power.from_watts(lower) + assert report.bounds.upper == Power.from_watts(upper) + if expected_result_pred is not None: + assert report.distribution_result is not None + assert expected_result_pred(report.distribution_result) + + async def _recv_reports_until( + self, + bounds_rx: Receiver[PVPoolReport], + check: typing.Callable[[PVPoolReport], bool], + ) -> None: + """Receive reports until the given condition is met.""" + max_reports = 10 + ctr = 0 + while ctr < max_reports: + ctr += 1 + report = await bounds_rx.receive() + if check(report): + break + + async def test_setting_power( + self, + mocks: _Mocks, + mocker: MockerFixture, + ) -> None: + """Test setting power.""" + set_power = typing.cast( + AsyncMock, microgrid.connection_manager.get().api_client.set_power + ) + + await self._init_pv_inverters(mocks) + await self._patch_data_pipeline(mocker) + pv_pool = microgrid.pv_pool() + bounds_rx = pv_pool.power_status.new_receiver() + await self._recv_reports_until( + bounds_rx, + lambda x: x.bounds is not None and x.bounds.lower.as_watts() == -100000.0, + ) + self._assert_report( + await bounds_rx.receive(), power=None, lower=-100000.0, upper=0.0 + ) + await pv_pool.propose_power(Power.from_watts(-80000.0)) + await self._recv_reports_until( + bounds_rx, + lambda x: x.target_power is not None + and x.target_power.as_watts() == -80000.0, + ) + self._assert_report( + await bounds_rx.receive(), power=-80000.0, lower=-100000.0, upper=0.0 + ) + await asyncio.sleep(0.0) + + # Components are set initial power + assert set_power.call_count == 4 + inv_ids = mocks.microgrid.pv_inverter_ids + assert sorted(set_power.call_args_list, key=lambda x: x.args[0]) == [ + mocker.call(inv_ids[0], -10000.0), + mocker.call(inv_ids[1], -20000.0), + mocker.call(inv_ids[2], -25000.0), + mocker.call(inv_ids[3], -25000.0), + ] + + set_power.reset_mock() + await pv_pool.propose_power(Power.from_watts(-4000.0)) + await self._recv_reports_until( + bounds_rx, + lambda x: x.target_power is not None + and x.target_power.as_watts() == -4000.0, + ) + self._assert_report( + await bounds_rx.receive(), power=-4000.0, lower=-100000.0, upper=0.0 + ) + await asyncio.sleep(0.0) + + # Components are set initial power + assert set_power.call_count == 4 + inv_ids = mocks.microgrid.pv_inverter_ids + assert sorted(set_power.call_args_list, key=lambda x: x.args[0]) == [ + mocker.call(inv_ids[0], -1000.0), + mocker.call(inv_ids[1], -1000.0), + mocker.call(inv_ids[2], -1000.0), + mocker.call(inv_ids[3], -1000.0), + ] + + # After failing 1 inverter, bounds should go down and power shouldn't be + # distributed to that inverter. + await self._fail_pv_inverters([inv_ids[1]], mocks) + await self._recv_reports_until( + bounds_rx, + lambda x: x.bounds is not None and x.bounds.lower.as_watts() == -80000.0, + ) + self._assert_report( + await bounds_rx.receive(), power=-4000.0, lower=-80000.0, upper=0.0 + ) + + set_power.reset_mock() + await pv_pool.propose_power(Power.from_watts(-70000.0)) + await self._recv_reports_until( + bounds_rx, + lambda x: x.target_power is not None + and x.target_power.as_watts() == -70000.0, + ) + + self._assert_report( + await bounds_rx.receive(), power=-70000.0, lower=-80000.0, upper=0.0 + ) + await asyncio.sleep(0.0) + + # Components are set initial power + assert set_power.call_count == 3 + inv_ids = mocks.microgrid.pv_inverter_ids + assert sorted(set_power.call_args_list, key=lambda x: x.args[0]) == [ + mocker.call(inv_ids[0], -10000.0), + mocker.call(inv_ids[2], -30000.0), + mocker.call(inv_ids[3], -30000.0), + ] + + # After the failed inverter recovers, bounds should go back up and power + # should be distributed to all inverters + await self._fail_pv_inverters([], mocks) + await self._recv_reports_until( + bounds_rx, + lambda x: x.bounds is not None and x.bounds.lower.as_watts() == -100000.0, + ) + self._assert_report( + await bounds_rx.receive(), power=-70000.0, lower=-100000.0, upper=0.0 + ) + + set_power.reset_mock() + await pv_pool.propose_power(Power.from_watts(-90000.0)) + await self._recv_reports_until( + bounds_rx, + lambda x: x.target_power is not None + and x.target_power.as_watts() == -90000.0, + ) + + self._assert_report( + await bounds_rx.receive(), power=-90000.0, lower=-100000.0, upper=0.0 + ) + await asyncio.sleep(0.0) + + assert set_power.call_count == 4 + inv_ids = mocks.microgrid.pv_inverter_ids + assert sorted(set_power.call_args_list, key=lambda x: x.args[0]) == [ + mocker.call(inv_ids[0], -10000.0), + mocker.call(inv_ids[1], -20000.0), + mocker.call(inv_ids[2], -30000.0), + mocker.call(inv_ids[3], -30000.0), + ] diff --git a/tests/timeseries/test_formula_formatter.py b/tests/timeseries/test_formula_formatter.py index a5688439a..f1a0c43a8 100644 --- a/tests/timeseries/test_formula_formatter.py +++ b/tests/timeseries/test_formula_formatter.py @@ -123,15 +123,16 @@ async def test_higher_order_formula(self, mocker: MockerFixture) -> None: logical_meter = microgrid.logical_meter() stack.push_async_callback(logical_meter.stop) + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) + grid = microgrid.grid() stack.push_async_callback(grid.stop) assert str(grid.power) == "#36 + #7 + #47 + #17 + #57 + #27" - composed_formula = (grid.power - logical_meter.pv_power).build( - "grid_minus_pv" - ) + composed_formula = (grid.power - pv_pool.power).build("grid_minus_pv") assert ( str(composed_formula) - == "[grid-power](#36 + #7 + #47 + #17 + #57 + #27) - [pv-power](#57 + #47)" + == "[grid-power](#36 + #7 + #47 + #17 + #57 + #27) - [pv-power](#48 + #58)" ) diff --git a/tests/timeseries/test_logical_meter.py b/tests/timeseries/test_logical_meter.py index 4183b1934..594616baf 100644 --- a/tests/timeseries/test_logical_meter.py +++ b/tests/timeseries/test_logical_meter.py @@ -44,11 +44,11 @@ async def test_pv_power(self, mocker: MockerFixture) -> None: mockgrid.add_solar_inverters(2) async with mockgrid, AsyncExitStack() as stack: - logical_meter = microgrid.logical_meter() - stack.push_async_callback(logical_meter.stop) - pv_power_receiver = logical_meter.pv_power.new_receiver() + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) + pv_power_receiver = pv_pool.power.new_receiver() - await mockgrid.mock_resampler.send_meter_power([-1.0, -2.0]) + await mockgrid.mock_resampler.send_pv_inverter_power([-1.0, -2.0]) assert (await pv_power_receiver.receive()).value == Power.from_watts(-3.0) async def test_pv_power_no_meter(self, mocker: MockerFixture) -> None: @@ -57,9 +57,9 @@ async def test_pv_power_no_meter(self, mocker: MockerFixture) -> None: mockgrid.add_solar_inverters(2, no_meter=True) async with mockgrid, AsyncExitStack() as stack: - logical_meter = microgrid.logical_meter() - stack.push_async_callback(logical_meter.stop) - pv_power_receiver = logical_meter.pv_power.new_receiver() + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) + pv_power_receiver = pv_pool.power.new_receiver() await mockgrid.mock_resampler.send_pv_inverter_power([-1.0, -2.0]) assert (await pv_power_receiver.receive()).value == Power.from_watts(-3.0) @@ -70,9 +70,9 @@ async def test_pv_power_no_pv_components(self, mocker: MockerFixture) -> None: MockMicrogrid(grid_meter=True, mocker=mocker) as mockgrid, AsyncExitStack() as stack, ): - logical_meter = microgrid.logical_meter() - stack.push_async_callback(logical_meter.stop) - pv_power_receiver = logical_meter.pv_power.new_receiver() + pv_pool = microgrid.pv_pool() + stack.push_async_callback(pv_pool.stop) + pv_power_receiver = pv_pool.power.new_receiver() await mockgrid.mock_resampler.send_non_existing_component_value() assert (await pv_power_receiver.receive()).value == Power.zero()