diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 82f612b73..d7e21773d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,6 +19,8 @@ * Passing a `request_timeout` in calls to `*_pool.propose_power` is no longer supported. It may be specified at application startup, through the new optional `api_power_request_timeout` parameter in the `microgrid.initialize()` method. +- Power distribution results are no longer available through the `power_status` streams in the `*Pool`s. They can now be accessed as a stream from a separate property `power_distribution_results`, which is available from all the `*Pool`s. + ## New Features - Classes `Bounds` and `SystemBounds` now implement the `__contains__` method, allowing the use of the `in` operator to check whether a value falls within the bounds or not. diff --git a/pyproject.toml b/pyproject.toml index 07a70332b..3fdd7fdc8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ # changing the version # (plugins.mkdocstrings.handlers.python.import) "frequenz-client-microgrid >= 0.4.0, < 0.5.0", - "frequenz-channels >= 1.0.0-rc1, < 2.0.0", + "frequenz-channels >= 1.1.0, < 2.0.0", "networkx >= 2.8, < 4", "numpy >= 1.26.4, < 2", "typing_extensions >= 4.6.1, < 5", diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index f9dd3c34c..6eadca45e 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -12,6 +12,7 @@ from ._asyncio import cancel_and_await T_co = typing.TypeVar("T_co", covariant=True) +U_co = typing.TypeVar("U_co", covariant=True) class ReceiverFetcher(typing.Generic[T_co], typing.Protocol): @@ -29,6 +30,36 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]: """ +class MappingReceiverFetcher(typing.Generic[T_co, U_co]): + """A receiver fetcher that can manipulate receivers before returning them.""" + + def __init__( + self, + fetcher: ReceiverFetcher[T_co], + mapping_function: typing.Callable[[Receiver[T_co]], Receiver[U_co]], + ) -> None: + """Initialize this instance. + + Args: + fetcher: The underlying fetcher to get receivers from. + mapping_function: The method to be applied on new receivers before returning + them. + """ + self._fetcher = fetcher + self._mapping_function = mapping_function + + def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: + """Get a receiver from the channel. + + Args: + limit: The maximum size of the receiver. + + Returns: + A receiver instance. + """ + return self._mapping_function(self._fetcher.new_receiver(limit=limit)) + + class _Sentinel: """A sentinel to denote that no value has been received yet.""" diff --git a/src/frequenz/sdk/actor/_power_managing/_base_classes.py b/src/frequenz/sdk/actor/_power_managing/_base_classes.py index 79f2d2af8..46b328903 100644 --- a/src/frequenz/sdk/actor/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/actor/_power_managing/_base_classes.py @@ -16,7 +16,6 @@ if typing.TYPE_CHECKING: from ...timeseries._base_types import SystemBounds - from .. import power_distributing @dataclasses.dataclass(frozen=True, kw_only=True) @@ -52,12 +51,6 @@ class _Report: target_power: Power | None """The currently set power for the components.""" - distribution_result: power_distributing.Result | None - """The result of the last power distribution. - - This is `None` if no power distribution has been performed yet. - """ - _inclusion_bounds: timeseries.Bounds[Power] | None """The available inclusion bounds for the components, for the actor's priority. @@ -266,7 +259,6 @@ def get_status( component_ids: frozenset[int], priority: int, system_bounds: SystemBounds, - distribution_result: power_distributing.Result | None, ) -> _Report: """Get the bounds for a set of components, for the given priority. @@ -274,7 +266,6 @@ def get_status( component_ids: The IDs of the components to get the bounds for. priority: The priority of the actor for which the bounds are requested. system_bounds: The system bounds for the components. - distribution_result: The result of the last power distribution. Returns: The bounds for the components. diff --git a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py index ceafe8946..006d451d7 100644 --- a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py +++ b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py @@ -32,7 +32,6 @@ if typing.TYPE_CHECKING: from ...timeseries._base_types import SystemBounds - from .. import power_distributing _logger = logging.getLogger(__name__) @@ -225,7 +224,6 @@ def get_status( component_ids: frozenset[int], priority: int, system_bounds: SystemBounds, - distribution_result: power_distributing.Result | None, ) -> _Report: """Get the bounds for the algorithm. @@ -233,7 +231,6 @@ def get_status( component_ids: The IDs of the components to get the bounds for. priority: The priority of the actor for which the bounds are requested. system_bounds: The system bounds for the components. - distribution_result: The result of the last power distribution. Returns: The target power and the available bounds for the given components, for @@ -245,7 +242,6 @@ def get_status( target_power=target_power, _inclusion_bounds=None, _exclusion_bounds=system_bounds.exclusion_bounds, - distribution_result=distribution_result, ) lower_bound = system_bounds.inclusion_bounds.lower @@ -284,7 +280,6 @@ def get_status( lower=lower_bound, upper=upper_bound ), _exclusion_bounds=system_bounds.exclusion_bounds, - distribution_result=distribution_result, ) @override diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 4848b8a69..2a230803e 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -91,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments self._set_op_power_subscriptions: dict[ frozenset[int], dict[int, Sender[_Report]] ] = {} - self._distribution_results: dict[frozenset[int], power_distributing.Result] = {} self._set_power_group: BaseAlgorithm = Matryoshka( max_proposal_age=timedelta(seconds=60.0) @@ -120,7 +119,6 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None: component_ids, priority, bounds, - self._distribution_results.get(component_ids), ) await sender.send(status) for priority, sender in self._set_power_subscriptions.get( @@ -133,7 +131,6 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None: bounds, self._set_op_power_group.get_target_power(component_ids), ), - self._distribution_results.get(component_ids), ) await sender.send(status) @@ -403,9 +400,6 @@ async def _run(self) -> None: ) result = selected.message - self._distribution_results[frozenset(result.request.component_ids)] = ( - result - ) if not isinstance(result, power_distributing.Success): _logger.warning( "PowerManagingActor: PowerDistributing failed: %s", result diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index 010a9219b..397a6a788 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -269,6 +269,9 @@ def new_ev_charger_pool( power_manager_bounds_subs_sender=( self._ev_power_wrapper.bounds_subscription_channel.new_sender() ), + power_distribution_results_fetcher=( + self._ev_power_wrapper.distribution_results_fetcher() + ), component_ids=component_ids, ) ) @@ -343,6 +346,9 @@ def new_pv_pool( power_manager_bounds_subs_sender=( self._pv_power_wrapper.bounds_subscription_channel.new_sender() ), + power_distribution_results_fetcher=( + self._pv_power_wrapper.distribution_results_fetcher() + ), component_ids=component_ids, ) @@ -420,6 +426,9 @@ def new_battery_pool( power_manager_bounds_subscription_sender=( self._battery_power_wrapper.bounds_subscription_channel.new_sender() ), + power_distribution_results_fetcher=( + self._battery_power_wrapper.distribution_results_fetcher() + ), min_update_interval=self._resampler_config.resampling_period, batteries_id=component_ids, ) diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 480e24cfb..5b05a4584 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -16,6 +16,8 @@ # pylint: disable=cyclic-import from frequenz.client.microgrid import ComponentCategory, ComponentType +from .._internal._channels import ReceiverFetcher + # A number of imports had to be done inside functions where they are used, to break # import cycles. # @@ -178,3 +180,7 @@ async def stop(self) -> None: await self._power_distributing_actor.stop() if self._power_managing_actor: await self._power_managing_actor.stop() + + def distribution_results_fetcher(self) -> ReceiverFetcher[Result]: + """Return a fetcher for the power distribution results.""" + return self._power_distribution_results_channel diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index f132ce335..0adabaac0 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -13,8 +13,8 @@ from collections import abc from ... import timeseries -from ..._internal._channels import ReceiverFetcher -from ...actor import _power_managing +from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher +from ...actor import _power_managing, power_distributing from ...timeseries import Energy, Percentage, Power, Sample, Temperature from .._base_types import SystemBounds from ..formula_engine import FormulaEngine @@ -384,6 +384,21 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: return channel + @property + def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]: + """Get a receiver to receive power distribution results. + + Returns: + A receiver that will stream power distribution results for the pool's set of + batteries. + """ + return MappingReceiverFetcher( + self._pool_ref_store._power_dist_results_fetcher, + lambda recv: recv.filter( + lambda x: x.request.component_ids == self._pool_ref_store._batteries + ), + ) + @property def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]: """Get receiver to receive new power bounds when they change. diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py index 553a8b335..0577c7424 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py @@ -14,9 +14,11 @@ from frequenz.client.microgrid import ComponentCategory from ..._internal._asyncio import cancel_and_await +from ..._internal._channels import ReceiverFetcher from ...actor._channel_registry import ChannelRegistry from ...actor._data_sourcing._component_metric_request import ComponentMetricRequest from ...actor._power_managing._base_classes import Proposal, ReportRequest +from ...actor.power_distributing import Result from ...actor.power_distributing._component_status import ComponentPoolStatus from ...microgrid import connection_manager from ..formula_engine._formula_engine_pool import FormulaEnginePool @@ -43,6 +45,7 @@ def __init__( # pylint: disable=too-many-arguments batteries_status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], power_manager_bounds_subscription_sender: Sender[ReportRequest], + power_distribution_results_fetcher: ReceiverFetcher[Result], min_update_interval: timedelta, batteries_id: Set[int] | None = None, ) -> None: @@ -63,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments requests to the power managing actor. power_manager_bounds_subscription_sender: A Channel sender for sending power bounds requests to the power managing actor. + power_distribution_results_fetcher: A ReceiverFetcher for the results from + the power distributing actor. min_update_interval: Some metrics in BatteryPool are send only when they change. For these metrics min_update_interval is the minimum time interval between the following messages. @@ -105,6 +110,9 @@ def __init__( # pylint: disable=too-many-arguments self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}" self._power_distributing_namespace: str = f"power-distributor-{self._namespace}" self._channel_registry: ChannelRegistry = channel_registry + self._power_dist_results_fetcher: ReceiverFetcher[Result] = ( + power_distribution_results_fetcher + ) self._formula_pool: FormulaEnginePool = FormulaEnginePool( self._namespace, self._channel_registry, diff --git a/src/frequenz/sdk/timeseries/battery_pool/_result_types.py b/src/frequenz/sdk/timeseries/battery_pool/_result_types.py index 0e51c65d5..1a318175e 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_result_types.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_result_types.py @@ -6,7 +6,6 @@ import abc import typing -from ...actor import power_distributing from .._base_types import Bounds from .._quantities import Power @@ -20,13 +19,6 @@ class BatteryPoolReport(typing.Protocol): def target_power(self) -> Power | None: """The currently set power for the batteries.""" - @property - def distribution_result(self) -> power_distributing.Result | None: - """The result of the last power distribution. - - This is `None` if no power distribution has been performed yet. - """ - @property def bounds(self) -> Bounds[Power] | None: """The usable bounds for the batteries. diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 1b44f4973..a22bdb0b9 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -8,8 +8,8 @@ import uuid from collections import abc -from ..._internal._channels import ReceiverFetcher -from ...actor import _power_managing +from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher +from ...actor import _power_managing, power_distributing from ...timeseries import Bounds from .._base_types import SystemBounds from .._quantities import Current, Power @@ -227,6 +227,21 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: return channel + @property + def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]: + """Get a receiver to receive power distribution results. + + Returns: + A receiver that will stream power distribution results for the pool's set of + EV chargers. + """ + return MappingReceiverFetcher( + self._pool_ref_store.power_distribution_results_fetcher, + lambda recv: recv.filter( + lambda x: x.request.component_ids == self._pool_ref_store.component_ids + ), + ) + async def stop(self) -> None: """Stop all tasks and channels owned by the EVChargerPool.""" await self._pool_ref_store.stop() diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py index 8dec056de..edfb03a91 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -3,7 +3,6 @@ """Manages shared state/tasks for a set of EV chargers.""" - import asyncio import uuid from collections import abc @@ -11,9 +10,10 @@ from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.microgrid import ComponentCategory +from ..._internal._channels import ReceiverFetcher from ...actor import ChannelRegistry, ComponentMetricRequest from ...actor._power_managing._base_classes import Proposal, ReportRequest -from ...actor.power_distributing import ComponentPoolStatus +from ...actor.power_distributing import ComponentPoolStatus, Result from ...microgrid import connection_manager from .._base_types import SystemBounds from ..formula_engine._formula_engine_pool import FormulaEnginePool @@ -40,6 +40,7 @@ def __init__( # pylint: disable=too-many-arguments status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], power_manager_bounds_subs_sender: Sender[ReportRequest], + power_distribution_results_fetcher: ReceiverFetcher[Result], component_ids: abc.Set[int] | None = None, ): """Create an instance of the class. @@ -55,6 +56,8 @@ def __init__( # pylint: disable=too-many-arguments requests to the power managing actor. power_manager_bounds_subs_sender: A Channel sender for sending power bounds subscription requests to the power managing actor. + power_distribution_results_fetcher: A ReceiverFetcher for the results from + the power distributing actor. component_ids: An optional list of component_ids belonging to this pool. If not specified, IDs of all EV Chargers in the microgrid will be fetched from the component graph. @@ -64,6 +67,7 @@ def __init__( # pylint: disable=too-many-arguments self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender + self.power_distribution_results_fetcher = power_distribution_results_fetcher if component_ids is not None: self.component_ids: frozenset[int] = frozenset(component_ids) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py index 1512c81da..ff9a5ee3e 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_result_types.py @@ -5,7 +5,6 @@ import typing -from ...actor import power_distributing from .._base_types import Bounds from .._quantities import Power @@ -17,13 +16,6 @@ class EVChargerPoolReport(typing.Protocol): def target_power(self) -> Power | None: """The currently set power for the EV chargers.""" - @property - def distribution_result(self) -> power_distributing.Result | None: - """The result of the last power distribution. - - This is `None` if no power distribution has been performed yet. - """ - @property def bounds(self) -> Bounds[Power] | None: """The usable bounds for the EV chargers. diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index 7e7a24745..750f62128 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -7,8 +7,8 @@ import uuid from collections import abc -from ..._internal._channels import ReceiverFetcher -from ...actor import _power_managing +from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher +from ...actor import _power_managing, power_distributing from ...timeseries import Bounds from .._base_types import SystemBounds from .._quantities import Power @@ -186,6 +186,21 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: return channel + @property + def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]: + """Get a receiver to receive power distribution results. + + Returns: + A receiver that will stream power distribution results for the pool's set of + PV inverters. + """ + return MappingReceiverFetcher( + self._pool_ref_store.power_distribution_results_fetcher, + lambda recv: recv.filter( + lambda x: x.request.component_ids == self._pool_ref_store.component_ids + ), + ) + async def stop(self) -> None: """Stop all tasks and channels owned by the PVPool.""" await self._pool_ref_store.stop() diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py index c2dd776e5..3d00c8c74 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py @@ -11,9 +11,10 @@ from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.microgrid import ComponentCategory, InverterType +from ..._internal._channels import ReceiverFetcher from ...actor import ChannelRegistry, ComponentMetricRequest from ...actor._power_managing._base_classes import Proposal, ReportRequest -from ...actor.power_distributing import ComponentPoolStatus +from ...actor.power_distributing import ComponentPoolStatus, Result from ...microgrid import connection_manager from .._base_types import SystemBounds from ..formula_engine._formula_engine_pool import FormulaEnginePool @@ -40,6 +41,7 @@ def __init__( # pylint: disable=too-many-arguments status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], power_manager_bounds_subs_sender: Sender[ReportRequest], + power_distribution_results_fetcher: ReceiverFetcher[Result], component_ids: abc.Set[int] | None = None, ): """Initialize this instance. @@ -55,6 +57,8 @@ def __init__( # pylint: disable=too-many-arguments requests to the power managing actor. power_manager_bounds_subs_sender: A Channel sender for sending power bounds subscription requests to the power managing actor. + power_distribution_results_fetcher: A ReceiverFetcher for the results from + the power distributing actor. component_ids: An optional list of component_ids belonging to this pool. If not specified, IDs of all PV inverters in the microgrid will be fetched from the component graph. @@ -64,6 +68,7 @@ def __init__( # pylint: disable=too-many-arguments self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender + self.power_distribution_results_fetcher = power_distribution_results_fetcher if component_ids is not None: self.component_ids: frozenset[int] = frozenset(component_ids) diff --git a/src/frequenz/sdk/timeseries/pv_pool/_result_types.py b/src/frequenz/sdk/timeseries/pv_pool/_result_types.py index ef0a3663c..ad6fb59fd 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_result_types.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_result_types.py @@ -5,7 +5,6 @@ import typing -from ...actor import power_distributing from .._base_types import Bounds from .._quantities import Power @@ -17,13 +16,6 @@ class PVPoolReport(typing.Protocol): def target_power(self) -> Power | None: """The currently set power for the PV inverters.""" - @property - def distribution_result(self) -> power_distributing.Result | None: - """The result of the last power distribution. - - This is `None` if no power distribution has been performed yet. - """ - @property def bounds(self) -> Bounds[Power] | None: """The usable bounds for the PV inverters. diff --git a/tests/actor/_power_managing/test_matryoshka.py b/tests/actor/_power_managing/test_matryoshka.py index c457b39f5..53bf41ac7 100644 --- a/tests/actor/_power_managing/test_matryoshka.py +++ b/tests/actor/_power_managing/test_matryoshka.py @@ -70,7 +70,7 @@ def bounds( ) -> None: """Test the status report.""" report = self.algorithm.get_status( - self._batteries, priority, self._system_bounds, None + self._batteries, priority, self._system_bounds ) if expected_power is None: assert report.target_power is None diff --git a/tests/actor/_power_managing/test_report.py b/tests/actor/_power_managing/test_report.py index cca841727..c19c8e3bf 100644 --- a/tests/actor/_power_managing/test_report.py +++ b/tests/actor/_power_managing/test_report.py @@ -34,7 +34,6 @@ def __init__( if exclusion_bounds is not None else None ), - distribution_result=None, ) def case( diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index 27bffc6c0..0f5fd29d2 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -14,6 +14,7 @@ from pytest_mock import MockerFixture from frequenz.sdk import microgrid, timeseries +from frequenz.sdk._internal._channels import LatestValueCache from frequenz.sdk.actor import ResamplerConfig, power_distributing from frequenz.sdk.actor.power_distributing import ComponentPoolStatus from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import ( @@ -161,6 +162,7 @@ def _assert_report( # pylint: disable=too-many-arguments power: float | None, lower: float, upper: float, + dist_result: power_distributing.Result | None = None, expected_result_pred: ( typing.Callable[[power_distributing.Result], bool] | None ) = None, @@ -172,8 +174,8 @@ def _assert_report( # pylint: disable=too-many-arguments assert report.bounds.lower == Power.from_watts(lower) assert report.bounds.upper == Power.from_watts(upper) if expected_result_pred is not None: - assert report.distribution_result is not None - assert expected_result_pred(report.distribution_result) + assert dist_result is not None + assert expected_result_pred(dist_result) async def test_case_1( self, @@ -196,6 +198,9 @@ async def test_case_1( battery_pool = microgrid.new_battery_pool(priority=5) bounds_rx = battery_pool.power_status.new_receiver() + latest_dist_result = LatestValueCache( + battery_pool.power_distribution_results.new_receiver() + ) self._assert_report( await bounds_rx.receive(), power=None, lower=-4000.0, upper=4000.0 @@ -217,6 +222,7 @@ async def test_case_1( power=1000.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), @@ -236,6 +242,7 @@ async def side_effect(inv_id: int, _: float) -> None: power=100.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), @@ -250,6 +257,7 @@ async def side_effect(inv_id: int, _: float) -> None: power=100.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.PartialFailure ) @@ -267,6 +275,7 @@ async def side_effect(inv_id: int, _: float) -> None: power=100.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), @@ -294,6 +303,9 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None: priority=5, component_ids=set(mocks.microgrid.battery_ids[2:]) ) bounds_2_rx = battery_pool_2.power_status.new_receiver() + latest_dist_result_2 = LatestValueCache( + battery_pool_2.power_distribution_results.new_receiver() + ) self._assert_report( await bounds_1_rx.receive(), power=None, lower=-2000.0, upper=2000.0 @@ -313,8 +325,9 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None: set_power.reset_mock() await battery_pool_2.propose_power(Power.from_watts(1000.0)) + bounds = await bounds_2_rx.receive() - if bounds.distribution_result is None: + if not latest_dist_result_2.has_value(): bounds = await bounds_2_rx.receive() self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0) assert set_power.call_count == 2 @@ -341,6 +354,9 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None: bounds_1_rx = battery_pool_1.power_status.new_receiver() battery_pool_2 = microgrid.new_battery_pool(priority=1) bounds_2_rx = battery_pool_2.power_status.new_receiver() + latest_dist_result_2 = LatestValueCache( + battery_pool_2.power_distribution_results.new_receiver() + ) self._assert_report( await bounds_1_rx.receive(), power=None, lower=-4000.0, upper=4000.0 @@ -374,7 +390,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None: await bounds_1_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0 ) bounds = await bounds_2_rx.receive() - if bounds.distribution_result is None: + if not latest_dist_result_2.has_value(): bounds = await bounds_2_rx.receive() self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0) @@ -398,6 +414,9 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: battery_pool = microgrid.new_battery_pool(priority=5) bounds_rx = battery_pool.power_status.new_receiver() + latest_dist_result = LatestValueCache( + battery_pool.power_distribution_results.new_receiver() + ) self._assert_report( await bounds_rx.receive(), power=None, lower=-4000.0, upper=4000.0 @@ -418,6 +437,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: power=1000.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), @@ -442,6 +462,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: power=400.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), @@ -464,6 +485,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: power=0.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), @@ -488,12 +510,13 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None: power=-400.0, lower=-4000.0, upper=4000.0, + dist_result=latest_dist_result.get(), expected_result_pred=lambda result: isinstance( result, power_distributing.Success ), ) - async def test_case_5( # pylint: disable=too-many-statements + async def test_case_5( # pylint: disable=too-many-statements,too-many-locals self, mocks: Mocks, mocker: MockerFixture, @@ -525,6 +548,10 @@ async def test_case_5( # pylint: disable=too-many-statements battery_pool_1 = microgrid.new_battery_pool(priority=1) bounds_1_rx = battery_pool_1.power_status.new_receiver() + latest_dist_result_4 = LatestValueCache( + battery_pool_4.power_distribution_results.new_receiver() + ) + self._assert_report( await bounds_4_rx.receive(), power=None, lower=-4000.0, upper=4000.0 ) @@ -587,12 +614,13 @@ async def test_case_5( # pylint: disable=too-many-statements await bounds_1_rx.receive() await bounds_2_rx.receive() await bounds_3_rx.receive() - bounds = await bounds_4_rx.receive() - if bounds.distribution_result is None or not isinstance( - bounds.distribution_result, power_distributing.Success + await bounds_4_rx.receive() + dist_result = latest_dist_result_4.get() + if dist_result is None or not isinstance( + dist_result, power_distributing.Success ): continue - if bounds.distribution_result.succeeded_power == Power.from_watts(720.0): + if dist_result.succeeded_power == Power.from_watts(720.0): break assert set_power.call_count == 4 @@ -626,12 +654,13 @@ async def test_case_5( # pylint: disable=too-many-statements await bounds_1_rx.receive() await bounds_2_rx.receive() await bounds_3_rx.receive() - bounds = await bounds_4_rx.receive() - if bounds.distribution_result is None or not isinstance( - bounds.distribution_result, power_distributing.Success + await bounds_4_rx.receive() + dist_result = latest_dist_result_4.get() + if dist_result is None or not isinstance( + dist_result, power_distributing.Success ): continue - if bounds.distribution_result.succeeded_power == Power.from_watts(-280.0): + if dist_result.succeeded_power == Power.from_watts(-280.0): break assert set_power.call_count == 4 diff --git a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py index 0fc5bf2ba..2e53c259e 100644 --- a/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py +++ b/tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py @@ -163,6 +163,7 @@ def _assert_report( # pylint: disable=too-many-arguments power: float | None, lower: float, upper: float, + dist_result: power_distributing.Result | None = None, expected_result_pred: ( typing.Callable[[power_distributing.Result], bool] | None ) = None, @@ -174,8 +175,8 @@ def _assert_report( # pylint: disable=too-many-arguments assert report.bounds.lower == Power.from_watts(lower) assert report.bounds.upper == Power.from_watts(upper) if expected_result_pred is not None: - assert report.distribution_result is not None - assert expected_result_pred(report.distribution_result) + assert dist_result is not None + assert expected_result_pred(dist_result) async def _get_bounds_receiver( self, ev_charger_pool: EVChargerPool diff --git a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py index 6c898bbcb..9e61009cd 100644 --- a/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py +++ b/tests/timeseries/_pv_pool/test_pv_pool_control_methods.py @@ -92,6 +92,7 @@ def _assert_report( # pylint: disable=too-many-arguments power: float | None, lower: float, upper: float, + dist_result: power_distributing.Result | None = None, expected_result_pred: ( typing.Callable[[power_distributing.Result], bool] | None ) = None, @@ -103,8 +104,8 @@ def _assert_report( # pylint: disable=too-many-arguments assert report.bounds.lower == Power.from_watts(lower) assert report.bounds.upper == Power.from_watts(upper) if expected_result_pred is not None: - assert report.distribution_result is not None - assert expected_result_pred(report.distribution_result) + assert dist_result is not None + assert expected_result_pred(dist_result) async def _recv_reports_until( self,