diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1d5e31867..b48d83c2f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,17 @@ ## Upgrading -- The `BatteryPool.power_status` method now streams objects of type `BatteryPoolReport`. They're identical to the previous `Report` objects, except for the name of the class. +- The `BatteryPool.power_status` method now streams objects of type `BatteryPoolReport`, replacing the previous `Report` objects. + +- In `BatteryPoolReport.distribution_result`, + * the following fields have been renamed: + + `Result.succeeded_batteries` → `Result.succeeded_components` + + `Result.failed_batteries` → `Result.failed_components` + + `Request.batteries` → `Request.component_ids` + * and the following fields are now type-hinted as `collections.abc.Set`, to clearly indicate that they are read-only: + + `Result.succeeded_components` + + `Result.failed_components` + ## New Features diff --git a/benchmarks/power_distribution/power_distributor.py b/benchmarks/power_distribution/power_distributor.py index 8d3a97115..935d5c1d8 100644 --- a/benchmarks/power_distribution/power_distributor.py +++ b/benchmarks/power_distribution/power_distributor.py @@ -117,7 +117,7 @@ async def run_test( # pylint: disable=too-many-locals async with PowerDistributingActor( requests_receiver=power_request_channel.new_receiver(), results_sender=power_result_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): tasks: list[Coroutine[Any, Any, list[Result]]] = [] tasks.append(send_requests(batteries, num_requests)) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 3e9b50419..ceb3e68cd 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -171,7 +171,7 @@ async def _send_updated_target_power( await self._power_distributing_requests_sender.send( power_distributing.Request( power=target_power, - batteries=component_ids, + component_ids=component_ids, request_timeout=request_timeout, adjust_power=True, ) @@ -229,10 +229,12 @@ async def _run(self) -> None: ) result = selected.value - self._distribution_results[frozenset(result.request.batteries)] = result + self._distribution_results[ + frozenset(result.request.component_ids) + ] = result match result: case power_distributing.PartialFailure(request): await self._send_updated_target_power( - frozenset(request.batteries), None, must_send=True + frozenset(request.component_ids), None, must_send=True ) - await self._send_reports(frozenset(result.request.batteries)) + await self._send_reports(frozenset(result.request.component_ids)) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py new file mode 100644 index 000000000..d9cb07266 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/__init__.py @@ -0,0 +1,12 @@ +# License: MIT +# Copyright © 2023 Frequenz Energy-as-a-Service GmbH + +"""Interfaces for the power distributing actor with different component types.""" + +from ._battery_manager import BatteryManager +from ._component_manager import ComponentManager + +__all__ = [ + "BatteryManager", + "ComponentManager", +] diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py new file mode 100644 index 000000000..82941bba0 --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -0,0 +1,670 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Manage batteries and inverters for the power distributor.""" + +import asyncio +import collections.abc +import logging +import math +import typing +from datetime import timedelta + +import grpc +from frequenz.channels import Peekable, Receiver, Sender + +from .... import microgrid +from ...._internal._math import is_close_to_zero +from ....microgrid import connection_manager +from ....microgrid.component import BatteryData, ComponentCategory, InverterData +from ....timeseries._quantities import Power +from .._battery_status_tracker import BatteryStatusTracker +from .._component_pool_status_tracker import ComponentPoolStatusTracker +from .._component_status import ComponentPoolStatus +from .._distribution_algorithm import ( + AggregatedBatteryData, + BatteryDistributionAlgorithm, + DistributionResult, + InvBatPair, +) +from ..request import Request +from ..result import Error, OutOfBounds, PartialFailure, PowerBounds, Result, Success +from ._component_manager import ComponentManager + +_logger = logging.getLogger(__name__) + + +def _get_all_from_map( + source: dict[int, frozenset[int]], keys: collections.abc.Set[int] +) -> set[int]: + """Get all values for the given keys from the given map. + + Args: + source: map to get values from. + keys: keys to get values for. + + Returns: + Set of values for the given keys. + """ + return set().union(*[source[key] for key in keys]) + + +def _get_battery_inverter_mappings( + battery_ids: collections.abc.Set[int], + *, # force keyword arguments + inv_bats: bool = True, + bat_bats: bool = True, + inv_invs: bool = True, +) -> dict[str, dict[int, frozenset[int]]]: + """Create maps between battery and adjacent inverters. + + Args: + battery_ids: set of battery ids + inv_bats: whether to create the inverter to batteries map + bat_bats: whether to create the battery to batteries map + inv_invs: whether to create the inverter to inverters map + + Returns: + a dict of the requested maps, using the following keys: + * "bat_invs": battery to inverters map + * "inv_bats": inverter to batteries map + * "bat_bats": battery to batteries map + * "inv_invs": inverter to inverters map + """ + bat_invs_map: dict[int, set[int]] = {} + inv_bats_map: dict[int, set[int]] | None = {} if inv_bats else None + bat_bats_map: dict[int, set[int]] | None = {} if bat_bats else None + inv_invs_map: dict[int, set[int]] | None = {} if inv_invs else None + component_graph = connection_manager.get().component_graph + + for battery_id in battery_ids: + inverters: set[int] = set( + component.component_id + for component in component_graph.predecessors(battery_id) + if component.category == ComponentCategory.INVERTER + ) + + if len(inverters) == 0: + _logger.error("No inverters for battery %d", battery_id) + continue + + bat_invs_map[battery_id] = inverters + if bat_bats_map is not None: + bat_bats_map.setdefault(battery_id, set()).update( + set( + component.component_id + for inverter in inverters + for component in component_graph.successors(inverter) + ) + ) + + for inverter in inverters: + if inv_bats_map is not None: + inv_bats_map.setdefault(inverter, set()).add(battery_id) + if inv_invs_map is not None: + inv_invs_map.setdefault(inverter, set()).update(bat_invs_map) + + mapping: dict[str, dict[int, frozenset[int]]] = {} + + # Convert sets to frozensets to make them hashable. + def _add(key: str, value: dict[int, set[int]] | None) -> None: + if value is not None: + mapping[key] = {k: frozenset(v) for k, v in value.items()} + + _add("bat_invs", bat_invs_map) + _add("inv_bats", inv_bats_map) + _add("bat_bats", bat_bats_map) + _add("inv_invs", inv_invs_map) + + return mapping + + +class BatteryManager(ComponentManager): + """Class to manage the data streams for batteries.""" + + def __init__( + self, + component_pool_status_sender: Sender[ComponentPoolStatus], + ): + """Initialize the battery data manager.""" + self._batteries = connection_manager.get().component_graph.components( + component_category={ComponentCategory.BATTERY} + ) + self._battery_ids = {battery.component_id for battery in self._batteries} + + maps = _get_battery_inverter_mappings(self._battery_ids) + + self._bat_invs_map = maps["bat_invs"] + self._inv_bats_map = maps["inv_bats"] + self._bat_bats_map = maps["bat_bats"] + self._inv_invs_map = maps["inv_invs"] + + self._battery_receivers: dict[int, Peekable[BatteryData]] = {} + self._inverter_receivers: dict[int, Peekable[InverterData]] = {} + + self._component_pool_status_tracker = ComponentPoolStatusTracker( + component_ids=set(self._battery_ids), + component_status_sender=component_pool_status_sender, + max_blocking_duration_sec=30.0, + max_data_age_sec=10.0, + component_status_tracker_type=BatteryStatusTracker, + ) + + # NOTE: power_distributor_exponent should be received from ConfigManager + self._power_distributor_exponent: float = 1.0 + """The exponent for the power distribution algorithm. + + The exponent determines how fast the batteries should strive to the + equal SoC level. + """ + + self._distribution_algorithm = BatteryDistributionAlgorithm( + self._power_distributor_exponent + ) + """The distribution algorithm used to distribute power between batteries.""" + + def component_ids(self) -> collections.abc.Set[int]: + """Return the set of component ids.""" + return self._battery_ids + + async def start(self) -> None: + """Start the battery data manager.""" + await self._create_channels() + + async def stop(self) -> None: + """Stop the battery data manager.""" + await self._component_pool_status_tracker.stop() + + async def distribute_power(self, request: Request) -> Result: + """Distribute the requested power to the components. + + Args: + request: Request to get the distribution for. + + Returns: + Result of the distribution. + """ + distribution_result = await self._get_distribution(request) + if not isinstance(distribution_result, DistributionResult): + return distribution_result + result = await self._distribute_power(request, distribution_result) + return result + + async def _get_distribution(self, request: Request) -> DistributionResult | Result: + """Get the distribution of the batteries. + + Args: + request: Request to get the distribution for. + + Returns: + Distribution of the batteries. + """ + try: + pairs_data: list[InvBatPair] = self._get_components_data( + request.component_ids + ) + except KeyError as err: + return Error(request=request, msg=str(err)) + + if not pairs_data: + error_msg = ( + "No data for at least one of the given " + f"batteries {str(request.component_ids)}" + ) + return Error(request=request, msg=str(error_msg)) + + error = self._check_request(request, pairs_data) + if error: + return error + + try: + distribution = self._get_power_distribution(request, pairs_data) + except ValueError as err: + _logger.exception("Couldn't distribute power") + error_msg = f"Couldn't distribute power, error: {str(err)}" + return Error(request=request, msg=str(error_msg)) + + return distribution + + async def _distribute_power( + self, request: Request, distribution: DistributionResult + ) -> Result: + """Set the distributed power to the batteries. + + Args: + request: Request to set the power for. + distribution: Distribution to set. + + Returns: + Result from the microgrid API. + """ + distributed_power_value = ( + request.power.as_watts() - distribution.remaining_power + ) + battery_distribution: dict[int, float] = {} + for inverter_id, dist in distribution.distribution.items(): + for battery_id in self._inv_bats_map[inverter_id]: + battery_distribution[battery_id] = ( + battery_distribution.get(battery_id, 0.0) + dist + ) + _logger.debug( + "Distributing power %d between the batteries %s", + distributed_power_value, + str(battery_distribution), + ) + + failed_power, failed_batteries = await self._set_distributed_power( + distribution, request.request_timeout + ) + + response: Success | PartialFailure + if len(failed_batteries) > 0: + succeed_batteries = set(battery_distribution.keys()) - failed_batteries + response = PartialFailure( + request=request, + succeeded_power=Power.from_watts( + distributed_power_value - failed_power + ), + succeeded_components=succeed_batteries, + failed_power=Power.from_watts(failed_power), + failed_components=failed_batteries, + excess_power=Power.from_watts(distribution.remaining_power), + ) + else: + succeed_batteries = set(battery_distribution.keys()) + response = Success( + request=request, + succeeded_power=Power.from_watts(distributed_power_value), + succeeded_components=succeed_batteries, + excess_power=Power.from_watts(distribution.remaining_power), + ) + + await asyncio.gather( + *[ + self._component_pool_status_tracker.update_status( + succeed_batteries, failed_batteries + ), + ] + ) + + return response + + async def _create_channels(self) -> None: + """Create channels to get data of components in microgrid.""" + api = connection_manager.get().api_client + for battery_id, inverter_ids in self._bat_invs_map.items(): + bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id) + self._battery_receivers[battery_id] = bat_recv.into_peekable() + + for inverter_id in inverter_ids: + inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id) + self._inverter_receivers[inverter_id] = inv_recv.into_peekable() + + def _get_bounds( + self, + pairs_data: list[InvBatPair], + ) -> PowerBounds: + """Get power bounds for given batteries. + + Args: + pairs_data: list of battery and adjacent inverter data pairs. + + Returns: + Power bounds for given batteries. + """ + return PowerBounds( + inclusion_lower=sum( + max( + battery.power_bounds.inclusion_lower, + sum( + inverter.active_power_inclusion_lower_bound + for inverter in inverters + ), + ) + for battery, inverters in pairs_data + ), + inclusion_upper=sum( + min( + battery.power_bounds.inclusion_upper, + sum( + inverter.active_power_inclusion_upper_bound + for inverter in inverters + ), + ) + for battery, inverters in pairs_data + ), + exclusion_lower=min( + sum(battery.power_bounds.exclusion_lower for battery, _ in pairs_data), + sum( + inverter.active_power_exclusion_lower_bound + for _, inverters in pairs_data + for inverter in inverters + ), + ), + exclusion_upper=max( + sum(battery.power_bounds.exclusion_upper for battery, _ in pairs_data), + sum( + inverter.active_power_exclusion_upper_bound + for _, inverters in pairs_data + for inverter in inverters + ), + ), + ) + + def _check_request( + self, + request: Request, + pairs_data: list[InvBatPair], + ) -> Result | None: + """Check whether the given request if correct. + + Args: + request: request to check + pairs_data: list of battery and adjacent inverter data pairs. + + Returns: + Result for the user if the request is wrong, None otherwise. + """ + if not request.component_ids: + return Error(request=request, msg="Empty battery IDs in the request") + + for battery in request.component_ids: + _logger.debug("Checking battery %d", battery) + if battery not in self._battery_receivers: + msg = ( + f"No battery {battery}, available batteries: " + f"{list(self._battery_receivers.keys())}" + ) + return Error(request=request, msg=msg) + + bounds = self._get_bounds(pairs_data) + + power = request.power.as_watts() + + # Zero power requests are always forwarded to the microgrid API, even if they + # are outside the exclusion bounds. + if is_close_to_zero(power): + return None + + if request.adjust_power: + # Automatic power adjustments can only bring down the requested power down + # to the inclusion bounds. + # + # If the requested power is in the exclusion bounds, it is NOT possible to + # increase it so that it is outside the exclusion bounds. + if bounds.exclusion_lower < power < bounds.exclusion_upper: + return OutOfBounds(request=request, bounds=bounds) + else: + in_lower_range = bounds.inclusion_lower <= power <= bounds.exclusion_lower + in_upper_range = bounds.exclusion_upper <= power <= bounds.inclusion_upper + if not (in_lower_range or in_upper_range): + return OutOfBounds(request=request, bounds=bounds) + + return None + + def _get_battery_inverter_data( + self, battery_ids: frozenset[int], inverter_ids: frozenset[int] + ) -> InvBatPair | None: + """Get battery and inverter data if they are correct. + + Each float data from the microgrid can be "NaN". + We can't do math operations on "NaN". + So check all the metrics and if any are "NaN" then return None. + + Args: + battery_ids: battery ids + inverter_ids: inverter ids + + Returns: + Data for the battery and adjacent inverter without NaN values. + Return None if we could not replace NaN values. + """ + battery_data_none = [ + self._battery_receivers[battery_id].peek() for battery_id in battery_ids + ] + inverter_data_none = [ + self._inverter_receivers[inverter_id].peek() for inverter_id in inverter_ids + ] + + # It means that nothing has been send on this channels, yet. + # This should be handled by BatteryStatus. BatteryStatus should not return + # this batteries as working. + if not all(battery_data_none) or not all(inverter_data_none): + _logger.error( + "Battery %s or inverter %s send no data, yet. They should be not used.", + battery_ids, + inverter_ids, + ) + return None + + battery_data = typing.cast(list[BatteryData], battery_data_none) + inverter_data = typing.cast(list[InverterData], inverter_data_none) + + DataType = typing.TypeVar("DataType", BatteryData, InverterData) + + def metric_is_nan(data: DataType, metrics: list[str]) -> bool: + """Check if non-replaceable metrics are NaN.""" + assert data is not None + return any(map(lambda metric: math.isnan(getattr(data, metric)), metrics)) + + def nan_metric_in_list(data: list[DataType], metrics: list[str]) -> bool: + """Check if any metric is NaN.""" + return any(map(lambda datum: metric_is_nan(datum, metrics), data)) + + crucial_metrics_bat = [ + "soc", + "soc_lower_bound", + "soc_upper_bound", + "capacity", + "power_inclusion_lower_bound", + "power_inclusion_upper_bound", + ] + + crucial_metrics_inv = [ + "active_power_inclusion_lower_bound", + "active_power_inclusion_upper_bound", + ] + + if nan_metric_in_list(battery_data, crucial_metrics_bat): + _logger.debug("Some metrics for battery set %s are NaN", list(battery_ids)) + return None + + if nan_metric_in_list(inverter_data, crucial_metrics_inv): + _logger.debug( + "Some metrics for inverter set %s are NaN", list(inverter_ids) + ) + return None + + return InvBatPair(AggregatedBatteryData(battery_data), inverter_data) + + def _get_components_data( + self, batteries: collections.abc.Set[int] + ) -> list[InvBatPair]: + """Get data for the given batteries and adjacent inverters. + + Args: + batteries: Batteries that needs data. + + Raises: + KeyError: If any battery in the given list doesn't exists in microgrid. + + Returns: + Pairs of battery and adjacent inverter data. + """ + pairs_data: list[InvBatPair] = [] + + working_batteries = self._component_pool_status_tracker.get_working_components( + batteries + ) + + for battery_id in working_batteries: + if battery_id not in self._battery_receivers: + raise KeyError( + f"No battery {battery_id}, " + f"available batteries: {list(self._battery_receivers.keys())}" + ) + + connected_inverters = _get_all_from_map(self._bat_invs_map, batteries) + + # Check to see if inverters are involved that are connected to batteries + # that were not requested. + batteries_from_inverters = _get_all_from_map( + self._inv_bats_map, connected_inverters + ) + + if batteries_from_inverters != batteries: + extra_batteries = batteries_from_inverters - batteries + raise KeyError( + f"Inverters {_get_all_from_map(self._bat_invs_map, extra_batteries)} " + f"are connected to batteries that were not requested: {extra_batteries}" + ) + + # set of set of batteries one for each working_battery + battery_sets: frozenset[frozenset[int]] = frozenset( + self._bat_bats_map[working_battery] for working_battery in working_batteries + ) + + for battery_ids in battery_sets: + inverter_ids: frozenset[int] = self._bat_invs_map[next(iter(battery_ids))] + + data = self._get_battery_inverter_data(battery_ids, inverter_ids) + if data is None: + _logger.warning( + "Skipping battery set %s because at least one of its messages isn't correct.", + list(battery_ids), + ) + continue + + assert len(data.inverter) > 0 + pairs_data.append(data) + return pairs_data + + def _get_power_distribution( + self, request: Request, inv_bat_pairs: list[InvBatPair] + ) -> DistributionResult: + """Get power distribution result for the batteries in the request. + + Args: + request: the power request to process. + inv_bat_pairs: the battery and adjacent inverter data pairs. + + Returns: + the power distribution result. + """ + available_bat_ids = _get_all_from_map( + self._bat_bats_map, {pair.battery.component_id for pair in inv_bat_pairs} + ) + + unavailable_bat_ids = request.component_ids - available_bat_ids + unavailable_inv_ids: set[int] = set() + + for inverter_ids in [ + self._bat_invs_map[battery_id_set] for battery_id_set in unavailable_bat_ids + ]: + unavailable_inv_ids = unavailable_inv_ids.union(inverter_ids) + + result = self._distribution_algorithm.distribute_power( + request.power.as_watts(), inv_bat_pairs + ) + + return result + + async def _set_distributed_power( + self, + distribution: DistributionResult, + timeout: timedelta, + ) -> tuple[float, set[int]]: + """Send distributed power to the inverters. + + Args: + distribution: Distribution result + timeout: How long wait for the response + + Returns: + Tuple where first element is total failed power, and the second element + set of batteries that failed. + """ + api = microgrid.connection_manager.get().api_client + + tasks = { + inverter_id: asyncio.create_task(api.set_power(inverter_id, power)) + for inverter_id, power in distribution.distribution.items() + } + + _, pending = await asyncio.wait( + tasks.values(), + timeout=timeout.total_seconds(), + return_when=asyncio.ALL_COMPLETED, + ) + + await self._cancel_tasks(pending) + + return self._parse_result(tasks, distribution.distribution, timeout) + + def _parse_result( + self, + tasks: dict[int, asyncio.Task[None]], + distribution: dict[int, float], + request_timeout: timedelta, + ) -> tuple[float, set[int]]: + """Parse the results of `set_power` requests. + + Check if any task has failed and determine the reason for failure. + If any task did not succeed, then the corresponding battery is marked as broken. + + Args: + tasks: A dictionary where the key is the inverter ID and the value is the task that + set the power for this inverter. Each task should be finished or cancelled. + distribution: A dictionary where the key is the inverter ID and the value is how much + power was set to the corresponding inverter. + request_timeout: The timeout that was used for the request. + + Returns: + A tuple where the first element is the total failed power, and the second element is + the set of batteries that failed. + """ + failed_power: float = 0.0 + failed_batteries: set[int] = set() + + for inverter_id, aws in tasks.items(): + battery_ids = self._inv_bats_map[inverter_id] + try: + aws.result() + except grpc.aio.AioRpcError as err: + failed_power += distribution[inverter_id] + failed_batteries = failed_batteries.union(battery_ids) + if err.code() == grpc.StatusCode.OUT_OF_RANGE: + _logger.debug( + "Set power for battery %s failed, error %s", + battery_ids, + str(err), + ) + else: + _logger.warning( + "Set power for battery %s failed, error %s. Mark it as broken.", + battery_ids, + str(err), + ) + except asyncio.exceptions.CancelledError: + failed_power += distribution[inverter_id] + failed_batteries = failed_batteries.union(battery_ids) + _logger.warning( + "Battery %s didn't respond in %f sec. Mark it as broken.", + battery_ids, + request_timeout.total_seconds(), + ) + + return failed_power, failed_batteries + + async def _cancel_tasks( + self, tasks: collections.abc.Iterable[asyncio.Task[typing.Any]] + ) -> None: + """Cancel given asyncio tasks and wait for them. + + Args: + tasks: tasks to cancel. + """ + for aws in tasks: + aws.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py new file mode 100644 index 000000000..372583c0a --- /dev/null +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_component_manager.py @@ -0,0 +1,52 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Manage batteries and inverters for the power distributor.""" + +import abc +import collections.abc + +from frequenz.channels import Sender + +from .._component_status import ComponentPoolStatus +from ..request import Request +from ..result import Result + + +class ComponentManager(abc.ABC): + """Abstract class to manage the data streams for components.""" + + @abc.abstractmethod + def __init__( + self, + component_pool_status_sender: Sender[ComponentPoolStatus], + ): + """Initialize the component data manager. + + Args: + component_pool_status_sender: Channel for sending information about which + components are expected to be working. + """ + + @abc.abstractmethod + def component_ids(self) -> collections.abc.Set[int]: + """Return the set of component ids.""" + + @abc.abstractmethod + async def start(self) -> None: + """Start the component data manager.""" + + @abc.abstractmethod + async def distribute_power(self, request: Request) -> Result: + """Distribute the requested power to the components. + + Args: + request: Request to get the distribution for. + + Returns: + Result of the distribution. + """ + + @abc.abstractmethod + async def stop(self) -> None: + """Stop the component data manager.""" diff --git a/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py b/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py index 567454e11..691902797 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_pool_status_tracker.py @@ -117,6 +117,7 @@ async def _run(self) -> None: _logger.error( "ComponentPoolStatus failed with error: %s. Restarting.", err ) + await asyncio.sleep(1.0) async def _update_status(self) -> None: async for status in self._merged_status_receiver: diff --git a/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/__init__.py b/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/__init__.py index 9e22255e4..31794dd24 100644 --- a/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/__init__.py +++ b/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/__init__.py @@ -3,15 +3,15 @@ """Utilities to manage power in a microgrid.""" -from ._distribution_algorithm import ( +from ._battery_distribution_algorithm import ( AggregatedBatteryData, - DistributionAlgorithm, + BatteryDistributionAlgorithm, DistributionResult, InvBatPair, ) __all__ = [ - "DistributionAlgorithm", + "BatteryDistributionAlgorithm", "DistributionResult", "InvBatPair", "AggregatedBatteryData", diff --git a/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/_distribution_algorithm.py b/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py similarity index 99% rename from src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/_distribution_algorithm.py rename to src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py index 2b8ef8892..2132bc03c 100644 --- a/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/_distribution_algorithm.py +++ b/src/frequenz/sdk/actor/power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py @@ -198,7 +198,7 @@ class DistributionResult: """The power which could not be distributed because of bounds.""" -class DistributionAlgorithm: +class BatteryDistributionAlgorithm: r"""Distribute power between many components. The purpose of this tool is to keep equal SoC level in the batteries. diff --git a/src/frequenz/sdk/actor/power_distributing/power_distributing.py b/src/frequenz/sdk/actor/power_distributing/power_distributing.py index 2f345fef9..7ac5772d1 100644 --- a/src/frequenz/sdk/actor/power_distributing/power_distributing.py +++ b/src/frequenz/sdk/actor/power_distributing/power_distributing.py @@ -13,127 +13,14 @@ import asyncio -import logging -from asyncio.tasks import ALL_COMPLETED -from collections import abc -from collections.abc import Iterable -from datetime import timedelta -from math import isnan -from typing import Any, TypeVar, cast -import grpc -from frequenz.channels import Peekable, Receiver, Sender +from frequenz.channels import Receiver, Sender -from frequenz.sdk.timeseries._quantities import Power - -from ..._internal._math import is_close_to_zero from ...actor._actor import Actor -from ...microgrid import connection_manager -from ...microgrid.client import MicrogridApiClient -from ...microgrid.component import ( - BatteryData, - Component, - ComponentCategory, - InverterData, -) -from ._battery_status_tracker import BatteryStatusTracker -from ._component_pool_status_tracker import ComponentPoolStatusTracker +from ._component_managers import BatteryManager, ComponentManager from ._component_status import ComponentPoolStatus -from ._distribution_algorithm import ( - AggregatedBatteryData, - DistributionAlgorithm, - DistributionResult, - InvBatPair, -) from .request import Request -from .result import Error, OutOfBounds, PartialFailure, PowerBounds, Result, Success - -_logger = logging.getLogger(__name__) - - -def _get_all_from_map( - source: dict[int, frozenset[int]], keys: abc.Set[int] -) -> set[int]: - """Get all values for the given keys from the given map. - - Args: - source: map to get values from. - keys: keys to get values for. - - Returns: - Set of values for the given keys. - """ - return set().union(*[source[key] for key in keys]) - - -def _get_battery_inverter_mappings( - batteries: Iterable[int], - *, # force keyword arguments - inv_bats: bool = True, - bat_bats: bool = True, - inv_invs: bool = True, -) -> dict[str, dict[int, frozenset[int]]]: - """Create maps between battery and adjacent inverters. - - Args: - batteries: batteries to create maps for - inv_bats: whether to create the inverter to batteries map - bat_bats: whether to create the battery to batteries map - inv_invs: whether to create the inverter to inverters map - - Returns: - a dict of the requested maps, using the following keys: - * "bat_invs": battery to inverters map - * "inv_bats": inverter to batteries map - * "bat_bats": battery to batteries map - * "inv_invs": inverter to inverters map - """ - bat_invs_map: dict[int, set[int]] = {} - inv_bats_map: dict[int, set[int]] | None = {} if inv_bats else None - bat_bats_map: dict[int, set[int]] | None = {} if bat_bats else None - inv_invs_map: dict[int, set[int]] | None = {} if inv_invs else None - component_graph = connection_manager.get().component_graph - - for battery_id in batteries: - inverters: set[int] = set( - component.component_id - for component in component_graph.predecessors(battery_id) - if component.category == ComponentCategory.INVERTER - ) - - if len(inverters) == 0: - _logger.error("No inverters for battery %d", battery_id) - continue - - bat_invs_map[battery_id] = inverters - if bat_bats_map is not None: - bat_bats_map.setdefault(battery_id, set()).update( - set( - component.component_id - for inverter in inverters - for component in component_graph.successors(inverter) - ) - ) - - for inverter in inverters: - if inv_bats_map is not None: - inv_bats_map.setdefault(inverter, set()).add(battery_id) - if inv_invs_map is not None: - inv_invs_map.setdefault(inverter, set()).update(bat_invs_map) - - mapping: dict[str, dict[int, frozenset[int]]] = {} - - # Convert sets to frozensets to make them hashable. - def _add(key: str, value: dict[int, set[int]] | None) -> None: - if value is not None: - mapping[key] = {k: frozenset(v) for k, v in value.items()} - - _add("bat_invs", bat_invs_map) - _add("inv_bats", inv_bats_map) - _add("bat_bats", bat_bats_map) - _add("inv_invs", inv_invs_map) - - return mapping +from .result import Result class PowerDistributingActor(Actor): @@ -166,7 +53,7 @@ def __init__( self, requests_receiver: Receiver[Request], results_sender: Sender[Result], - battery_status_sender: Sender[ComponentPoolStatus], + component_pool_status_sender: Sender[ComponentPoolStatus], wait_for_data_sec: float = 2, *, name: str | None = None, @@ -174,10 +61,11 @@ def __init__( """Create class instance. Args: - requests_receiver: Receiver for receiving power requests from the power manager. + requests_receiver: Receiver for receiving power requests from the power + manager. results_sender: Sender for sending results to the power manager. - battery_status_sender: Channel for sending information which batteries are - working. + component_pool_status_sender: Channel for sending information about which + components are expected to be working. wait_for_data_sec: How long actor should wait before processing first request. It is a time needed to collect first components data. name: The name of the actor. If `None`, `str(id(self))` will be used. This @@ -188,94 +76,8 @@ def __init__( self._result_sender = results_sender self._wait_for_data_sec = wait_for_data_sec - # NOTE: power_distributor_exponent should be received from ConfigManager - self.power_distributor_exponent: float = 1.0 - """The exponent for the power distribution algorithm. - - The exponent determines how fast the batteries should strive to the - equal SoC level. - """ - - self.distribution_algorithm = DistributionAlgorithm( - self.power_distributor_exponent - ) - """The distribution algorithm used to distribute power between batteries.""" - - batteries: Iterable[ - Component - ] = connection_manager.get().component_graph.components( - component_category={ComponentCategory.BATTERY} - ) - - maps = _get_battery_inverter_mappings( - [battery.component_id for battery in batteries] - ) - - self._bat_invs_map = maps["bat_invs"] - self._inv_bats_map = maps["inv_bats"] - self._bat_bats_map = maps["bat_bats"] - self._inv_invs_map = maps["inv_invs"] - - self._battery_receivers: dict[int, Peekable[BatteryData]] = {} - self._inverter_receivers: dict[int, Peekable[InverterData]] = {} - - self._all_battery_status = ComponentPoolStatusTracker( - component_ids=set(self._bat_invs_map.keys()), - component_status_sender=battery_status_sender, - max_blocking_duration_sec=30.0, - max_data_age_sec=10.0, - component_status_tracker_type=BatteryStatusTracker, - ) - - def _get_bounds( - self, - pairs_data: list[InvBatPair], - ) -> PowerBounds: - """Get power bounds for given batteries. - - Args: - pairs_data: list of battery and adjacent inverter data pairs. - - Returns: - Power bounds for given batteries. - """ - return PowerBounds( - inclusion_lower=sum( - max( - battery.power_bounds.inclusion_lower, - sum( - inverter.active_power_inclusion_lower_bound - for inverter in inverters - ), - ) - for battery, inverters in pairs_data - ), - inclusion_upper=sum( - min( - battery.power_bounds.inclusion_upper, - sum( - inverter.active_power_inclusion_upper_bound - for inverter in inverters - ), - ) - for battery, inverters in pairs_data - ), - exclusion_lower=min( - sum(battery.power_bounds.exclusion_lower for battery, _ in pairs_data), - sum( - inverter.active_power_exclusion_lower_bound - for _, inverters in pairs_data - for inverter in inverters - ), - ), - exclusion_upper=max( - sum(battery.power_bounds.exclusion_upper for battery, _ in pairs_data), - sum( - inverter.active_power_exclusion_upper_bound - for _, inverters in pairs_data - for inverter in inverters - ), - ), + self._component_manager: ComponentManager = BatteryManager( + component_pool_status_sender ) async def _run(self) -> None: # pylint: disable=too-many-locals @@ -287,420 +89,14 @@ async def _run(self) -> None: # pylint: disable=too-many-locals Every battery and inverter that failed or didn't respond in time will be marked as broken for some time. """ - await self._create_channels() - - api = connection_manager.get().api_client + await self._component_manager.start() # Wait few seconds to get data from the channels created above. await asyncio.sleep(self._wait_for_data_sec) async for request in self._requests_receiver: - try: - pairs_data: list[InvBatPair] = self._get_components_data( - request.batteries - ) - except KeyError as err: - await self._result_sender.send(Error(request=request, msg=str(err))) - continue - - if not pairs_data: - error_msg = ( - "No data for at least one of the given " - f"batteries {str(request.batteries)}" - ) - await self._result_sender.send( - Error(request=request, msg=str(error_msg)) - ) - continue - - error = self._check_request(request, pairs_data) - if error: - await self._result_sender.send(error) - continue - - try: - distribution = self._get_power_distribution(request, pairs_data) - except ValueError as err: - _logger.exception("Couldn't distribute power") - error_msg = f"Couldn't distribute power, error: {str(err)}" - await self._result_sender.send( - Error(request=request, msg=str(error_msg)) - ) - continue - - distributed_power_value = ( - request.power.as_watts() - distribution.remaining_power - ) - battery_distribution: dict[int, float] = {} - for inverter_id, dist in distribution.distribution.items(): - for battery_id in self._inv_bats_map[inverter_id]: - battery_distribution[battery_id] = ( - battery_distribution.get(battery_id, 0.0) + dist - ) - - _logger.debug( - "Distributing power %d between the batteries %s", - distributed_power_value, - str(battery_distribution), - ) - - failed_power, failed_batteries = await self._set_distributed_power( - api, distribution, request.request_timeout - ) - - response: Success | PartialFailure - if len(failed_batteries) > 0: - succeed_batteries = set(battery_distribution.keys()) - failed_batteries - response = PartialFailure( - request=request, - succeeded_power=Power.from_watts( - distributed_power_value - failed_power - ), - succeeded_batteries=succeed_batteries, - failed_power=Power.from_watts(failed_power), - failed_batteries=failed_batteries, - excess_power=Power.from_watts(distribution.remaining_power), - ) - else: - succeed_batteries = set(battery_distribution.keys()) - response = Success( - request=request, - succeeded_power=Power.from_watts(distributed_power_value), - succeeded_batteries=succeed_batteries, - excess_power=Power.from_watts(distribution.remaining_power), - ) - - asyncio.gather( - *[ - self._all_battery_status.update_status( - succeed_batteries, failed_batteries - ), - self._result_sender.send(response), - ] - ) - - async def _set_distributed_power( - self, - api: MicrogridApiClient, - distribution: DistributionResult, - timeout: timedelta, - ) -> tuple[float, set[int]]: - """Send distributed power to the inverters. - - Args: - api: Microgrid api client - distribution: Distribution result - timeout: How long wait for the response - - Returns: - Tuple where first element is total failed power, and the second element - set of batteries that failed. - """ - tasks = { - inverter_id: asyncio.create_task(api.set_power(inverter_id, power)) - for inverter_id, power in distribution.distribution.items() - } - - _, pending = await asyncio.wait( - tasks.values(), - timeout=timeout.total_seconds(), - return_when=ALL_COMPLETED, - ) - - await self._cancel_tasks(pending) - - return self._parse_result(tasks, distribution.distribution, timeout) - - def _get_power_distribution( - self, request: Request, inv_bat_pairs: list[InvBatPair] - ) -> DistributionResult: - """Get power distribution result for the batteries in the request. - - Args: - request: the power request to process. - inv_bat_pairs: the battery and adjacent inverter data pairs. - - Returns: - the power distribution result. - """ - available_bat_ids = _get_all_from_map( - self._bat_bats_map, {pair.battery.component_id for pair in inv_bat_pairs} - ) - - unavailable_bat_ids = request.batteries - available_bat_ids - unavailable_inv_ids: set[int] = set() - - for inverter_ids in [ - self._bat_invs_map[battery_id_set] for battery_id_set in unavailable_bat_ids - ]: - unavailable_inv_ids = unavailable_inv_ids.union(inverter_ids) - - result = self.distribution_algorithm.distribute_power( - request.power.as_watts(), inv_bat_pairs - ) - - return result - - def _check_request( - self, - request: Request, - pairs_data: list[InvBatPair], - ) -> Result | None: - """Check whether the given request if correct. - - Args: - request: request to check - pairs_data: list of battery and adjacent inverter data pairs. - - Returns: - Result for the user if the request is wrong, None otherwise. - """ - if not request.batteries: - return Error(request=request, msg="Empty battery IDs in the request") - - for battery in request.batteries: - _logger.debug("Checking battery %d", battery) - if battery not in self._battery_receivers: - msg = ( - f"No battery {battery}, available batteries: " - f"{list(self._battery_receivers.keys())}" - ) - return Error(request=request, msg=msg) - - bounds = self._get_bounds(pairs_data) - - power = request.power.as_watts() - - # Zero power requests are always forwarded to the microgrid API, even if they - # are outside the exclusion bounds. - if is_close_to_zero(power): - return None - - if request.adjust_power: - # Automatic power adjustments can only bring down the requested power down - # to the inclusion bounds. - # - # If the requested power is in the exclusion bounds, it is NOT possible to - # increase it so that it is outside the exclusion bounds. - if bounds.exclusion_lower < power < bounds.exclusion_upper: - return OutOfBounds(request=request, bounds=bounds) - else: - in_lower_range = bounds.inclusion_lower <= power <= bounds.exclusion_lower - in_upper_range = bounds.exclusion_upper <= power <= bounds.inclusion_upper - if not (in_lower_range or in_upper_range): - return OutOfBounds(request=request, bounds=bounds) - - return None - - def _get_components_data(self, batteries: abc.Set[int]) -> list[InvBatPair]: - """Get data for the given batteries and adjacent inverters. - - Args: - batteries: Batteries that needs data. - - Raises: - KeyError: If any battery in the given list doesn't exists in microgrid. - - Returns: - Pairs of battery and adjacent inverter data. - """ - pairs_data: list[InvBatPair] = [] - - working_batteries = self._all_battery_status.get_working_components(batteries) - - for battery_id in working_batteries: - if battery_id not in self._battery_receivers: - raise KeyError( - f"No battery {battery_id}, " - f"available batteries: {list(self._battery_receivers.keys())}" - ) - - connected_inverters = _get_all_from_map(self._bat_invs_map, batteries) - - # Check to see if inverters are involved that are connected to batteries - # that were not requested. - batteries_from_inverters = _get_all_from_map( - self._inv_bats_map, connected_inverters - ) - - if batteries_from_inverters != batteries: - extra_batteries = batteries_from_inverters - batteries - raise KeyError( - f"Inverters {_get_all_from_map(self._bat_invs_map, extra_batteries)} " - f"are connected to batteries that were not requested: {extra_batteries}" - ) - - # set of set of batteries one for each working_battery - battery_sets: frozenset[frozenset[int]] = frozenset( - self._bat_bats_map[working_battery] for working_battery in working_batteries - ) - - for battery_ids in battery_sets: - inverter_ids: frozenset[int] = self._bat_invs_map[next(iter(battery_ids))] - - data = self._get_battery_inverter_data(battery_ids, inverter_ids) - if data is None: - _logger.warning( - "Skipping battery set %s because at least one of its messages isn't correct.", - list(battery_ids), - ) - continue - - assert len(data.inverter) > 0 - pairs_data.append(data) - return pairs_data - - def _get_battery_inverter_data( - self, battery_ids: frozenset[int], inverter_ids: frozenset[int] - ) -> InvBatPair | None: - """Get battery and inverter data if they are correct. - - Each float data from the microgrid can be "NaN". - We can't do math operations on "NaN". - So check all the metrics and if any are "NaN" then return None. - - Args: - battery_ids: battery ids - inverter_ids: inverter ids - - Returns: - Data for the battery and adjacent inverter without NaN values. - Return None if we could not replace NaN values. - """ - battery_data_none = [ - self._battery_receivers[battery_id].peek() for battery_id in battery_ids - ] - inverter_data_none = [ - self._inverter_receivers[inverter_id].peek() for inverter_id in inverter_ids - ] - - # It means that nothing has been send on this channels, yet. - # This should be handled by BatteryStatus. BatteryStatus should not return - # this batteries as working. - if not all(battery_data_none) or not all(inverter_data_none): - _logger.error( - "Battery %s or inverter %s send no data, yet. They should be not used.", - battery_ids, - inverter_ids, - ) - return None - - battery_data = cast(list[BatteryData], battery_data_none) - inverter_data = cast(list[InverterData], inverter_data_none) - - DataType = TypeVar("DataType", BatteryData, InverterData) - - def metric_is_nan(data: DataType, metrics: list[str]) -> bool: - """Check if non-replaceable metrics are NaN.""" - assert data is not None - return any(map(lambda metric: isnan(getattr(data, metric)), metrics)) - - def nan_metric_in_list(data: list[DataType], metrics: list[str]) -> bool: - """Check if any metric is NaN.""" - return any(map(lambda datum: metric_is_nan(datum, metrics), data)) - - crucial_metrics_bat = [ - "soc", - "soc_lower_bound", - "soc_upper_bound", - "capacity", - "power_inclusion_lower_bound", - "power_inclusion_upper_bound", - ] - - crucial_metrics_inv = [ - "active_power_inclusion_lower_bound", - "active_power_inclusion_upper_bound", - ] - - if nan_metric_in_list(battery_data, crucial_metrics_bat): - _logger.debug("Some metrics for battery set %s are NaN", list(battery_ids)) - return None - - if nan_metric_in_list(inverter_data, crucial_metrics_inv): - _logger.debug( - "Some metrics for inverter set %s are NaN", list(inverter_ids) - ) - return None - - return InvBatPair(AggregatedBatteryData(battery_data), inverter_data) - - async def _create_channels(self) -> None: - """Create channels to get data of components in microgrid.""" - api = connection_manager.get().api_client - for battery_id, inverter_ids in self._bat_invs_map.items(): - bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id) - self._battery_receivers[battery_id] = bat_recv.into_peekable() - - for inverter_id in inverter_ids: - inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id) - self._inverter_receivers[inverter_id] = inv_recv.into_peekable() - - def _parse_result( - self, - tasks: dict[int, asyncio.Task[None]], - distribution: dict[int, float], - request_timeout: timedelta, - ) -> tuple[float, set[int]]: - """Parse the results of `set_power` requests. - - Check if any task has failed and determine the reason for failure. - If any task did not succeed, then the corresponding battery is marked as broken. - - Args: - tasks: A dictionary where the key is the inverter ID and the value is the task that - set the power for this inverter. Each task should be finished or cancelled. - distribution: A dictionary where the key is the inverter ID and the value is how much - power was set to the corresponding inverter. - request_timeout: The timeout that was used for the request. - - Returns: - A tuple where the first element is the total failed power, and the second element is - the set of batteries that failed. - """ - failed_power: float = 0.0 - failed_batteries: set[int] = set() - - for inverter_id, aws in tasks.items(): - battery_ids = self._inv_bats_map[inverter_id] - try: - aws.result() - except grpc.aio.AioRpcError as err: - failed_power += distribution[inverter_id] - failed_batteries = failed_batteries.union(battery_ids) - if err.code() == grpc.StatusCode.OUT_OF_RANGE: - _logger.debug( - "Set power for battery %s failed, error %s", - battery_ids, - str(err), - ) - else: - _logger.warning( - "Set power for battery %s failed, error %s. Mark it as broken.", - battery_ids, - str(err), - ) - except asyncio.exceptions.CancelledError: - failed_power += distribution[inverter_id] - failed_batteries = failed_batteries.union(battery_ids) - _logger.warning( - "Battery %s didn't respond in %f sec. Mark it as broken.", - battery_ids, - request_timeout.total_seconds(), - ) - - return failed_power, failed_batteries - - async def _cancel_tasks(self, tasks: Iterable[asyncio.Task[Any]]) -> None: - """Cancel given asyncio tasks and wait for them. - - Args: - tasks: tasks to cancel. - """ - for aws in tasks: - aws.cancel() - - await asyncio.gather(*tasks, return_exceptions=True) + result = await self._component_manager.distribute_power(request) + await self._result_sender.send(result) async def stop(self, msg: str | None = None) -> None: """Stop this actor. @@ -708,5 +104,5 @@ async def stop(self, msg: str | None = None) -> None: Args: msg: The message to be passed to the tasks being cancelled. """ - await self._all_battery_status.stop() + await self._component_manager.stop() await super().stop(msg) diff --git a/src/frequenz/sdk/actor/power_distributing/request.py b/src/frequenz/sdk/actor/power_distributing/request.py index c206bd43f..ca7266e4f 100644 --- a/src/frequenz/sdk/actor/power_distributing/request.py +++ b/src/frequenz/sdk/actor/power_distributing/request.py @@ -17,8 +17,8 @@ class Request: power: Power """The requested power.""" - batteries: abc.Set[int] - """The component ids of the batteries to be used for this request.""" + component_ids: abc.Set[int] + """The component ids of the components to be used for this request.""" request_timeout: timedelta = timedelta(seconds=5.0) """The maximum amount of time to wait for the request to be fulfilled.""" @@ -29,6 +29,6 @@ class Request: If `True`, the power will be adjusted (lowered) to match the bounds, so only the reduced power will be set. - If `False` and the power is outside the batteries' bounds, the request will + If `False` and the power is outside the available bounds, the request will fail and be replied to with an `OutOfBound` result. """ diff --git a/src/frequenz/sdk/actor/power_distributing/result.py b/src/frequenz/sdk/actor/power_distributing/result.py index 477a0ca8c..04f2db140 100644 --- a/src/frequenz/sdk/actor/power_distributing/result.py +++ b/src/frequenz/sdk/actor/power_distributing/result.py @@ -5,6 +5,7 @@ import dataclasses +from collections import abc from frequenz.sdk.timeseries._quantities import Power @@ -21,13 +22,13 @@ class _BaseResultMixin: @dataclasses.dataclass class _BaseSuccessMixin: - """Result returned when setting the power succeed for all batteries.""" + """Result returned when setting the power succeed for all components.""" succeeded_power: Power """The part of the requested power that was successfully set.""" - succeeded_batteries: set[int] - """The subset of batteries for which power was set successfully.""" + succeeded_components: abc.Set[int] + """The subset of components for which power was set successfully.""" excess_power: Power """The part of the requested power that could not be fulfilled. @@ -44,17 +45,17 @@ class _BaseSuccessMixin: @dataclasses.dataclass class Success(_BaseSuccessMixin, _BaseResultMixin): # Order matters here. See above. - """Result returned when setting the power succeeded for all batteries.""" + """Result returned when setting the power was successful for all components.""" @dataclasses.dataclass class PartialFailure(_BaseSuccessMixin, _BaseResultMixin): - """Result returned when any battery failed to perform the request.""" + """Result returned when some of the components had an error setting the power.""" failed_power: Power """The part of the requested power that failed to be set.""" - failed_batteries: set[int] + failed_components: abc.Set[int] """The subset of batteries for which the request failed.""" @@ -68,19 +69,19 @@ class Error(_BaseResultMixin): @dataclasses.dataclass class PowerBounds: - """Inclusion and exclusion power bounds for requested batteries.""" + """Inclusion and exclusion power bounds for the requested components.""" inclusion_lower: float - """The lower value of the inclusion power bounds for the requested batteries.""" + """The lower value of the inclusion power bounds for the requested components.""" exclusion_lower: float - """The lower value of the exclusion power bounds for the requested batteries.""" + """The lower value of the exclusion power bounds for the requested components.""" exclusion_upper: float - """The upper value of the exclusion power bounds for the requested batteries.""" + """The upper value of the exclusion power bounds for the requested components.""" inclusion_upper: float - """The upper value of the inclusion power bounds for the requested batteries.""" + """The upper value of the inclusion power bounds for the requested components.""" @dataclasses.dataclass @@ -88,11 +89,11 @@ class OutOfBounds(_BaseResultMixin): """Result returned when the power was not set because it was out of bounds. This result happens when the originating request was done with - `adjust_power = False` and the requested power is not within the batteries bounds. + `adjust_power = False` and the requested power is not within the available bounds. """ bounds: PowerBounds - """The power bounds for the requested batteries. + """The power bounds for the requested components. If the requested power negative, then this value is the lower bound. Otherwise it is upper bound. @@ -134,26 +135,26 @@ def handle_power_request_result(result: Result) -> None: request = Request( namespace="TestChannel", power=Power.from_watts(123.4), - batteries={8, 18}, + component_ids={8, 18}, ) results: list[Result] = [ Success( request, succeeded_power=Power.from_watts(123.4), - succeeded_batteries={8, 18}, + succeeded_components={8, 18}, excess_power=Power.zero(), ), PartialFailure( request, succeeded_power=Power.from_watts(103.4), - succeeded_batteries={8}, + succeeded_components={8}, excess_power=Power.zero(), - failed_batteries={18}, + failed_components={18}, failed_power=Power.from_watts(20.0), ), OutOfBounds(request, bounds=PowerBounds(0, 0, 0, 800)), - Error(request, msg="The batteries are not available"), + Error(request, msg="The components are not available"), ] for r in results: diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index cc4dafc05..a2a43439e 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -315,7 +315,7 @@ def _start_power_distributing_actor(self) -> None: self._power_distributing_actor = PowerDistributingActor( requests_receiver=self._power_distribution_requests_channel.new_receiver(), results_sender=self._power_distribution_results_channel.new_sender(), - battery_status_sender=self._battery_status_channel.new_sender(), + component_pool_status_sender=self._battery_status_channel.new_sender(), ) self._power_distributing_actor.start() diff --git a/src/frequenz/sdk/timeseries/battery_pool/_methods.py b/src/frequenz/sdk/timeseries/battery_pool/_methods.py index be2b3579b..a9af643b3 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_methods.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_methods.py @@ -14,7 +14,7 @@ from ..._internal._asyncio import cancel_and_await from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC -from ...actor.power_distributing.power_distributing import ( +from ...actor.power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, ) from ._component_metric_fetcher import ( diff --git a/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py b/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py index 592bea8e8..f42205c5e 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py @@ -11,12 +11,12 @@ from typing import Generic, TypeVar from ... import timeseries -from ...actor.power_distributing._distribution_algorithm._distribution_algorithm import ( - _aggregate_battery_power_bounds, -) -from ...actor.power_distributing.power_distributing import ( +from ...actor.power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, ) +from ...actor.power_distributing._distribution_algorithm._battery_distribution_algorithm import ( + _aggregate_battery_power_bounds, +) from ...actor.power_distributing.result import PowerBounds from ...microgrid.component import ComponentMetricId from .._base_types import Sample, SystemBounds diff --git a/tests/actor/power_distributing/test_distribution_algorithm.py b/tests/actor/power_distributing/test_battery_distribution_algorithm.py similarity index 93% rename from tests/actor/power_distributing/test_distribution_algorithm.py rename to tests/actor/power_distributing/test_battery_distribution_algorithm.py index 97b7630a8..3501cc2c2 100644 --- a/tests/actor/power_distributing/test_distribution_algorithm.py +++ b/tests/actor/power_distributing/test_battery_distribution_algorithm.py @@ -11,7 +11,7 @@ from frequenz.sdk.actor.power_distributing._distribution_algorithm import ( AggregatedBatteryData, - DistributionAlgorithm, + BatteryDistributionAlgorithm, DistributionResult, InvBatPair, ) @@ -165,7 +165,7 @@ def test_total_capacity_all_0(self) -> None: """Raise error if all batteries have no capacity.""" capacity = [0.0] * 4 components = self.create_components_with_capacity(4, capacity) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) with raises(ValueError): algorithm._total_capacity(components) # pylint: disable=protected-access @@ -174,7 +174,7 @@ def test_total_capacity(self) -> None: capacity: list[float] = list(range(4)) components = self.create_components_with_capacity(4, capacity) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._total_capacity(components) assert result == approx(sum(list(range(4)))) @@ -187,7 +187,7 @@ def test_distribute_power_one_battery(self) -> None: incl_bounds: dict[int, float] = {0: 500, 1: 500} excl_bounds: dict[int, float] = {0: 0, 1: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 650, available_soc, incl_bounds, excl_bounds ) @@ -208,7 +208,7 @@ def test_distribute_power_two_batteries_1(self) -> None: incl_bounds: dict[int, float] = {0: 500, 2: 500, 1: 500, 3: 500} excl_bounds: dict[int, float] = {0: 0, 2: 0, 1: 0, 3: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 600, available_soc, incl_bounds, excl_bounds ) @@ -229,7 +229,7 @@ def test_distribute_power_two_batteries_2(self) -> None: incl_bounds: dict[int, float] = {0: 500, 2: 500, 1: 500, 3: 500} excl_bounds: dict[int, float] = {0: 0, 2: 0, 1: 0, 3: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 600, available_soc, incl_bounds, excl_bounds ) @@ -252,7 +252,7 @@ def test_distribute_power_two_batteries_one_inverter(self) -> None: incl_bounds: dict[int, float] = {0: 500, 2: 500, 1: 500} excl_bounds: dict[int, float] = {0: 0, 2: 0, 1: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 600, available_soc, incl_bounds, excl_bounds ) @@ -274,7 +274,7 @@ def test_distribute_power_two_batteries_bounds(self) -> None: incl_bounds: dict[int, float] = {0: 250, 2: 330, 1: 250, 3: 330} excl_bounds: dict[int, float] = {0: 0, 2: 0, 1: 0, 3: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 600, available_soc, incl_bounds, excl_bounds ) @@ -298,7 +298,7 @@ def test_distribute_power_three_batteries(self) -> None: } excl_bounds: dict[int, float] = {0: 0, 2: 0, 4: 0, 1: 0, 3: 0, 5: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 1000, available_soc, incl_bounds, excl_bounds ) @@ -322,7 +322,7 @@ def test_distribute_power_three_batteries_2(self) -> None: } excl_bounds: dict[int, float] = {0: 0, 2: 0, 4: 0, 1: 0, 3: 0, 5: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 1000, available_soc, incl_bounds, excl_bounds ) @@ -346,7 +346,7 @@ def test_distribute_power_three_batteries_3(self) -> None: } excl_bounds: dict[int, float] = {0: 0, 2: 0, 4: 0, 1: 0, 3: 0, 5: 0} - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm._distribute_power( # pylint: disable=protected-access components, 1000, available_soc, incl_bounds, excl_bounds ) @@ -376,7 +376,7 @@ def test_supply_three_batteries_1(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-1200, components) assert result.distribution == approx({1: -200, 3: -400, 5: -600}) @@ -401,7 +401,7 @@ def test_supply_three_batteries_2(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-1400, components) assert result.distribution == approx({1: -400, 3: -400, 5: -600}) @@ -426,7 +426,7 @@ def test_supply_three_batteries_3(self) -> None: ] components = create_components(3, capacity, soc, supply_bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-1400, components) assert result.distribution == approx({1: -500, 3: -100, 5: -800}) @@ -451,7 +451,7 @@ def test_supply_three_batteries_4(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-1700, components) assert result.distribution == approx({1: -600, 3: -100, 5: -800}) @@ -476,7 +476,7 @@ def test_supply_three_batteries_5(self) -> None: ] components = create_components(3, capacity, soc, supply_bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-1700, components) assert result.distribution == approx({1: 0, 3: -100, 5: 0}) @@ -499,7 +499,7 @@ def test_supply_two_batteries_1(self) -> None: ] components = create_components(2, capacity, soc, supply_bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-600, components) assert result.distribution == approx({1: -500, 3: -100}) @@ -521,7 +521,7 @@ def test_supply_two_batteries_2(self) -> None: ] components = create_components(2, capacity, soc, supply_bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-600, components) assert result.distribution == approx({1: -346.1538, 3: -253.8461}) @@ -547,7 +547,7 @@ def test_consumption_three_batteries_1(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(1200, components) assert result.distribution == approx({1: 200, 3: 400, 5: 600}) @@ -572,7 +572,7 @@ def test_consumption_three_batteries_2(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(1400, components) assert result.distribution == approx({1: 400, 3: 400, 5: 600}) @@ -597,7 +597,7 @@ def test_consumption_three_batteries_3(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(1400, components) assert result.distribution == approx({1: 500, 3: 100, 5: 800}) @@ -622,7 +622,7 @@ def test_consumption_three_batteries_4(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(1700, components) assert result.distribution == approx({1: 600, 3: 100, 5: 800}) @@ -647,7 +647,7 @@ def test_consumption_three_batteries_5(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(1700, components) assert result.distribution == approx({1: 0, 3: 100, 5: 0}) @@ -672,7 +672,7 @@ def test_consumption_three_batteries_6(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(1700, components) assert result.distribution == approx({1: 0, 3: 100, 5: 800}) @@ -697,7 +697,7 @@ def test_consumption_three_batteries_7(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(500, components) assert result.distribution == approx({1: 498.3388, 3: 1.661129, 5: 0}) @@ -719,7 +719,7 @@ def test_consumption_two_batteries_1(self) -> None: ] components = create_components(2, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(600, components) assert result.distribution == approx({1: 100, 3: 500}) @@ -741,19 +741,19 @@ def test_consumption_two_batteries_distribution_exponent(self) -> None: ] components = create_components(2, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(8000, components) assert result.distribution == approx({1: 2000, 3: 6000}) assert result.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=2) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=2) result2 = algorithm2.distribute_power(8000, components) assert result2.distribution == approx({1: 800, 3: 7200}) assert result2.remaining_power == approx(0.0) - algorithm3 = DistributionAlgorithm(distributor_exponent=3) + algorithm3 = BatteryDistributionAlgorithm(distributor_exponent=3) result3 = algorithm3.distribute_power(8000, components) assert result3.distribution == approx({1: 285.7142, 3: 7714.2857}) @@ -775,37 +775,37 @@ def test_consumption_two_batteries_distribution_exponent_1(self) -> None: ] components = create_components(2, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(900, components) assert result.distribution == approx({1: 300, 3: 600}) assert result.remaining_power == approx(0.0) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(8000, components) assert result.distribution == approx({1: 2666.6666, 3: 5333.3333}) assert result.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=2) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=2) result2 = algorithm2.distribute_power(900, components) assert result2.distribution == approx({1: 180, 3: 720}) assert result2.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=2) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=2) result2 = algorithm2.distribute_power(8000, components) assert result2.distribution == approx({1: 1600, 3: 6400}) assert result2.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=3) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=3) result2 = algorithm2.distribute_power(900, components) assert result2.distribution == approx({1: 100, 3: 800}) assert result2.remaining_power == approx(0.0) - algorithm3 = DistributionAlgorithm(distributor_exponent=3) + algorithm3 = BatteryDistributionAlgorithm(distributor_exponent=3) result3 = algorithm3.distribute_power(8000, components) assert result3.distribution == approx({1: 888.8888, 3: 7111.1111}) @@ -827,19 +827,19 @@ def test_supply_two_batteries_distribution_exponent(self) -> None: ] components = create_components(2, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-8000, components) assert result.distribution == approx({1: -2000, 3: -6000}) assert result.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=2) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=2) result2 = algorithm2.distribute_power(-8000, components) assert result2.distribution == approx({1: -800, 3: -7200}) assert result2.remaining_power == approx(0.0) - algorithm3 = DistributionAlgorithm(distributor_exponent=3) + algorithm3 = BatteryDistributionAlgorithm(distributor_exponent=3) result3 = algorithm3.distribute_power(-8000, components) assert result3.distribution == approx({1: -285.7142, 3: -7714.2857}) @@ -861,19 +861,19 @@ def test_supply_two_batteries_distribution_exponent_1(self) -> None: ] components = create_components(2, capacity, soc, supply_bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-8000, components) assert result.distribution == approx({1: -2666.6666, 3: -5333.3333}) assert result.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=2) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=2) result2 = algorithm2.distribute_power(-8000, components) assert result2.distribution == approx({1: -1600, 3: -6400}) assert result2.remaining_power == approx(0.0) - algorithm3 = DistributionAlgorithm(distributor_exponent=3) + algorithm3 = BatteryDistributionAlgorithm(distributor_exponent=3) result3 = algorithm3.distribute_power(-8000, components) assert result3.distribution == approx({1: -888.8888, 3: -7111.1111}) @@ -898,7 +898,7 @@ def test_supply_three_batteries_distribution_exponent_2(self) -> None: ] components = create_components(3, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=1) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=1) result = algorithm.distribute_power(-8000, components) assert result.distribution == approx( @@ -906,7 +906,7 @@ def test_supply_three_batteries_distribution_exponent_2(self) -> None: ) assert result.remaining_power == approx(0.0) - algorithm2 = DistributionAlgorithm(distributor_exponent=2) + algorithm2 = BatteryDistributionAlgorithm(distributor_exponent=2) result2 = algorithm2.distribute_power(-8000, components) assert result2.distribution == approx( @@ -914,7 +914,7 @@ def test_supply_three_batteries_distribution_exponent_2(self) -> None: ) assert result2.remaining_power == approx(0.0) - algorithm3 = DistributionAlgorithm(distributor_exponent=3) + algorithm3 = BatteryDistributionAlgorithm(distributor_exponent=3) result3 = algorithm3.distribute_power(-8000, components) assert result3.distribution == approx( @@ -941,13 +941,13 @@ def test_supply_three_batteries_distribution_exponent_3(self) -> None: ] components = create_components(3, capacity, soc, supply_bounds) - algorithm = DistributionAlgorithm(distributor_exponent=0.5) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=0.5) result = algorithm.distribute_power(-1300, components) assert result.distribution == approx({1: -600, 3: -400, 5: -300}) assert result.remaining_power == approx(0.0) - algorithm = DistributionAlgorithm(distributor_exponent=0) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=0) result = algorithm.distribute_power(-1200, components) assert result.distribution == approx({1: -400, 3: -400, 5: -400}) @@ -969,13 +969,13 @@ def test_supply_two_batteries_distribution_exponent_less_then_1(self) -> None: ] components = create_components(2, capacity, soc, bounds) - algorithm = DistributionAlgorithm(distributor_exponent=0.5) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=0.5) result = algorithm.distribute_power(1000, components) assert result.distribution == approx({1: 600, 3: 400}) assert result.remaining_power == approx(0.0) - algorithm = DistributionAlgorithm(distributor_exponent=0) + algorithm = BatteryDistributionAlgorithm(distributor_exponent=0) result = algorithm.distribute_power(1000, components) assert result.distribution == approx({1: 500, 3: 500}) @@ -1037,7 +1037,7 @@ def test_scenario_1(self) -> None: ] components = create_components(3, capacities, soc, bounds) - algorithm = DistributionAlgorithm() + algorithm = BatteryDistributionAlgorithm() self.assert_result( algorithm.distribute_power(-300, components), @@ -1130,7 +1130,7 @@ def test_scenario_2(self) -> None: ] components = create_components(3, capacities, soc, bounds) - algorithm = DistributionAlgorithm() + algorithm = BatteryDistributionAlgorithm() self.assert_result( algorithm.distribute_power(-300, components), @@ -1231,7 +1231,7 @@ def test_scenario_3(self) -> None: ] components = create_components(3, capacities, soc, bounds) - algorithm = DistributionAlgorithm() + algorithm = BatteryDistributionAlgorithm() self.assert_result( algorithm.distribute_power(-320, components), @@ -1310,7 +1310,7 @@ def test_scenario_4(self) -> None: ) ] - algorithm = DistributionAlgorithm() + algorithm = BatteryDistributionAlgorithm() self.assert_result( algorithm.distribute_power(-300, components), @@ -1392,7 +1392,7 @@ def test_scenario_5(self) -> None: ), ] - algorithm = DistributionAlgorithm() + algorithm = BatteryDistributionAlgorithm() self.assert_result( algorithm.distribute_power(-300, components), diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index e3d03c444..a1a7890fc 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -20,6 +20,9 @@ PowerDistributingActor, Request, ) +from frequenz.sdk.actor.power_distributing._component_managers._battery_manager import ( + BatteryManager, +) from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import ( ComponentPoolStatusTracker, ) @@ -37,7 +40,12 @@ from tests.utils.graph_generator import GraphGenerator from ...conftest import SAFETY_TIMEOUT -from .test_distribution_algorithm import Bound, Metric, battery_msg, inverter_msg +from .test_battery_distribution_algorithm import ( + Bound, + Metric, + battery_msg, + inverter_msg, +) T = TypeVar("T") # Declare type variable @@ -62,10 +70,19 @@ async def test_constructor(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ) as distributor: - assert distributor._bat_invs_map == {9: {8}, 19: {18}, 29: {28}} - assert distributor._inv_bats_map == {8: {9}, 18: {19}, 28: {29}} + assert isinstance(distributor._component_manager, BatteryManager) + assert distributor._component_manager._bat_invs_map == { + 9: {8}, + 19: {18}, + 29: {28}, + } + assert distributor._component_manager._inv_bats_map == { + 8: {9}, + 18: {19}, + 28: {29}, + } await mockgrid.cleanup() # Test if it works without grid side meter @@ -76,10 +93,19 @@ async def test_constructor(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ) as distributor: - assert distributor._bat_invs_map == {9: {8}, 19: {18}, 29: {28}} - assert distributor._inv_bats_map == {8: {9}, 18: {19}, 28: {29}} + assert isinstance(distributor._component_manager, BatteryManager) + assert distributor._component_manager._bat_invs_map == { + 9: {8}, + 19: {18}, + 29: {28}, + } + assert distributor._component_manager._inv_bats_map == { + 8: {9}, + 18: {19}, + 28: {29}, + } await mockgrid.cleanup() async def init_component_data( @@ -119,13 +145,14 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None: request = Request( power=Power.from_kilowatts(1.2), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -134,7 +161,7 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -188,7 +215,8 @@ async def test_power_distributor_exclusion_bounds( "get_working_components.return_value": microgrid.battery_pool().battery_ids } mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -197,12 +225,12 @@ async def test_power_distributor_exclusion_bounds( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): # zero power requests should pass through despite the exclusion bounds. request = Request( power=Power.zero(), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, ) @@ -227,7 +255,7 @@ async def test_power_distributor_exclusion_bounds( # rejected. request = Request( power=Power.from_watts(300.0), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, ) @@ -243,7 +271,9 @@ async def test_power_distributor_exclusion_bounds( assert len(done) == 1 result = done.pop().result() - assert isinstance(result, OutOfBounds) + assert isinstance( + result, OutOfBounds + ), f"Expected OutOfBounds, got {result}" assert result.bounds == PowerBounds(-1000, -600, 600, 1000) assert result.request == request @@ -282,14 +312,15 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None: results_channel = Broadcast[Result]("power_distributor results") request = Request( power=Power.from_watts(1200.0), - batteries={bat_component1.component_id, bat_component2.component_id}, + component_ids={bat_component1.component_id, bat_component2.component_id}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -298,7 +329,7 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -364,14 +395,15 @@ async def test_two_batteries_one_broken_one_inverters( request = Request( power=Power.from_watts(1200.0), - batteries=set(battery.component_id for battery in bat_components), + component_ids=set(battery.component_id for battery in bat_components), request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -380,7 +412,7 @@ async def test_two_batteries_one_broken_one_inverters( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -438,14 +470,15 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None: request = Request( power=Power.from_watts(1200.0), - batteries={bat_component.component_id}, + component_ids={bat_component.component_id}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -454,7 +487,7 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -516,14 +549,15 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non request = Request( power=Power.from_watts(1700.0), - batteries={batteries[0].component_id, batteries[1].component_id}, + component_ids={batteries[0].component_id, batteries[1].component_id}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -532,7 +566,7 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -605,14 +639,15 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2( request = Request( power=Power.from_watts(300.0), - batteries={batteries[0].component_id, batteries[1].component_id}, + component_ids={batteries[0].component_id, batteries[1].component_id}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -621,7 +656,7 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -693,14 +728,15 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds( request = Request( power=Power.from_watts(300.0), - batteries={batteries[0].component_id, batteries[1].component_id}, + component_ids={batteries[0].component_id, batteries[1].component_id}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -709,7 +745,7 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -764,14 +800,15 @@ async def test_connected_but_not_requested_batteries( request = Request( power=Power.from_watts(600.0), - batteries={batteries[0].component_id}, + component_ids={batteries[0].component_id}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -780,7 +817,7 @@ async def test_connected_but_not_requested_batteries( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), results_sender=results_channel.new_sender(), ): await requests_channel.new_sender().send(request) @@ -827,13 +864,14 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: request = Request( power=Power.from_kilowatts(1.2), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -842,11 +880,11 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing" + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -864,7 +902,7 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None: result: Result = done.pop().result() assert isinstance(result, Success) - assert result.succeeded_batteries == {19} + assert result.succeeded_components == {19} assert result.succeeded_power.isclose(Power.from_watts(500.0)) assert result.excess_power.isclose(Power.from_watts(700.0)) assert result.request == request @@ -892,12 +930,13 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: request = Request( power=Power.from_kilowatts(1.2), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -906,7 +945,7 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -921,7 +960,7 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None: result: Result = done.pop().result() assert isinstance(result, Success) - assert result.succeeded_batteries == {19} + assert result.succeeded_components == {19} assert result.succeeded_power.isclose(Power.from_watts(500.0)) assert result.excess_power.isclose(Power.from_watts(700.0)) assert result.request == request @@ -964,12 +1003,13 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: request = Request( power=Power.from_kilowatts(1.2), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -978,7 +1018,7 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -993,7 +1033,7 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None: result: Result = done.pop().result() assert isinstance(result, Success) - assert result.succeeded_batteries == {19} + assert result.succeeded_components == {19} assert result.succeeded_power.isclose(Power.from_kilowatts(1.0)) assert result.excess_power.isclose(Power.from_watts(200.0)) assert result.request == request @@ -1013,13 +1053,14 @@ async def test_power_distributor_invalid_battery_id( results_channel = Broadcast[Result]("power_distributor results") request = Request( power=Power.from_kilowatts(1.2), - batteries={9, 100}, + component_ids={9, 100}, request_timeout=SAFETY_TIMEOUT, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) mocker.patch("asyncio.sleep", new_callable=AsyncMock) @@ -1028,7 +1069,7 @@ async def test_power_distributor_invalid_battery_id( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1061,14 +1102,15 @@ async def test_power_distributor_one_user_adjust_power_consume( request = Request( power=Power.from_kilowatts(1.2), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, adjust_power=False, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -1078,7 +1120,7 @@ async def test_power_distributor_one_user_adjust_power_consume( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1113,14 +1155,15 @@ async def test_power_distributor_one_user_adjust_power_supply( request = Request( power=-Power.from_kilowatts(1.2), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, adjust_power=False, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -1130,7 +1173,7 @@ async def test_power_distributor_one_user_adjust_power_supply( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1165,14 +1208,15 @@ async def test_power_distributor_one_user_adjust_power_success( request = Request( power=Power.from_kilowatts(1.0), - batteries={9, 19}, + component_ids={9, 19}, request_timeout=SAFETY_TIMEOUT, adjust_power=False, ) - attrs = {"get_working_components.return_value": request.batteries} + attrs = {"get_working_components.return_value": request.component_ids} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -1182,7 +1226,7 @@ async def test_power_distributor_one_user_adjust_power_success( async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): await requests_channel.new_sender().send(request) result_rx = results_channel.new_receiver() @@ -1216,7 +1260,8 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non attrs = {"get_working_components.return_value": batteries - {9}} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock(spec=ComponentPoolStatusTracker, **attrs), ) @@ -1227,11 +1272,11 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): request = Request( power=Power.from_kilowatts(1.2), - batteries=batteries, + component_ids=batteries, request_timeout=SAFETY_TIMEOUT, ) @@ -1247,7 +1292,7 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non assert len(done) == 1 result = done.pop().result() assert isinstance(result, Success) - assert result.succeeded_batteries == {19} + assert result.succeeded_components == {19} assert result.excess_power.isclose(Power.from_watts(700.0)) assert result.succeeded_power.isclose(Power.from_watts(500.0)) assert result.request == request @@ -1269,7 +1314,8 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: attrs = {"get_working_components.return_value": batteries} mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing.ComponentPoolStatusTracker", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".ComponentPoolStatusTracker", return_value=MagicMock( spec=ComponentPoolStatusTracker, **attrs, @@ -1277,7 +1323,8 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: ) mocker.patch( - "frequenz.sdk.actor.power_distributing.PowerDistributingActor._parse_result", + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" + ".BatteryManager._parse_result", return_value=(failed_power, failed_batteries), ) @@ -1288,11 +1335,11 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: async with PowerDistributingActor( requests_receiver=requests_channel.new_receiver(), results_sender=results_channel.new_sender(), - battery_status_sender=battery_status_channel.new_sender(), + component_pool_status_sender=battery_status_channel.new_sender(), ): request = Request( power=Power.from_kilowatts(1.70), - batteries=batteries, + component_ids=batteries, request_timeout=SAFETY_TIMEOUT, ) @@ -1307,8 +1354,8 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None: assert len(done) == 1 result = done.pop().result() assert isinstance(result, PartialFailure) - assert result.succeeded_batteries == batteries - failed_batteries - assert result.failed_batteries == failed_batteries + assert result.succeeded_components == batteries - failed_batteries + assert result.failed_components == failed_batteries assert result.succeeded_power.isclose(Power.from_watts(1000.0)) assert result.failed_power.isclose(Power.from_watts(failed_power)) assert result.excess_power.isclose(Power.from_watts(200.0)) diff --git a/tests/timeseries/_battery_pool/test_battery_pool.py b/tests/timeseries/_battery_pool/test_battery_pool.py index 084f6851f..e3f6928d6 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool.py +++ b/tests/timeseries/_battery_pool/test_battery_pool.py @@ -27,7 +27,7 @@ ) from frequenz.sdk.actor import ResamplerConfig from frequenz.sdk.actor.power_distributing import ComponentPoolStatus -from frequenz.sdk.actor.power_distributing.power_distributing import ( +from frequenz.sdk.actor.power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, ) from frequenz.sdk.microgrid.component import ComponentCategory diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index f5eea70ba..ed2ddebd8 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -91,7 +91,7 @@ async def _patch_battery_pool_status( mock = MagicMock(spec=ComponentPoolStatusTracker) mock.get_working_components.return_value = battery_ids mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing" + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" ".ComponentPoolStatusTracker", return_value=mock, ) @@ -99,7 +99,7 @@ async def _patch_battery_pool_status( mock = MagicMock(spec=ComponentPoolStatusTracker) mock.get_working_components.side_effect = set mocker.patch( - "frequenz.sdk.actor.power_distributing.power_distributing" + "frequenz.sdk.actor.power_distributing._component_managers._battery_manager" ".ComponentPoolStatusTracker", return_value=mock, ) @@ -243,7 +243,7 @@ async def side_effect(inv_id: int, _: float) -> None: expected_result_pred=lambda result: isinstance( result, power_distributing.PartialFailure ) - and result.failed_batteries == {mocks.microgrid.battery_ids[0]}, + and result.failed_components == {mocks.microgrid.battery_ids[0]}, ) # There should be an automatic retry. @@ -299,9 +299,10 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None: set_power.reset_mock() await battery_pool_2.propose_power(Power.from_watts(1000.0)) - self._assert_report( - await bounds_2_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0 - ) + bounds = await bounds_2_rx.receive() + if bounds.distribution_result is None: + bounds = await bounds_2_rx.receive() + self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0) assert set_power.call_count == 2 assert sorted(set_power.call_args_list) == [ mocker.call(inv_id, 500.0) @@ -358,9 +359,10 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None: self._assert_report( await bounds_1_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0 ) - self._assert_report( - await bounds_2_rx.receive(), power=0.0, lower=-1000.0, upper=0.0 - ) + bounds = await bounds_2_rx.receive() + if bounds.distribution_result is None: + bounds = await bounds_2_rx.receive() + self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0) assert set_power.call_count == 4 assert sorted(set_power.call_args_list) == [