Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,5 @@
- The resampler now properly handles sending zero values.

A bug made the resampler interpret zero values as `None` when generating new samples, so if the result of the resampling is zero, the resampler would just produce `None` values.

- The PowerManager no longer holds on to proposals from dead actors forever. If an actor hasn't sent a new proposal in 60 seconds, the available proposal from that actor is dropped.
16 changes: 16 additions & 0 deletions src/frequenz/sdk/actor/_power_managing/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ class Proposal:
priority: int
"""The priority of the actor sending the proposal."""

creation_time: float
"""The loop time when the proposal is created.

This is used by the power manager to determine the age of the proposal.
"""

request_timeout: datetime.timedelta = datetime.timedelta(seconds=5.0)
"""The maximum amount of time to wait for the request to be fulfilled."""

Expand Down Expand Up @@ -279,3 +285,13 @@ def get_status(
Returns:
The bounds for the components.
"""

@abc.abstractmethod
def drop_old_proposals(self, loop_time: float) -> None:
"""Drop old proposals.

This method is called periodically by the power manager.

Args:
loop_time: The current loop time.
"""
22 changes: 21 additions & 1 deletion src/frequenz/sdk/actor/_power_managing/_matryoshka.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
import typing
from datetime import timedelta

from typing_extensions import override

Expand All @@ -40,8 +41,9 @@
class Matryoshka(BaseAlgorithm):
"""The matryoshka algorithm."""

def __init__(self) -> None:
def __init__(self, max_proposal_age: timedelta) -> None:
"""Create a new instance of the matryoshka algorithm."""
self._max_proposal_age_sec = max_proposal_age.total_seconds()
self._component_buckets: dict[frozenset[int], SortedSet[Proposal]] = {}
self._target_power: dict[frozenset[int], Power] = {}

Expand Down Expand Up @@ -267,3 +269,21 @@ def get_status(
_exclusion_bounds=system_bounds.exclusion_bounds,
distribution_result=distribution_result,
)

@override
def drop_old_proposals(self, loop_time: float) -> None:
"""Drop old proposals.

This will remove all proposals that have not been updated for longer than
`max_proposal_age`.

Args:
loop_time: The current loop time.
"""
for bucket in self._component_buckets.values():
to_delete = []
for proposal in bucket:
if (loop_time - proposal.creation_time) > self._max_proposal_age_sec:
to_delete.append(proposal)
for proposal in to_delete:
bucket.delete(proposal)
26 changes: 21 additions & 5 deletions src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import datetime, timedelta, timezone

from frequenz.channels import Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from
from typing_extensions import override

from ...timeseries._base_types import PoolType, SystemBounds
Expand Down Expand Up @@ -75,7 +75,9 @@ def __init__( # pylint: disable=too-many-arguments
self._subscriptions: dict[frozenset[int], dict[int, Sender[_Report]]] = {}
self._distribution_results: dict[frozenset[int], power_distributing.Result] = {}

self._algorithm: BaseAlgorithm = Matryoshka()
self._algorithm: BaseAlgorithm = Matryoshka(
max_proposal_age=timedelta(seconds=60.0)
)

super().__init__()

Expand Down Expand Up @@ -180,10 +182,13 @@ async def _send_updated_target_power(
@override
async def _run(self) -> None:
"""Run the power managing actor."""
last_result_partial_failure = False
drop_old_proposals_timer = Timer(timedelta(seconds=1.0), SkipMissedAndDrift())
async for selected in select(
self._proposals_receiver,
self._bounds_subscription_receiver,
self._power_distributing_results_receiver,
drop_old_proposals_timer,
):
if selected_from(selected, self._proposals_receiver):
proposal = selected.value
Expand Down Expand Up @@ -234,9 +239,20 @@ async def _run(self) -> None:
self._distribution_results[
frozenset(result.request.component_ids)
] = result
if not isinstance(result, power_distributing.Success):
_logger.warning(
"PowerManagingActor: PowerDistributing failed: %s", result
)
match result:
case power_distributing.PartialFailure(request):
await self._send_updated_target_power(
frozenset(request.component_ids), None, must_send=True
)
if not last_result_partial_failure:
last_result_partial_failure = True
await self._send_updated_target_power(
frozenset(request.component_ids), None, must_send=True
)
case power_distributing.Success():
last_result_partial_failure = False
await self._send_reports(frozenset(result.request.component_ids))

elif selected_from(selected, drop_old_proposals_timer):
self._algorithm.drop_old_proposals(asyncio.get_event_loop().time())
3 changes: 3 additions & 0 deletions src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ async def propose_power(
bounds=bounds,
component_ids=self._battery_pool._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
request_timeout=request_timeout,
)
)
Expand Down Expand Up @@ -177,6 +178,7 @@ async def propose_charge(
bounds=timeseries.Bounds(None, None),
component_ids=self._battery_pool._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
request_timeout=request_timeout,
)
)
Expand Down Expand Up @@ -222,6 +224,7 @@ async def propose_discharge(
bounds=timeseries.Bounds(None, None),
component_ids=self._battery_pool._batteries,
priority=self._priority,
creation_time=asyncio.get_running_loop().time(),
request_timeout=request_timeout,
)
)
Expand Down
77 changes: 73 additions & 4 deletions tests/actor/_power_managing/test_matryoshka.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

"""Tests for the Matryoshka power manager algorithm."""

from datetime import datetime, timezone
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.sdk import timeseries
from frequenz.sdk.actor._power_managing import Proposal
Expand All @@ -22,20 +23,21 @@ def __init__(
"""Create a new instance of the stateful tester."""
self._call_count = 0
self._batteries = batteries
self._algorithm = Matryoshka()
self._system_bounds = system_bounds
self.algorithm = Matryoshka(max_proposal_age=timedelta(seconds=60.0))

def tgt_power( # pylint: disable=too-many-arguments
self,
priority: int,
power: float | None,
bounds: tuple[float | None, float | None],
expected: float | None,
creation_time: float | None = None,
must_send: bool = False,
) -> None:
"""Test the target power calculation."""
self._call_count += 1
tgt_power = self._algorithm.calculate_target_power(
tgt_power = self.algorithm.calculate_target_power(
self._batteries,
Proposal(
component_ids=self._batteries,
Expand All @@ -46,6 +48,9 @@ def tgt_power( # pylint: disable=too-many-arguments
None if bounds[1] is None else Power.from_watts(bounds[1]),
),
priority=priority,
creation_time=creation_time
if creation_time is not None
else asyncio.get_event_loop().time(),
),
self._system_bounds,
must_send,
Expand All @@ -61,7 +66,7 @@ def bounds(
expected_bounds: tuple[float, float],
) -> None:
"""Test the status report."""
report = self._algorithm.get_status(
report = self.algorithm.get_status(
self._batteries, priority, self._system_bounds, None
)
if expected_power is None:
Expand Down Expand Up @@ -350,3 +355,67 @@ async def test_matryoshka_with_excl_3() -> None:

tester.tgt_power(priority=1, power=-40.0, bounds=(-100.0, -35.0), expected=-40.0)
tester.bounds(priority=0, expected_power=-40.0, expected_bounds=(-100.0, -35.0))


async def test_matryoshka_drop_old_proposals() -> None:
"""Tests for the power managing actor.

With inclusion bounds, and exclusion bounds -30.0 to 30.0.
"""
batteries = frozenset({2, 5})

system_bounds = _base_types.SystemBounds(
timestamp=datetime.now(tz=timezone.utc),
inclusion_bounds=timeseries.Bounds(
lower=Power.from_watts(-200.0), upper=Power.from_watts(200.0)
),
exclusion_bounds=timeseries.Bounds(lower=Power.zero(), upper=Power.zero()),
)

tester = StatefulTester(batteries, system_bounds)

now = asyncio.get_event_loop().time()

tester.tgt_power(priority=3, power=22.0, bounds=(22.0, 30.0), expected=22.0)

# When a proposal is too old and hasn't been updated, it is dropped.
tester.tgt_power(
priority=2,
power=25.0,
bounds=(25.0, 50.0),
creation_time=now - 70.0,
expected=25.0,
)

tester.tgt_power(
priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True
)
tester.algorithm.drop_old_proposals(now)
tester.tgt_power(
priority=1, power=20.0, bounds=(20.0, 50.0), expected=22.0, must_send=True
)

# When overwritten by a newer proposal, that proposal is not dropped.
tester.tgt_power(
priority=2,
power=25.0,
bounds=(25.0, 50.0),
creation_time=now - 70.0,
expected=25.0,
)
tester.tgt_power(
priority=2,
power=25.0,
bounds=(25.0, 50.0),
creation_time=now - 30.0,
expected=25.0,
must_send=True,
)

tester.tgt_power(
priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True
)
tester.algorithm.drop_old_proposals(now)
tester.tgt_power(
priority=1, power=20.0, bounds=(20.0, 50.0), expected=25.0, must_send=True
)