|
11 | 11 | from datetime import datetime, timedelta, timezone |
12 | 12 |
|
13 | 13 | from frequenz.channels import Receiver, Sender |
14 | | -from frequenz.channels.util import select, selected_from |
| 14 | +from frequenz.channels.util import SkipMissedAndDrift, Timer, select, selected_from |
15 | 15 | from typing_extensions import override |
16 | 16 |
|
17 | 17 | from ...timeseries._base_types import PoolType, SystemBounds |
@@ -183,10 +183,12 @@ async def _send_updated_target_power( |
183 | 183 | async def _run(self) -> None: |
184 | 184 | """Run the power managing actor.""" |
185 | 185 | last_result_partial_failure = False |
| 186 | + drop_old_proposals_timer = Timer(timedelta(seconds=1.0), SkipMissedAndDrift()) |
186 | 187 | async for selected in select( |
187 | 188 | self._proposals_receiver, |
188 | 189 | self._bounds_subscription_receiver, |
189 | 190 | self._power_distributing_results_receiver, |
| 191 | + drop_old_proposals_timer, |
190 | 192 | ): |
191 | 193 | if selected_from(selected, self._proposals_receiver): |
192 | 194 | proposal = selected.value |
@@ -251,3 +253,6 @@ async def _run(self) -> None: |
251 | 253 | case power_distributing.Success(): |
252 | 254 | last_result_partial_failure = False |
253 | 255 | await self._send_reports(frozenset(result.request.component_ids)) |
| 256 | + |
| 257 | + elif selected_from(selected, drop_old_proposals_timer): |
| 258 | + self._algorithm.drop_old_proposals(asyncio.get_event_loop().time()) |
0 commit comments