diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 99f21e273..16a4e2b25 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -85,3 +85,5 @@ - The resampler now properly handles sending zero values. A bug made the resampler interpret zero values as `None` when generating new samples, so if the result of the resampling is zero, the resampler would just produce `None` values. + +- The PowerManager no longer holds on to proposals from dead actors forever. If an actor hasn't sent a new proposal in 60 seconds, the available proposal from that actor is dropped. diff --git a/src/frequenz/sdk/actor/_power_managing/_base_classes.py b/src/frequenz/sdk/actor/_power_managing/_base_classes.py index f5a11f1ba..bb5afd916 100644 --- a/src/frequenz/sdk/actor/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/actor/_power_managing/_base_classes.py @@ -207,6 +207,12 @@ class Proposal: priority: int """The priority of the actor sending the proposal.""" + creation_time: float + """The loop time when the proposal is created. + + This is used by the power manager to determine the age of the proposal. + """ + request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0) """The maximum amount of time to wait for the request to be fulfilled.""" @@ -279,3 +285,13 @@ def get_status( Returns: The bounds for the components. """ + + @abc.abstractmethod + def drop_old_proposals(self, loop_time: float) -> None: + """Drop old proposals. + + This method is called periodically by the power manager. + + Args: + loop_time: The current loop time. + """ diff --git a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py index 7dac119a7..635be89af 100644 --- a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py +++ b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py @@ -21,6 +21,7 @@ import logging import typing +from datetime import timedelta from typing_extensions import override @@ -40,8 +41,9 @@ class Matryoshka(BaseAlgorithm): """The matryoshka algorithm.""" - def __init__(self) -> None: + def __init__(self, max_proposal_age: timedelta) -> None: """Create a new instance of the matryoshka algorithm.""" + self._max_proposal_age_sec = max_proposal_age.total_seconds() self._component_buckets: dict[frozenset[int], SortedSet[Proposal]] = {} self._target_power: dict[frozenset[int], Power] = {} @@ -267,3 +269,21 @@ def get_status( _exclusion_bounds=system_bounds.exclusion_bounds, distribution_result=distribution_result, ) + + @override + def drop_old_proposals(self, loop_time: float) -> None: + """Drop old proposals. + + This will remove all proposals that have not been updated for longer than + `max_proposal_age`. + + Args: + loop_time: The current loop time. + """ + for bucket in self._component_buckets.values(): + to_delete = [] + for proposal in bucket: + if (loop_time - proposal.creation_time) > self._max_proposal_age_sec: + to_delete.append(proposal) + for proposal in to_delete: + bucket.delete(proposal) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 90405ca06..f7b3e6f33 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone from frequenz.channels import Receiver, Sender -from frequenz.channels.util import select, selected_from +from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from from typing_extensions import override from ...timeseries._base_types import PoolType, SystemBounds @@ -75,7 +75,9 @@ def __init__( # pylint: disable=too-many-arguments self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {} self._distribution_results: dict[frozenset[int], power_distributing.Result] = {} - self._algorithm: BaseAlgorithm = Matryoshka() + self._algorithm: BaseAlgorithm = Matryoshka( + max_proposal_age=timedelta(seconds=60.0) + ) super().__init__() @@ -180,10 +182,13 @@ async def _send_updated_target_power( @override async def _run(self) -> None: """Run the power managing actor.""" + last_result_partial_failure = False + drop_old_proposals_timer = Timer(timedelta(seconds=1.0), SkipMissedAndDrift()) async for selected in select( self._proposals_receiver, self._bounds_subscription_receiver, self._power_distributing_results_receiver, + drop_old_proposals_timer, ): if selected_from(selected, self._proposals_receiver): proposal = selected.value @@ -234,9 +239,20 @@ async def _run(self) -> None: self._distribution_results[ frozenset(result.request.component_ids) ] = result + if not isinstance(result, power_distributing.Success): + _logger.warning( + "PowerManagingActor: PowerDistributing failed: %s", result + ) match result: case power_distributing.PartialFailure(request): - await self._send_updated_target_power( - frozenset(request.component_ids), None, must_send=True - ) + if not last_result_partial_failure: + last_result_partial_failure = True + await self._send_updated_target_power( + frozenset(request.component_ids), None, must_send=True + ) + case power_distributing.Success(): + last_result_partial_failure = False await self._send_reports(frozenset(result.request.component_ids)) + + elif selected_from(selected, drop_old_proposals_timer): + self._algorithm.drop_old_proposals(asyncio.get_event_loop().time()) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 8c7dbe62a..7bea60fef 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -132,6 +132,7 @@ async def propose_power( bounds=bounds, component_ids=self._battery_pool._batteries, priority=self._priority, + creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, ) ) @@ -177,6 +178,7 @@ async def propose_charge( bounds=timeseries.Bounds(None, None), component_ids=self._battery_pool._batteries, priority=self._priority, + creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, ) ) @@ -222,6 +224,7 @@ async def propose_discharge( bounds=timeseries.Bounds(None, None), component_ids=self._battery_pool._batteries, priority=self._priority, + creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, ) ) diff --git a/tests/actor/_power_managing/test_matryoshka.py b/tests/actor/_power_managing/test_matryoshka.py index f212ecaa1..de64d3aa7 100644 --- a/tests/actor/_power_managing/test_matryoshka.py +++ b/tests/actor/_power_managing/test_matryoshka.py @@ -3,7 +3,8 @@ """Tests for the Matryoshka power manager algorithm.""" -from datetime import datetime, timezone +import asyncio +from datetime import datetime, timedelta, timezone from frequenz.sdk import timeseries from frequenz.sdk.actor._power_managing import Proposal @@ -22,8 +23,8 @@ def __init__( """Create a new instance of the stateful tester.""" self._call_count = 0 self._batteries = batteries - self._algorithm = Matryoshka() self._system_bounds = system_bounds + self.algorithm = Matryoshka(max_proposal_age=timedelta(seconds=60.0)) def tgt_power( # pylint: disable=too-many-arguments self, @@ -31,11 +32,12 @@ def tgt_power( # pylint: disable=too-many-arguments power: float | None, bounds: tuple[float | None, float | None], expected: float | None, + creation_time: float | None = None, must_send: bool = False, ) -> None: """Test the target power calculation.""" self._call_count += 1 - tgt_power = self._algorithm.calculate_target_power( + tgt_power = self.algorithm.calculate_target_power( self._batteries, Proposal( component_ids=self._batteries, @@ -46,6 +48,9 @@ def tgt_power( # pylint: disable=too-many-arguments None if bounds[1] is None else Power.from_watts(bounds[1]), ), priority=priority, + creation_time=creation_time + if creation_time is not None + else asyncio.get_event_loop().time(), ), self._system_bounds, must_send, @@ -61,7 +66,7 @@ def bounds( expected_bounds: tuple[float, float], ) -> None: """Test the status report.""" - report = self._algorithm.get_status( + report = self.algorithm.get_status( self._batteries, priority, self._system_bounds, None ) if expected_power is None: @@ -350,3 +355,67 @@ async def test_matryoshka_with_excl_3() -> None: tester.tgt_power(priority=1, power=-40.0, bounds=(-100.0, -35.0), expected=-40.0) tester.bounds(priority=0, expected_power=-40.0, expected_bounds=(-100.0, -35.0)) + + +async def test_matryoshka_drop_old_proposals() -> None: + """Tests for the power managing actor. + + With inclusion bounds, and exclusion bounds -30.0 to 30.0. + """ + batteries = frozenset({2, 5}) + + system_bounds = _base_types.SystemBounds( + timestamp=datetime.now(tz=timezone.utc), + inclusion_bounds=timeseries.Bounds( + lower=Power.from_watts(-200.0), upper=Power.from_watts(200.0) + ), + exclusion_bounds=timeseries.Bounds(lower=Power.zero(), upper=Power.zero()), + ) + + tester = StatefulTester(batteries, system_bounds) + + now = asyncio.get_event_loop().time() + + tester.tgt_power(priority=3, power=22.0, bounds=(22.0, 30.0), expected=22.0) + + # When a proposal is too old and hasn't been updated, it is dropped. + tester.tgt_power( + priority=2, + power=25.0, + bounds=(25.0, 50.0), + creation_time=now - 70.0, + expected=25.0, + ) + + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True + ) + tester.algorithm.drop_old_proposals(now) + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=22.0, must_send=True + ) + + # When overwritten by a newer proposal, that proposal is not dropped. + tester.tgt_power( + priority=2, + power=25.0, + bounds=(25.0, 50.0), + creation_time=now - 70.0, + expected=25.0, + ) + tester.tgt_power( + priority=2, + power=25.0, + bounds=(25.0, 50.0), + creation_time=now - 30.0, + expected=25.0, + must_send=True, + ) + + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True + ) + tester.algorithm.drop_old_proposals(now) + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True + )