Skip to content

Commit 1595a5f

Browse files
authored
Drop old proposals from the PowerManager (#845)
The PowerManager has no way to tell whether an actor that sent a proposal is still active, yet it needs that information to decide when to drop proposals from a given sender. With this PR, the resending of proposals by actors also functions as a heartbeat between the actors and the power manager. If an actor hasn't sent a new proposal in 60 seconds, the latest available proposal from that actor is removed from the power manager.
2 parents a29e600 + ce586b0 commit 1595a5f

File tree

6 files changed

+136
-10
lines changed

6 files changed

+136
-10
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,5 @@
8585
- The resampler now properly handles sending zero values.
8686

8787
A bug made the resampler interpret zero values as `None` when generating new samples, so if the result of the resampling is zero, the resampler would just produce `None` values.
88+
89+
- The PowerManager no longer holds on to proposals from dead actors forever. If an actor hasn't sent a new proposal in 60 seconds, the available proposal from that actor is dropped.

src/frequenz/sdk/actor/_power_managing/_base_classes.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ class Proposal:
207207
priority: int
208208
"""The priority of the actor sending the proposal."""
209209

210+
creation_time: float
211+
"""The loop time when the proposal is created.
212+
213+
This is used by the power manager to determine the age of the proposal.
214+
"""
215+
210216
request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0)
211217
"""The maximum amount of time to wait for the request to be fulfilled."""
212218

@@ -279,3 +285,13 @@ def get_status(
279285
Returns:
280286
The bounds for the components.
281287
"""
288+
289+
@abc.abstractmethod
290+
def drop_old_proposals(self, loop_time: float) -> None:
291+
"""Drop old proposals.
292+
293+
This method is called periodically by the power manager.
294+
295+
Args:
296+
loop_time: The current loop time.
297+
"""

src/frequenz/sdk/actor/_power_managing/_matryoshka.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import logging
2323
import typing
24+
from datetime import timedelta
2425

2526
from typing_extensions import override
2627

@@ -40,8 +41,9 @@
4041
class Matryoshka(BaseAlgorithm):
4142
"""The matryoshka algorithm."""
4243

43-
def __init__(self) -> None:
44+
def __init__(self, max_proposal_age: timedelta) -> None:
4445
"""Create a new instance of the matryoshka algorithm."""
46+
self._max_proposal_age_sec = max_proposal_age.total_seconds()
4547
self._component_buckets: dict[frozenset[int], SortedSet[Proposal]] = {}
4648
self._target_power: dict[frozenset[int], Power] = {}
4749

@@ -267,3 +269,21 @@ def get_status(
267269
_exclusion_bounds=system_bounds.exclusion_bounds,
268270
distribution_result=distribution_result,
269271
)
272+
273+
@override
274+
def drop_old_proposals(self, loop_time: float) -> None:
275+
"""Drop old proposals.
276+
277+
This will remove all proposals that have not been updated for longer than
278+
`max_proposal_age`.
279+
280+
Args:
281+
loop_time: The current loop time.
282+
"""
283+
for bucket in self._component_buckets.values():
284+
to_delete = []
285+
for proposal in bucket:
286+
if (loop_time - proposal.creation_time) > self._max_proposal_age_sec:
287+
to_delete.append(proposal)
288+
for proposal in to_delete:
289+
bucket.delete(proposal)

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from datetime import datetime, timedelta, timezone
1212

1313
from frequenz.channels import Receiver, Sender
14-
from frequenz.channels.util import select, selected_from
14+
from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from
1515
from typing_extensions import override
1616

1717
from ...timeseries._base_types import PoolType, SystemBounds
@@ -75,7 +75,9 @@ def __init__( # pylint: disable=too-many-arguments
7575
self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {}
7676
self._distribution_results: dict[frozenset[int], power_distributing.Result] = {}
7777

78-
self._algorithm: BaseAlgorithm = Matryoshka()
78+
self._algorithm: BaseAlgorithm = Matryoshka(
79+
max_proposal_age=timedelta(seconds=60.0)
80+
)
7981

8082
super().__init__()
8183

@@ -180,10 +182,13 @@ async def _send_updated_target_power(
180182
@override
181183
async def _run(self) -> None:
182184
"""Run the power managing actor."""
185+
last_result_partial_failure = False
186+
drop_old_proposals_timer = Timer(timedelta(seconds=1.0), SkipMissedAndDrift())
183187
async for selected in select(
184188
self._proposals_receiver,
185189
self._bounds_subscription_receiver,
186190
self._power_distributing_results_receiver,
191+
drop_old_proposals_timer,
187192
):
188193
if selected_from(selected, self._proposals_receiver):
189194
proposal = selected.value
@@ -234,9 +239,20 @@ async def _run(self) -> None:
234239
self._distribution_results[
235240
frozenset(result.request.component_ids)
236241
] = result
242+
if not isinstance(result, power_distributing.Success):
243+
_logger.warning(
244+
"PowerManagingActor: PowerDistributing failed: %s", result
245+
)
237246
match result:
238247
case power_distributing.PartialFailure(request):
239-
await self._send_updated_target_power(
240-
frozenset(request.component_ids), None, must_send=True
241-
)
248+
if not last_result_partial_failure:
249+
last_result_partial_failure = True
250+
await self._send_updated_target_power(
251+
frozenset(request.component_ids), None, must_send=True
252+
)
253+
case power_distributing.Success():
254+
last_result_partial_failure = False
242255
await self._send_reports(frozenset(result.request.component_ids))
256+
257+
elif selected_from(selected, drop_old_proposals_timer):
258+
self._algorithm.drop_old_proposals(asyncio.get_event_loop().time())

src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ async def propose_power(
132132
bounds=bounds,
133133
component_ids=self._battery_pool._batteries,
134134
priority=self._priority,
135+
creation_time=asyncio.get_running_loop().time(),
135136
request_timeout=request_timeout,
136137
)
137138
)
@@ -177,6 +178,7 @@ async def propose_charge(
177178
bounds=timeseries.Bounds(None, None),
178179
component_ids=self._battery_pool._batteries,
179180
priority=self._priority,
181+
creation_time=asyncio.get_running_loop().time(),
180182
request_timeout=request_timeout,
181183
)
182184
)
@@ -222,6 +224,7 @@ async def propose_discharge(
222224
bounds=timeseries.Bounds(None, None),
223225
component_ids=self._battery_pool._batteries,
224226
priority=self._priority,
227+
creation_time=asyncio.get_running_loop().time(),
225228
request_timeout=request_timeout,
226229
)
227230
)

tests/actor/_power_managing/test_matryoshka.py

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
"""Tests for the Matryoshka power manager algorithm."""
55

6-
from datetime import datetime, timezone
6+
import asyncio
7+
from datetime import datetime, timedelta, timezone
78

89
from frequenz.sdk import timeseries
910
from frequenz.sdk.actor._power_managing import Proposal
@@ -22,20 +23,21 @@ def __init__(
2223
"""Create a new instance of the stateful tester."""
2324
self._call_count = 0
2425
self._batteries = batteries
25-
self._algorithm = Matryoshka()
2626
self._system_bounds = system_bounds
27+
self.algorithm = Matryoshka(max_proposal_age=timedelta(seconds=60.0))
2728

2829
def tgt_power( # pylint: disable=too-many-arguments
2930
self,
3031
priority: int,
3132
power: float | None,
3233
bounds: tuple[float | None, float | None],
3334
expected: float | None,
35+
creation_time: float | None = None,
3436
must_send: bool = False,
3537
) -> None:
3638
"""Test the target power calculation."""
3739
self._call_count += 1
38-
tgt_power = self._algorithm.calculate_target_power(
40+
tgt_power = self.algorithm.calculate_target_power(
3941
self._batteries,
4042
Proposal(
4143
component_ids=self._batteries,
@@ -46,6 +48,9 @@ def tgt_power( # pylint: disable=too-many-arguments
4648
None if bounds[1] is None else Power.from_watts(bounds[1]),
4749
),
4850
priority=priority,
51+
creation_time=creation_time
52+
if creation_time is not None
53+
else asyncio.get_event_loop().time(),
4954
),
5055
self._system_bounds,
5156
must_send,
@@ -61,7 +66,7 @@ def bounds(
6166
expected_bounds: tuple[float, float],
6267
) -> None:
6368
"""Test the status report."""
64-
report = self._algorithm.get_status(
69+
report = self.algorithm.get_status(
6570
self._batteries, priority, self._system_bounds, None
6671
)
6772
if expected_power is None:
@@ -350,3 +355,67 @@ async def test_matryoshka_with_excl_3() -> None:
350355

351356
tester.tgt_power(priority=1, power=-40.0, bounds=(-100.0, -35.0), expected=-40.0)
352357
tester.bounds(priority=0, expected_power=-40.0, expected_bounds=(-100.0, -35.0))
358+
359+
360+
async def test_matryoshka_drop_old_proposals() -> None:
361+
"""Tests for the power managing actor.
362+
363+
With inclusion bounds, and exclusion bounds -30.0 to 30.0.
364+
"""
365+
batteries = frozenset({2, 5})
366+
367+
system_bounds = _base_types.SystemBounds(
368+
timestamp=datetime.now(tz=timezone.utc),
369+
inclusion_bounds=timeseries.Bounds(
370+
lower=Power.from_watts(-200.0), upper=Power.from_watts(200.0)
371+
),
372+
exclusion_bounds=timeseries.Bounds(lower=Power.zero(), upper=Power.zero()),
373+
)
374+
375+
tester = StatefulTester(batteries, system_bounds)
376+
377+
now = asyncio.get_event_loop().time()
378+
379+
tester.tgt_power(priority=3, power=22.0, bounds=(22.0, 30.0), expected=22.0)
380+
381+
# When a proposal is too old and hasn't been updated, it is dropped.
382+
tester.tgt_power(
383+
priority=2,
384+
power=25.0,
385+
bounds=(25.0, 50.0),
386+
creation_time=now - 70.0,
387+
expected=25.0,
388+
)
389+
390+
tester.tgt_power(
391+
priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True
392+
)
393+
tester.algorithm.drop_old_proposals(now)
394+
tester.tgt_power(
395+
priority=1, power=20.0, bounds=(20.0, 50.0), expected=22.0, must_send=True
396+
)
397+
398+
# When overwritten by a newer proposal, that proposal is not dropped.
399+
tester.tgt_power(
400+
priority=2,
401+
power=25.0,
402+
bounds=(25.0, 50.0),
403+
creation_time=now - 70.0,
404+
expected=25.0,
405+
)
406+
tester.tgt_power(
407+
priority=2,
408+
power=25.0,
409+
bounds=(25.0, 50.0),
410+
creation_time=now - 30.0,
411+
expected=25.0,
412+
must_send=True,
413+
)
414+
415+
tester.tgt_power(
416+
priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True
417+
)
418+
tester.algorithm.drop_old_proposals(now)
419+
tester.tgt_power(
420+
priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True
421+
)

0 commit comments

Comments
 (0)