Skip to content

Commit e2a792c

Browse files
Prevent stacking of power requests (#1023)
The power distributing actor processes one power request at a time to prevent multiple requests for the same components from being sent to the microgrid API concurrently. Previously, this could lead to the request channel receiver becoming full if the power request frequency was higher than the processing time. Even worse, the requests could be processed late, causing unexpected behavior for applications setting power requests. Moreover, the actor was blocking power requests with different sets of components from being processed if there was any existing request. This patch ensures that the actor processes one request at a time for different sets of components and keeps track of the latest pending request if there is an existing request with the same set of components being processed. The pending request will be overwritten by the latest received request with the same set of components, and the actor will process it once the request with the same components is done processing.
2 parents e71fd32 + 61faa42 commit e2a792c

File tree

3 files changed

+112
-35
lines changed

3 files changed

+112
-35
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,4 @@
4343
- Fixed a bug that was causing the `PowerDistributor` to exit if power requests to PV inverters or EV chargers timeout.
4444
- Fix handling of cancelled tasks in the data sourcing and resampling actor.
4545
- Fix PV power distribution excluding inverters that haven't sent any data since startup.
46+
- Prevent stacking of power requests to avoid delays in processing when the power requests frequency exceeds the processing time.

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 97 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

4-
"""Actor to distribute power between batteries.
4+
"""Actor to distribute power between components.
55
6-
When charge/discharge method is called the power should be distributed so that
7-
the SoC in batteries stays at the same level. That way of distribution
8-
prevents using only one battery, increasing temperature, and maximize the total
9-
amount power to charge/discharge.
6+
The purpose of this actor is to distribute power between components in a microgrid.
107
11-
Purpose of this actor is to keep SoC level of each component at the equal level.
8+
The actor receives power requests from the power manager, process them by
9+
distributing the power between the components and sends the results back to it.
1210
"""
1311

1412

13+
import asyncio
14+
import logging
1515
from datetime import timedelta
1616

1717
from frequenz.channels import Receiver, Sender
@@ -29,31 +29,33 @@
2929
from .request import Request
3030
from .result import Result
3131

32+
_logger = logging.getLogger(__name__)
33+
3234

3335
class PowerDistributingActor(Actor):
3436
# pylint: disable=too-many-instance-attributes
35-
"""Actor to distribute the power between batteries in a microgrid.
36-
37-
The purpose of this tool is to keep an equal SoC level in all batteries.
38-
The PowerDistributingActor can have many concurrent users which at this time
39-
need to be known at construction time.
37+
"""Actor to distribute the power between components in a microgrid.
4038
41-
For each user a bidirectional channel needs to be created through which
42-
they can send and receive requests and responses.
39+
One instance of the actor can handle only one component category and type,
40+
which needs to be specified at actor startup and it will setup the correct
41+
component manager based on the given category and type.
4342
44-
It is recommended to wait for PowerDistributingActor output with timeout. Otherwise if
45-
the processing function fails then the response will never come.
46-
The timeout should be Result:request_timeout + time for processing the request.
43+
Only one power request is processed at a time to prevent from sending
44+
multiple requests for the same components to the microgrid API at the
45+
same time.
4746
4847
Edge cases:
49-
* If there are 2 requests to be processed for the same subset of batteries, then
50-
only the latest request will be processed. Older request will be ignored. User with
51-
older request will get response with Result.Status.IGNORED.
52-
53-
* If there are 2 requests and their subset of batteries is different but they
54-
overlap (they have at least one common battery), then then both batteries
55-
will be processed. However it is not expected so the proper error log will be
56-
printed.
48+
* If a new power request is received while a power request with the same
49+
set of components is being processed, the new request will be added to
50+
the pending requests. Then the pending request will be processed after the
51+
request with the same set of components being processed is done. Only one
52+
pending request is kept for each set of components, the latest request will
53+
overwrite the previous one if there is any.
54+
55+
* If there are 2 requests and their set of components is different but they
56+
overlap (they have at least one common component), then both requests will
57+
be processed concurrently. Though, the power manager will make sure this
58+
doesn't happen as overlapping component IDs are not possible at the moment.
5759
"""
5860

5961
def __init__( # pylint: disable=too-many-arguments
@@ -67,7 +69,7 @@ def __init__( # pylint: disable=too-many-arguments
6769
component_type: ComponentType | None = None,
6870
name: str | None = None,
6971
) -> None:
70-
"""Create class instance.
72+
"""Create actor instance.
7173
7274
Args:
7375
requests_receiver: Receiver for receiving power requests from the power
@@ -99,6 +101,16 @@ def __init__( # pylint: disable=too-many-arguments
99101
self._result_sender = results_sender
100102
self._api_power_request_timeout = api_power_request_timeout
101103

104+
self._processing_tasks: dict[frozenset[int], asyncio.Task[None]] = {}
105+
"""Track the power request tasks currently being processed."""
106+
107+
self._pending_requests: dict[frozenset[int], Request] = {}
108+
"""Track the power requests that are waiting to be processed.
109+
110+
Only one pending power request is kept for each set of components, the
111+
latest request will overwrite the previous one.
112+
"""
113+
102114
self._component_manager: ComponentManager
103115
if component_category == ComponentCategory.BATTERY:
104116
self._component_manager = BatteryManager(
@@ -121,19 +133,34 @@ def __init__( # pylint: disable=too-many-arguments
121133
)
122134

123135
@override
124-
async def _run(self) -> None: # pylint: disable=too-many-locals
125-
"""Run actor main function.
136+
async def _run(self) -> None:
137+
"""Run this actor's logic.
138+
139+
It waits for new power requests and process them. Only one power request
140+
can be processed at a time to prevent from sending multiple requests for
141+
the same components to the microgrid API at the same time.
126142
127-
It waits for new requests in task_queue and process it, and send
128-
`set_power` request with distributed power.
129-
The output of the `set_power` method is processed.
130-
Every battery and inverter that failed or didn't respond in time will be marked
143+
A new power request will be ignored if a power request with the same
144+
components is currently being processed.
145+
146+
Every component that failed or didn't respond in time will be marked
131147
as broken for some time.
132148
"""
133149
await self._component_manager.start()
134150

135151
async for request in self._requests_receiver:
136-
await self._component_manager.distribute_power(request)
152+
req_id = frozenset(request.component_ids)
153+
154+
if req_id in self._processing_tasks:
155+
if pending_request := self._pending_requests.get(req_id):
156+
_logger.debug(
157+
"Pending request: %s, overwritten with request: %s",
158+
pending_request,
159+
request,
160+
)
161+
self._pending_requests[req_id] = request
162+
else:
163+
self._process_request(req_id, request)
137164

138165
@override
139166
async def stop(self, msg: str | None = None) -> None:
@@ -144,3 +171,41 @@ async def stop(self, msg: str | None = None) -> None:
144171
"""
145172
await self._component_manager.stop()
146173
await super().stop(msg)
174+
175+
def _handle_task_completion(
176+
self, req_id: frozenset[int], request: Request, task: asyncio.Task[None]
177+
) -> None:
178+
"""Handle the completion of a power request task.
179+
180+
Args:
181+
req_id: The id to identify the power request.
182+
request: The power request that has been processed.
183+
task: The task that has completed.
184+
"""
185+
try:
186+
task.result()
187+
except Exception: # pylint: disable=broad-except
188+
_logger.exception("Failed power request: %s", request)
189+
190+
if req_id in self._pending_requests:
191+
self._process_request(req_id, self._pending_requests.pop(req_id))
192+
elif req_id in self._processing_tasks:
193+
del self._processing_tasks[req_id]
194+
else:
195+
_logger.error("Request id not found in processing tasks: %s", req_id)
196+
197+
def _process_request(self, req_id: frozenset[int], request: Request) -> None:
198+
"""Process a power request.
199+
200+
Args:
201+
req_id: The id to identify the power request.
202+
request: The power request to process.
203+
"""
204+
task = asyncio.create_task(
205+
self._component_manager.distribute_power(request),
206+
name=f"{type(self).__name__}:{request}",
207+
)
208+
task.add_done_callback(
209+
lambda t: self._handle_task_completion(req_id, request, t)
210+
)
211+
self._processing_tasks[req_id] = task

tests/timeseries/_battery_pool/test_battery_pool_control_methods.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ async def test_case_1(
213213
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
214214
)
215215

216+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
216217
assert set_power.call_count == 4
217218
assert sorted(set_power.call_args_list) == [
218219
mocker.call(inv_id, 250.0)
@@ -248,6 +249,7 @@ async def side_effect(inv_id: int, _: float) -> None:
248249
result, power_distributing.Success
249250
),
250251
)
252+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
251253
assert set_power.call_count == 4
252254
assert sorted(set_power.call_args_list) == [
253255
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -267,6 +269,7 @@ async def side_effect(inv_id: int, _: float) -> None:
267269

268270
# There should be an automatic retry.
269271
set_power.side_effect = None
272+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
270273
assert set_power.call_count == 4
271274
assert sorted(set_power.call_args_list) == [
272275
mocker.call(inv_id, 25.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -318,6 +321,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
318321
self._assert_report(
319322
await bounds_1_rx.receive(), power=1000.0, lower=-2000.0, upper=2000.0
320323
)
324+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
321325
assert set_power.call_count == 2
322326
assert sorted(set_power.call_args_list) == [
323327
mocker.call(inv_id, 500.0)
@@ -331,6 +335,7 @@ async def test_case_2(self, mocks: Mocks, mocker: MockerFixture) -> None:
331335
if not latest_dist_result_2.has_value():
332336
bounds = await bounds_2_rx.receive()
333337
self._assert_report(bounds, power=1000.0, lower=-2000.0, upper=2000.0)
338+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
334339
assert set_power.call_count == 2
335340
assert sorted(set_power.call_args_list) == [
336341
mocker.call(inv_id, 500.0)
@@ -375,7 +380,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
375380
self._assert_report(
376381
await bounds_2_rx.receive(), power=-1000.0, lower=-1000.0, upper=0.0
377382
)
378-
383+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
379384
assert set_power.call_count == 4
380385
assert sorted(set_power.call_args_list) == [
381386
mocker.call(inv_id, -250.0)
@@ -394,7 +399,7 @@ async def test_case_3(self, mocks: Mocks, mocker: MockerFixture) -> None:
394399
if not latest_dist_result_2.has_value():
395400
bounds = await bounds_2_rx.receive()
396401
self._assert_report(bounds, power=0.0, lower=-1000.0, upper=0.0)
397-
402+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
398403
assert set_power.call_count == 4
399404
assert sorted(set_power.call_args_list) == [
400405
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -428,6 +433,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
428433
self._assert_report(
429434
await bounds_rx.receive(), power=1000.0, lower=-4000.0, upper=4000.0
430435
)
436+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
431437
assert set_power.call_count == 4
432438
assert sorted(set_power.call_args_list) == [
433439
mocker.call(inv_id, 250.0)
@@ -453,6 +459,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
453459
self._assert_report(
454460
await bounds_rx.receive(), power=400.0, lower=-4000.0, upper=4000.0
455461
)
462+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
456463
assert set_power.call_count == 4
457464
assert sorted(set_power.call_args_list) == [
458465
mocker.call(inv_id, 100.0)
@@ -477,6 +484,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
477484
self._assert_report(
478485
await bounds_rx.receive(), power=0.0, lower=-4000.0, upper=4000.0
479486
)
487+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
480488
assert set_power.call_count == 4
481489
assert sorted(set_power.call_args_list) == [
482490
mocker.call(inv_id, 0.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -501,6 +509,7 @@ async def test_case_4(self, mocks: Mocks, mocker: MockerFixture) -> None:
501509
self._assert_report(
502510
await bounds_rx.receive(), power=-400.0, lower=-4000.0, upper=4000.0
503511
)
512+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
504513
assert set_power.call_count == 4
505514
assert sorted(set_power.call_args_list) == [
506515
mocker.call(inv_id, -100.0)
@@ -586,7 +595,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
586595
self._assert_report(
587596
await bounds_1_rx.receive(), power=200.0, lower=-1000.0, upper=1500.0
588597
)
589-
598+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
590599
assert set_power.call_count == 4
591600
assert sorted(set_power.call_args_list) == [
592601
mocker.call(inv_id, 50.0) for inv_id in mocks.microgrid.battery_inverter_ids
@@ -624,6 +633,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
624633
if dist_result.succeeded_power == Power.from_watts(720.0):
625634
break
626635

636+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
627637
assert set_power.call_count == 4
628638
assert sorted(set_power.call_args_list) == [
629639
mocker.call(inv_id, 720.0 / 4)
@@ -664,6 +674,7 @@ async def test_case_5( # pylint: disable=too-many-statements,too-many-locals
664674
if dist_result.succeeded_power == Power.from_watts(-280.0):
665675
break
666676

677+
await asyncio.sleep(0.0) # Wait for the power to be distributed.
667678
assert set_power.call_count == 4
668679
assert sorted(set_power.call_args_list) == [
669680
mocker.call(inv_id, -280.0 / 4)

0 commit comments

Comments
 (0)