Skip to content

Commit fc7de1f

Browse files
committed
Resend request if PowerDistributor returns PartialFailure
- When there is an issue with any of the batteries, the power distributor would mark the battery as temporarily broken and update the battery status stream. - Changes to the battery status stream would trigger big changes to the system bounds calculated in the battery pool. - Changes to the system bounds would not affect the target power unless it was very big to begin with, so we need to resend the target power to the power distributor, whenever a partial failure is received from the power distributor. - In the next run, the power distributor won't use the broken battery and will return `Success`. - If all batteries are broken and the power distributor returns an `Error` instead, then the bounds will go down to zero, and it will force a target power to get sent out already. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 7be3518 commit fc7de1f

File tree

4 files changed

+133
-43
lines changed

4 files changed

+133
-43
lines changed

src/frequenz/sdk/actor/_power_managing/_base_classes.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def get_target_power(
130130
battery_ids: frozenset[int],
131131
proposal: Proposal | None,
132132
system_bounds: PowerMetrics,
133+
must_return_power: bool = False,
133134
) -> Power | None:
134135
"""Calculate and return the target power for the given batteries.
135136
@@ -138,6 +139,8 @@ def get_target_power(
138139
proposal: If given, the proposal to added to the bucket, before the target
139140
power is calculated.
140141
system_bounds: The system bounds for the batteries in the proposal.
142+
must_return_power: If `True`, the algorithm must return a target power,
143+
even if it hasn't changed since the last call.
141144
142145
Returns:
143146
The new target power for the batteries, or `None` if the target power

src/frequenz/sdk/actor/_power_managing/_matryoshka.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def get_target_power(
8080
battery_ids: frozenset[int],
8181
proposal: Proposal | None,
8282
system_bounds: PowerMetrics,
83+
must_return_power: bool = False,
8384
) -> Power | None:
8485
"""Calculate and return the target power for the given batteries.
8586
@@ -88,6 +89,8 @@ def get_target_power(
8889
proposal: If given, the proposal to added to the bucket, before the target
8990
power is calculated.
9091
system_bounds: The system bounds for the batteries in the proposal.
92+
must_return_power: If `True`, the algorithm must return a target power,
93+
even if it hasn't changed since the last call.
9194
9295
Returns:
9396
The new target power for the batteries, or `None` if the target power
@@ -132,7 +135,8 @@ def get_target_power(
132135
target_power = self._calc_target_power(proposals, system_bounds)
133136

134137
if (
135-
battery_ids not in self._target_power
138+
must_return_power
139+
or battery_ids not in self._target_power
136140
or self._target_power[battery_ids] != target_power
137141
):
138142
self._target_power[battery_ids] = target_power

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,18 @@ def _add_bounds_tracker(self, battery_ids: frozenset[int]) -> None:
142142
)
143143

144144
async def _send_updated_target_power(
145-
self, battery_ids: frozenset[int], proposal: Proposal | None
145+
self,
146+
battery_ids: frozenset[int],
147+
proposal: Proposal | None,
148+
must_send: bool = False,
146149
) -> None:
147150
from .. import power_distributing # pylint: disable=import-outside-toplevel
148151

149152
target_power = self._algorithm.get_target_power(
150153
battery_ids,
151154
proposal,
152155
self._system_bounds[battery_ids],
156+
must_send,
153157
)
154158
request_timeout = (
155159
proposal.request_timeout if proposal else timedelta(seconds=5.0)
@@ -203,5 +207,15 @@ async def _run(self) -> None:
203207
self._add_bounds_tracker(sub.battery_ids)
204208

205209
elif selected_from(selected, self._power_distributing_results_receiver):
210+
from .. import ( # pylint: disable=import-outside-toplevel
211+
power_distributing,
212+
)
213+
206214
result = selected.value
207215
self._distribution_results[frozenset(result.request.batteries)] = result
216+
match result:
217+
case power_distributing.PartialFailure(request):
218+
await self._send_updated_target_power(
219+
frozenset(request.batteries), None, must_send=True
220+
)
221+
await self._send_reports(frozenset(result.request.batteries))

tests/timeseries/_battery_pool/test_battery_pool_control_methods.py

Lines changed: 110 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from frequenz.channels import Sender
1414
from pytest_mock import MockerFixture
1515

16-
from frequenz.sdk import microgrid, timeseries
17-
from frequenz.sdk.actor import ResamplerConfig, _power_managing
16+
from frequenz.sdk import microgrid
17+
from frequenz.sdk.actor import ResamplerConfig, _power_managing, power_distributing
1818
from frequenz.sdk.actor.power_distributing import BatteryStatus
1919
from frequenz.sdk.actor.power_distributing._battery_pool_status import BatteryPoolStatus
2020
from frequenz.sdk.timeseries import Power
@@ -136,21 +136,29 @@ async def _init_data_for_inverters(self, mocks: Mocks) -> None:
136136
0.05,
137137
)
138138

139-
def _make_report(
140-
self, *, power: float | None, lower: float, upper: float
141-
) -> _power_managing.Report:
142-
return _power_managing.Report(
143-
target_power=Power.from_watts(power) if power is not None else None,
144-
inclusion_bounds=timeseries.Bounds(
145-
lower=Power.from_watts(lower),
146-
upper=Power.from_watts(upper),
147-
),
148-
exclusion_bounds=timeseries.Bounds(
149-
lower=Power.from_watts(0.0),
150-
upper=Power.from_watts(0.0),
151-
),
152-
distribution_result=None,
139+
def _assert_report(
140+
self,
141+
report: _power_managing.Report,
142+
*,
143+
power: float | None,
144+
lower: float,
145+
upper: float,
146+
expected_result_pred: typing.Callable[[power_distributing.Result], bool]
147+
| None = None,
148+
) -> None:
149+
assert report.target_power == (
150+
Power.from_watts(power) if power is not None else None
153151
)
152+
assert (
153+
report.inclusion_bounds is not None and report.exclusion_bounds is not None
154+
)
155+
assert report.inclusion_bounds.lower == Power.from_watts(lower)
156+
assert report.inclusion_bounds.upper == Power.from_watts(upper)
157+
assert report.exclusion_bounds.lower == Power.from_watts(0.0)
158+
assert report.exclusion_bounds.upper == Power.from_watts(0.0)
159+
if expected_result_pred is not None:
160+
assert report.distribution_result is not None
161+
assert expected_result_pred(report.distribution_result)
154162

155163
async def test_case_1(
156164
self,
@@ -160,7 +168,7 @@ async def test_case_1(
160168
"""Test case 1.
161169
162170
- single battery pool with all batteries.
163-
- all batteries are working.
171+
- all batteries are working, then one battery stops working.
164172
"""
165173
set_power = typing.cast(
166174
AsyncMock, microgrid.connection_manager.get().api_client.set_power
@@ -179,21 +187,82 @@ async def test_case_1(
179187
# subsequent commit.
180188
bounds_rx = battery_pool.power_bounds().new_receiver()
181189

182-
assert await bounds_rx.receive() == self._make_report(
183-
power=None, lower=-4000.0, upper=4000.0
190+
self._assert_report(
191+
await bounds_rx.receive(), power=None, lower=-4000.0, upper=4000.0
184192
)
185193

186194
await battery_pool.set_power(Power.from_watts(1000.0))
187195

188-
assert await bounds_rx.receive() == self._make_report(
189-
power=1000.0, lower=-4000.0, upper=4000.0
196+
self._assert_report(
197+
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
190198
)
191199

192200
assert set_power.call_count == 4
193201
assert set_power.call_args_list == [
194202
mocker.call(inv_id, 250.0)
195203
for inv_id in mocks.microgrid.battery_inverter_ids
196204
]
205+
self._assert_report(
206+
await bounds_rx.receive(),
207+
power=1000.0,
208+
lower=-4000.0,
209+
upper=4000.0,
210+
expected_result_pred=lambda result: isinstance(
211+
result, power_distributing.Success
212+
),
213+
)
214+
215+
set_power.reset_mock()
216+
217+
# First battery stops working (aka set_power never returns for it, times out).
218+
async def side_effect(inv_id: int, _: float) -> None:
219+
if inv_id == mocks.microgrid.battery_inverter_ids[0]:
220+
await asyncio.sleep(1000.0)
221+
222+
set_power.side_effect = side_effect
223+
await battery_pool.set_power(
224+
Power.from_watts(100.0), request_timeout=timedelta(seconds=0.1)
225+
)
226+
self._assert_report(
227+
await bounds_rx.receive(),
228+
power=100.0,
229+
lower=-4000.0,
230+
upper=4000.0,
231+
expected_result_pred=lambda result: isinstance(
232+
result, power_distributing.Success
233+
),
234+
)
235+
assert set_power.call_count == 4
236+
assert set_power.call_args_list == [
237+
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
238+
]
239+
set_power.reset_mock()
240+
self._assert_report(
241+
await bounds_rx.receive(),
242+
power=100.0,
243+
lower=-4000.0,
244+
upper=4000.0,
245+
expected_result_pred=lambda result: isinstance(
246+
result, power_distributing.PartialFailure
247+
)
248+
and result.failed_batteries == {mocks.microgrid.battery_ids[0]},
249+
)
250+
251+
# There should be an automatic retry.
252+
set_power.side_effect = None
253+
assert set_power.call_count == 4
254+
assert set_power.call_args_list == [
255+
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
256+
]
257+
self._assert_report(
258+
await bounds_rx.receive(),
259+
power=100.0,
260+
lower=-4000.0,
261+
upper=4000.0,
262+
expected_result_pred=lambda result: isinstance(
263+
result, power_distributing.Success
264+
),
265+
)
197266

198267
async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
199268
"""Test case 2.
@@ -214,15 +283,15 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
214283
battery_pool_2 = microgrid.battery_pool(set(mocks.microgrid.battery_ids[2:]))
215284
bounds_2_rx = battery_pool_2.power_bounds().new_receiver()
216285

217-
assert await bounds_1_rx.receive() == self._make_report(
218-
power=None, lower=-2000.0, upper=2000.0
286+
self._assert_report(
287+
await bounds_1_rx.receive(), power=None, lower=-2000.0, upper=2000.0
219288
)
220-
assert await bounds_2_rx.receive() == self._make_report(
221-
power=None, lower=-2000.0, upper=2000.0
289+
self._assert_report(
290+
await bounds_2_rx.receive(), power=None, lower=-2000.0, upper=2000.0
222291
)
223292
await battery_pool_1.set_power(Power.from_watts(1000.0))
224-
assert await bounds_1_rx.receive() == self._make_report(
225-
power=1000.0, lower=-2000.0, upper=2000.0
293+
self._assert_report(
294+
await bounds_1_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0
226295
)
227296
assert set_power.call_count == 2
228297
assert set_power.call_args_list == [
@@ -232,8 +301,8 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
232301
set_power.reset_mock()
233302

234303
await battery_pool_2.set_power(Power.from_watts(1000.0))
235-
assert await bounds_2_rx.receive() == self._make_report(
236-
power=1000.0, lower=-2000.0, upper=2000.0
304+
self._assert_report(
305+
await bounds_2_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0
237306
)
238307
assert set_power.call_count == 2
239308
assert set_power.call_args_list == [
@@ -260,22 +329,22 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
260329
battery_pool_2 = microgrid.battery_pool()
261330
bounds_2_rx = battery_pool_2.power_bounds(1).new_receiver()
262331

263-
assert await bounds_1_rx.receive() == self._make_report(
264-
power=None, lower=-4000.0, upper=4000.0
332+
self._assert_report(
333+
await bounds_1_rx.receive(), power=None, lower=-4000.0, upper=4000.0
265334
)
266-
assert await bounds_2_rx.receive() == self._make_report(
267-
power=None, lower=-4000.0, upper=4000.0
335+
self._assert_report(
336+
await bounds_2_rx.receive(), power=None, lower=-4000.0, upper=4000.0
268337
)
269338
await battery_pool_1.set_power(
270339
Power.from_watts(-1000.0),
271340
_priority=2,
272341
_bounds=(Power.from_watts(-1000.0), Power.from_watts(0.0)),
273342
)
274-
assert await bounds_1_rx.receive() == self._make_report(
275-
power=-1000.0, lower=-4000.0, upper=4000.0
343+
self._assert_report(
344+
await bounds_1_rx.receive(), power=-1000.0, lower=-4000.0, upper=4000.0
276345
)
277-
assert await bounds_2_rx.receive() == self._make_report(
278-
power=-1000.0, lower=-1000.0, upper=0.0
346+
self._assert_report(
347+
await bounds_2_rx.receive(), power=-1000.0, lower=-1000.0, upper=0.0
279348
)
280349

281350
assert set_power.call_count == 4
@@ -290,11 +359,11 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
290359
_priority=1,
291360
_bounds=(Power.from_watts(0.0), Power.from_watts(1000.0)),
292361
)
293-
assert await bounds_1_rx.receive() == self._make_report(
294-
power=0.0, lower=-4000.0, upper=4000.0
362+
self._assert_report(
363+
await bounds_1_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0
295364
)
296-
assert await bounds_2_rx.receive() == self._make_report(
297-
power=0.0, lower=-1000.0, upper=0.0
365+
self._assert_report(
366+
await bounds_2_rx.receive(), power=0.0, lower=-1000.0, upper=0.0
298367
)
299368

300369
assert set_power.call_count == 4

0 commit comments

Comments
 (0)