From f8f3069c99eabb018d49efb6014bc4e880f0048b Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 18 Jan 2024 12:59:34 +0100 Subject: [PATCH 1/6] Retry only once on PartialFailure When the PowerManager receives a `PartialFailure`, in many cases, resending the request would fix it, because the power distributor would have new information on how to redistribute differently. But this doesn't always work, for example in single-battery locations or if all batteries are in an UNCERTAIN state, in which case the power distributor has to retry with the same batteries. For this reason, if we don't have a limit on such retries from the PowerManager, we'd get into an infinite loop that doesn't have any delay in between retries. Signed-off-by: Sahas Subramanian --- .../_power_managing/_power_managing_actor.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 90405ca06..2e4532dbf 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -180,6 +180,7 @@ async def _send_updated_target_power( @override async def _run(self) -> None: """Run the power managing actor.""" + last_result_partial_failure = False async for selected in select( self._proposals_receiver, self._bounds_subscription_receiver, @@ -234,9 +235,17 @@ async def _run(self) -> None: self._distribution_results[ frozenset(result.request.component_ids) ] = result + if not isinstance(result, power_distributing.Success): + _logger.warning( + "PowerManagingActor: PowerDistributing failed: %s", result + ) match result: case power_distributing.PartialFailure(request): - await self._send_updated_target_power( - frozenset(request.component_ids), None, must_send=True - ) + if not last_result_partial_failure: + last_result_partial_failure = True + await
self._send_updated_target_power( + frozenset(request.component_ids), None, must_send=True + ) + case power_distributing.Success(): + last_result_partial_failure = False await self._send_reports(frozenset(result.request.component_ids)) From 370db17018895c3bbbe2e9f4084b9a7247a486b0 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 18 Jan 2024 17:33:06 +0100 Subject: [PATCH 2/6] Add a `creation_time` attribute to `Proposal`s This is used to identify proposals that are too old, so that they can be removed from consideration when the PowerManager calculates target power. Signed-off-by: Sahas Subramanian --- src/frequenz/sdk/actor/_power_managing/_base_classes.py | 6 ++++++ src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py | 3 +++ tests/actor/_power_managing/test_matryoshka.py | 2 ++ 3 files changed, 11 insertions(+) diff --git a/src/frequenz/sdk/actor/_power_managing/_base_classes.py b/src/frequenz/sdk/actor/_power_managing/_base_classes.py index f5a11f1ba..86e98904c 100644 --- a/src/frequenz/sdk/actor/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/actor/_power_managing/_base_classes.py @@ -207,6 +207,12 @@ class Proposal: priority: int """The priority of the actor sending the proposal.""" + creation_time: float + """The loop time when the proposal is created. + + This is used by the power manager to determine the age of the proposal. 
+ """ + request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0) """The maximum amount of time to wait for the request to be fulfilled.""" diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 8c7dbe62a..7bea60fef 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -132,6 +132,7 @@ async def propose_power( bounds=bounds, component_ids=self._battery_pool._batteries, priority=self._priority, + creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, ) ) @@ -177,6 +178,7 @@ async def propose_charge( bounds=timeseries.Bounds(None, None), component_ids=self._battery_pool._batteries, priority=self._priority, + creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, ) ) @@ -222,6 +224,7 @@ async def propose_discharge( bounds=timeseries.Bounds(None, None), component_ids=self._battery_pool._batteries, priority=self._priority, + creation_time=asyncio.get_running_loop().time(), request_timeout=request_timeout, ) ) diff --git a/tests/actor/_power_managing/test_matryoshka.py b/tests/actor/_power_managing/test_matryoshka.py index f212ecaa1..0fe1044d0 100644 --- a/tests/actor/_power_managing/test_matryoshka.py +++ b/tests/actor/_power_managing/test_matryoshka.py @@ -3,6 +3,7 @@ """Tests for the Matryoshka power manager algorithm.""" +import asyncio from datetime import datetime, timezone from frequenz.sdk import timeseries @@ -46,6 +47,7 @@ def tgt_power( # pylint: disable=too-many-arguments None if bounds[1] is None else Power.from_watts(bounds[1]), ), priority=priority, + creation_time=asyncio.get_event_loop().time(), ), self._system_bounds, must_send, From f2e67ba98659ecd46c5ac4809bb5339841b68567 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 18 Jan 2024 17:36:47 +0100 Subject: [PATCH 3/6] Add a `drop_old_proposals` method in 
PowerManager algorithms In the matryoshka algorithm, this method is implemented to delete proposals that haven't been replaced/updated in the last `max_proposal_age`. Signed-off-by: Sahas Subramanian --- .../actor/_power_managing/_base_classes.py | 10 +++++++++ .../sdk/actor/_power_managing/_matryoshka.py | 22 ++++++++++++++++++- .../_power_managing/_power_managing_actor.py | 4 +++- .../actor/_power_managing/test_matryoshka.py | 4 ++-- 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/frequenz/sdk/actor/_power_managing/_base_classes.py b/src/frequenz/sdk/actor/_power_managing/_base_classes.py index 86e98904c..bb5afd916 100644 --- a/src/frequenz/sdk/actor/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/actor/_power_managing/_base_classes.py @@ -285,3 +285,13 @@ def get_status( Returns: The bounds for the components. """ + + @abc.abstractmethod + def drop_old_proposals(self, loop_time: float) -> None: + """Drop old proposals. + + This method is called periodically by the power manager. + + Args: + loop_time: The current loop time. 
+ """ diff --git a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py index 7dac119a7..635be89af 100644 --- a/src/frequenz/sdk/actor/_power_managing/_matryoshka.py +++ b/src/frequenz/sdk/actor/_power_managing/_matryoshka.py @@ -21,6 +21,7 @@ import logging import typing +from datetime import timedelta from typing_extensions import override @@ -40,8 +41,9 @@ class Matryoshka(BaseAlgorithm): """The matryoshka algorithm.""" - def __init__(self) -> None: + def __init__(self, max_proposal_age: timedelta) -> None: """Create a new instance of the matryoshka algorithm.""" + self._max_proposal_age_sec = max_proposal_age.total_seconds() self._component_buckets: dict[frozenset[int], SortedSet[Proposal]] = {} self._target_power: dict[frozenset[int], Power] = {} @@ -267,3 +269,21 @@ def get_status( _exclusion_bounds=system_bounds.exclusion_bounds, distribution_result=distribution_result, ) + + @override + def drop_old_proposals(self, loop_time: float) -> None: + """Drop old proposals. + + This will remove all proposals that have not been updated for longer than + `max_proposal_age`. + + Args: + loop_time: The current loop time. 
+ """ + for bucket in self._component_buckets.values(): + to_delete = [] + for proposal in bucket: + if (loop_time - proposal.creation_time) > self._max_proposal_age_sec: + to_delete.append(proposal) + for proposal in to_delete: + bucket.delete(proposal) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 2e4532dbf..4351efb2f 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -75,7 +75,9 @@ def __init__( # pylint: disable=too-many-arguments self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {} self._distribution_results: dict[frozenset[int], power_distributing.Result] = {} - self._algorithm: BaseAlgorithm = Matryoshka() + self._algorithm: BaseAlgorithm = Matryoshka( + max_proposal_age=timedelta(seconds=60.0) + ) super().__init__() diff --git a/tests/actor/_power_managing/test_matryoshka.py b/tests/actor/_power_managing/test_matryoshka.py index 0fe1044d0..a5c96bfc3 100644 --- a/tests/actor/_power_managing/test_matryoshka.py +++ b/tests/actor/_power_managing/test_matryoshka.py @@ -4,7 +4,7 @@ """Tests for the Matryoshka power manager algorithm.""" import asyncio -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from frequenz.sdk import timeseries from frequenz.sdk.actor._power_managing import Proposal @@ -23,7 +23,7 @@ def __init__( """Create a new instance of the stateful tester.""" self._call_count = 0 self._batteries = batteries - self._algorithm = Matryoshka() + self._algorithm = Matryoshka(max_proposal_age=timedelta(seconds=60.0)) self._system_bounds = system_bounds def tgt_power( # pylint: disable=too-many-arguments From 25f7f01f69b40da1de10016c819071e2049c42ba Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 18 Jan 2024 17:39:28 +0100 Subject: [PATCH 4/6] Add a timer in the PowerManager to 
cleanup old proposals Signed-off-by: Sahas Subramanian --- .../sdk/actor/_power_managing/_power_managing_actor.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py index 4351efb2f..f7b3e6f33 100644 --- a/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py @@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone from frequenz.channels import Receiver, Sender -from frequenz.channels.util import select, selected_from +from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from from typing_extensions import override from ...timeseries._base_types import PoolType, SystemBounds @@ -183,10 +183,12 @@ async def _send_updated_target_power( async def _run(self) -> None: """Run the power managing actor.""" last_result_partial_failure = False + drop_old_proposals_timer = Timer(timedelta(seconds=1.0), SkipMissedAndDrift()) async for selected in select( self._proposals_receiver, self._bounds_subscription_receiver, self._power_distributing_results_receiver, + drop_old_proposals_timer, ): if selected_from(selected, self._proposals_receiver): proposal = selected.value @@ -251,3 +253,6 @@ async def _run(self) -> None: case power_distributing.Success(): last_result_partial_failure = False await self._send_reports(frozenset(result.request.component_ids)) + + elif selected_from(selected, drop_old_proposals_timer): + self._algorithm.drop_old_proposals(asyncio.get_event_loop().time()) From 02bab178c0150a64d666d88aaaf2f1a899b2ba58 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 18 Jan 2024 18:23:57 +0100 Subject: [PATCH 5/6] Test dropping of old proposals that have not been updated Signed-off-by: Sahas Subramanian --- .../actor/_power_managing/test_matryoshka.py | 75 ++++++++++++++++++- 1 file changed, 71 
insertions(+), 4 deletions(-) diff --git a/tests/actor/_power_managing/test_matryoshka.py b/tests/actor/_power_managing/test_matryoshka.py index a5c96bfc3..de64d3aa7 100644 --- a/tests/actor/_power_managing/test_matryoshka.py +++ b/tests/actor/_power_managing/test_matryoshka.py @@ -23,8 +23,8 @@ def __init__( """Create a new instance of the stateful tester.""" self._call_count = 0 self._batteries = batteries - self._algorithm = Matryoshka(max_proposal_age=timedelta(seconds=60.0)) self._system_bounds = system_bounds + self.algorithm = Matryoshka(max_proposal_age=timedelta(seconds=60.0)) def tgt_power( # pylint: disable=too-many-arguments self, @@ -32,11 +32,12 @@ def tgt_power( # pylint: disable=too-many-arguments power: float | None, bounds: tuple[float | None, float | None], expected: float | None, + creation_time: float | None = None, must_send: bool = False, ) -> None: """Test the target power calculation.""" self._call_count += 1 - tgt_power = self._algorithm.calculate_target_power( + tgt_power = self.algorithm.calculate_target_power( self._batteries, Proposal( component_ids=self._batteries, @@ -47,7 +48,9 @@ def tgt_power( # pylint: disable=too-many-arguments None if bounds[1] is None else Power.from_watts(bounds[1]), ), priority=priority, - creation_time=asyncio.get_event_loop().time(), + creation_time=creation_time + if creation_time is not None + else asyncio.get_event_loop().time(), ), self._system_bounds, must_send, @@ -63,7 +66,7 @@ def bounds( expected_bounds: tuple[float, float], ) -> None: """Test the status report.""" - report = self._algorithm.get_status( + report = self.algorithm.get_status( self._batteries, priority, self._system_bounds, None ) if expected_power is None: @@ -352,3 +355,67 @@ async def test_matryoshka_with_excl_3() -> None: tester.tgt_power(priority=1, power=-40.0, bounds=(-100.0, -35.0), expected=-40.0) tester.bounds(priority=0, expected_power=-40.0, expected_bounds=(-100.0, -35.0)) + + +async def 
test_matryoshka_drop_old_proposals() -> None: + """Tests for the power managing actor. + + With inclusion bounds, and exclusion bounds -30.0 to 30.0. + """ + batteries = frozenset({2, 5}) + + system_bounds = _base_types.SystemBounds( + timestamp=datetime.now(tz=timezone.utc), + inclusion_bounds=timeseries.Bounds( + lower=Power.from_watts(-200.0), upper=Power.from_watts(200.0) + ), + exclusion_bounds=timeseries.Bounds(lower=Power.zero(), upper=Power.zero()), + ) + + tester = StatefulTester(batteries, system_bounds) + + now = asyncio.get_event_loop().time() + + tester.tgt_power(priority=3, power=22.0, bounds=(22.0, 30.0), expected=22.0) + + # When a proposal is too old and hasn't been updated, it is dropped. + tester.tgt_power( + priority=2, + power=25.0, + bounds=(25.0, 50.0), + creation_time=now - 70.0, + expected=25.0, + ) + + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True + ) + tester.algorithm.drop_old_proposals(now) + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=22.0, must_send=True + ) + + # When overwritten by a newer proposal, that proposal is not dropped. 
+ tester.tgt_power( + priority=2, + power=25.0, + bounds=(25.0, 50.0), + creation_time=now - 70.0, + expected=25.0, + ) + tester.tgt_power( + priority=2, + power=25.0, + bounds=(25.0, 50.0), + creation_time=now - 30.0, + expected=25.0, + must_send=True, + ) + + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True + ) + tester.algorithm.drop_old_proposals(now) + tester.tgt_power( + priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True + ) From ce586b08e614577cb2d9a2e5557c0b3abcb3a609 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 18 Jan 2024 18:38:53 +0100 Subject: [PATCH 6/6] Update RELEASE_NOTES.md Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 99f21e273..16a4e2b25 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -85,3 +85,5 @@ - The resampler now properly handles sending zero values. A bug made the resampler interpret zero values as `None` when generating new samples, so if the result of the resampling is zero, the resampler would just produce `None` values. + +- The PowerManager no longer holds on to proposals from dead actors forever. If an actor hasn't sent a new proposal in 60 seconds, the available proposal from that actor is dropped.