Skip to content

Commit bfc8de2

Browse files
authored
Don't sleep on PowerDistributor startup (#971)
The PowerDistributor used to sleep for 2 seconds on startup, before processing requests, to wait for data. This feature is no longer being used, because it won't receive requests from the PowerManager unless there's data. This PR removes it.
2 parents 88bd28e + f791f4f commit bfc8de2

File tree

8 files changed

+37
-52
lines changed

8 files changed

+37
-52
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ async def run_test( # pylint: disable=too-many-locals
120120
requests_receiver=power_request_channel.new_receiver(),
121121
results_sender=power_result_channel.new_sender(),
122122
component_pool_status_sender=battery_status_channel.new_sender(),
123-
wait_for_data_sec=2.0,
124123
):
125124
tasks: list[Coroutine[Any, Any, list[Result]]] = []
126125
tasks.append(send_requests(batteries, num_requests))

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -446,9 +446,9 @@ def _get_battery_inverter_data(
446446
# This should be handled by BatteryStatus. BatteryStatus should not return
447447
# this batteries as working.
448448
if not all(
449-
self._battery_caches[bat_id].has_value for bat_id in battery_ids
449+
self._battery_caches[bat_id].has_value() for bat_id in battery_ids
450450
) or not all(
451-
self._inverter_caches[inv_id].has_value for inv_id in inverter_ids
451+
self._inverter_caches[inv_id].has_value() for inv_id in inverter_ids
452452
):
453453
_logger.error(
454454
"Battery %s or inverter %s send no data, yet. They should be not used.",

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
"""
1313

1414

15-
import asyncio
16-
1715
from frequenz.channels import Receiver, Sender
1816
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
1917

@@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments
6058
requests_receiver: Receiver[Request],
6159
results_sender: Sender[Result],
6260
component_pool_status_sender: Sender[ComponentPoolStatus],
63-
wait_for_data_sec: float,
6461
*,
6562
component_category: ComponentCategory,
6663
component_type: ComponentType | None = None,
@@ -74,8 +71,6 @@ def __init__( # pylint: disable=too-many-arguments
7471
results_sender: Sender for sending results to the power manager.
7572
component_pool_status_sender: Channel for sending information about which
7673
components are expected to be working.
77-
wait_for_data_sec: How long actor should wait before processing first
78-
request. It is a time needed to collect first components data.
7974
component_category: The category of the components that this actor is
8075
responsible for.
8176
component_type: The type of the component of the given category that this
@@ -96,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments
9691
self._component_type = component_type
9792
self._requests_receiver = requests_receiver
9893
self._result_sender = results_sender
99-
self._wait_for_data_sec = wait_for_data_sec
10094

10195
self._component_manager: ComponentManager
10296
if component_category == ComponentCategory.BATTERY:
@@ -130,9 +124,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
130124
"""
131125
await self._component_manager.start()
132126

133-
# Wait few seconds to get data from the channels created above.
134-
await asyncio.sleep(self._wait_for_data_sec)
135-
136127
async for request in self._requests_receiver:
137128
await self._component_manager.distribute_power(request)
138129

src/frequenz/sdk/microgrid/_power_wrapper.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030

3131
_logger = logging.getLogger(__name__)
3232

33-
_POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC = 2.0
34-
3533

3634
class PowerWrapper:
3735
"""Wrapper around the power managing and power distributing actors."""
@@ -80,7 +78,6 @@ def __init__(
8078

8179
self._power_distributing_actor: PowerDistributingActor | None = None
8280
self._power_managing_actor: _power_managing.PowerManagingActor | None = None
83-
self._pd_wait_for_data_sec: float = _POWER_DISTRIBUTING_ACTOR_WAIT_FOR_DATA_SEC
8481

8582
def _start_power_managing_actor(self) -> None:
8683
"""Start the power managing actor if it is not already running."""
@@ -151,7 +148,6 @@ def _start_power_distributing_actor(self) -> None:
151148
requests_receiver=self._power_distribution_requests_channel.new_receiver(),
152149
results_sender=self._power_distribution_results_channel.new_sender(),
153150
component_pool_status_sender=self.status_channel.new_sender(),
154-
wait_for_data_sec=self._pd_wait_for_data_sec,
155151
)
156152
self._power_distributing_actor.start()
157153

tests/actor/power_distributing/test_power_distributing.py

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ async def test_constructor_with_grid_meter(self, mocker: MockerFixture) -> None:
112112
requests_receiver=requests_channel.new_receiver(),
113113
results_sender=results_channel.new_sender(),
114114
component_pool_status_sender=battery_status_channel.new_sender(),
115-
wait_for_data_sec=0.0,
116115
) as distributor:
117116
assert isinstance(distributor._component_manager, BatteryManager)
118117
assert distributor._component_manager._bat_invs_map == {
@@ -144,7 +143,6 @@ async def test_constructor_without_grid_meter(self, mocker: MockerFixture) -> No
144143
requests_receiver=requests_channel.new_receiver(),
145144
results_sender=results_channel.new_sender(),
146145
component_pool_status_sender=battery_status_channel.new_sender(),
147-
wait_for_data_sec=0.0,
148146
) as distributor:
149147
assert isinstance(distributor._component_manager, BatteryManager)
150148
assert distributor._component_manager._bat_invs_map == {
@@ -210,8 +208,9 @@ async def test_power_distributor_one_user(self, mocker: MockerFixture) -> None:
210208
requests_receiver=requests_channel.new_receiver(),
211209
results_sender=results_channel.new_sender(),
212210
component_pool_status_sender=battery_status_channel.new_sender(),
213-
wait_for_data_sec=0.1,
214211
):
212+
await asyncio.sleep(0.1) # wait for actor to collect data
213+
215214
await requests_channel.new_sender().send(request)
216215
result_rx = results_channel.new_receiver()
217216

@@ -271,8 +270,9 @@ async def test_power_distributor_exclusion_bounds(
271270
requests_receiver=requests_channel.new_receiver(),
272271
results_sender=results_channel.new_sender(),
273272
component_pool_status_sender=battery_status_channel.new_sender(),
274-
wait_for_data_sec=0.1,
275273
):
274+
await asyncio.sleep(0.1) # wait for actor to collect data
275+
276276
# zero power requests should pass through despite the exclusion bounds.
277277
request = Request(
278278
power=Power.zero(),
@@ -374,8 +374,9 @@ async def test_two_batteries_one_inverters(self, mocker: MockerFixture) -> None:
374374
requests_receiver=requests_channel.new_receiver(),
375375
component_pool_status_sender=battery_status_channel.new_sender(),
376376
results_sender=results_channel.new_sender(),
377-
wait_for_data_sec=0.1,
378377
):
378+
await asyncio.sleep(0.1) # wait for actor to collect data
379+
379380
await requests_channel.new_sender().send(request)
380381
result_rx = results_channel.new_receiver()
381382

@@ -454,7 +455,6 @@ async def test_two_batteries_one_broken_one_inverters(
454455
requests_receiver=requests_channel.new_receiver(),
455456
component_pool_status_sender=battery_status_channel.new_sender(),
456457
results_sender=results_channel.new_sender(),
457-
wait_for_data_sec=0.1,
458458
):
459459
await requests_channel.new_sender().send(request)
460460
result_rx = results_channel.new_receiver()
@@ -510,8 +510,9 @@ async def test_battery_two_inverters(self, mocker: MockerFixture) -> None:
510510
requests_receiver=requests_channel.new_receiver(),
511511
component_pool_status_sender=battery_status_channel.new_sender(),
512512
results_sender=results_channel.new_sender(),
513-
wait_for_data_sec=0.1,
514513
):
514+
await asyncio.sleep(0.1) # wait for actor to collect data
515+
515516
await requests_channel.new_sender().send(request)
516517
result_rx = results_channel.new_receiver()
517518

@@ -562,8 +563,9 @@ async def test_two_batteries_three_inverters(self, mocker: MockerFixture) -> Non
562563
requests_receiver=requests_channel.new_receiver(),
563564
component_pool_status_sender=battery_status_channel.new_sender(),
564565
results_sender=results_channel.new_sender(),
565-
wait_for_data_sec=0.1,
566566
):
567+
await asyncio.sleep(0.1) # wait for actor to collect data
568+
567569
await requests_channel.new_sender().send(request)
568570
result_rx = results_channel.new_receiver()
569571

@@ -648,8 +650,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds_2(
648650
requests_receiver=requests_channel.new_receiver(),
649651
component_pool_status_sender=battery_status_channel.new_sender(),
650652
results_sender=results_channel.new_sender(),
651-
wait_for_data_sec=0.1,
652653
):
654+
await asyncio.sleep(0.1) # wait for actor to collect data
655+
653656
await requests_channel.new_sender().send(request)
654657
result_rx = results_channel.new_receiver()
655658

@@ -735,8 +738,9 @@ async def test_two_batteries_one_inverter_different_exclusion_bounds(
735738
requests_receiver=requests_channel.new_receiver(),
736739
component_pool_status_sender=battery_status_channel.new_sender(),
737740
results_sender=results_channel.new_sender(),
738-
wait_for_data_sec=0.1,
739741
):
742+
await asyncio.sleep(0.1) # wait for actor to collect data
743+
740744
await requests_channel.new_sender().send(request)
741745
result_rx = results_channel.new_receiver()
742746

@@ -801,7 +805,6 @@ async def test_connected_but_not_requested_batteries(
801805
requests_receiver=requests_channel.new_receiver(),
802806
component_pool_status_sender=battery_status_channel.new_sender(),
803807
results_sender=results_channel.new_sender(),
804-
wait_for_data_sec=0.1,
805808
):
806809
await requests_channel.new_sender().send(request)
807810
result_rx = results_channel.new_receiver()
@@ -859,8 +862,9 @@ async def test_battery_soc_nan(self, mocker: MockerFixture) -> None:
859862
requests_receiver=requests_channel.new_receiver(),
860863
results_sender=results_channel.new_sender(),
861864
component_pool_status_sender=battery_status_channel.new_sender(),
862-
wait_for_data_sec=0.1,
863865
):
866+
await asyncio.sleep(0.1) # wait for actor to collect data
867+
864868
await requests_channel.new_sender().send(request)
865869
result_rx = results_channel.new_receiver()
866870

@@ -914,8 +918,9 @@ async def test_battery_capacity_nan(self, mocker: MockerFixture) -> None:
914918
requests_receiver=requests_channel.new_receiver(),
915919
results_sender=results_channel.new_sender(),
916920
component_pool_status_sender=battery_status_channel.new_sender(),
917-
wait_for_data_sec=0.1,
918921
):
922+
await asyncio.sleep(0.1) # wait for actor to collect data
923+
919924
await requests_channel.new_sender().send(request)
920925
result_rx = results_channel.new_receiver()
921926

@@ -988,8 +993,9 @@ async def test_battery_power_bounds_nan(self, mocker: MockerFixture) -> None:
988993
requests_receiver=requests_channel.new_receiver(),
989994
results_sender=results_channel.new_sender(),
990995
component_pool_status_sender=battery_status_channel.new_sender(),
991-
wait_for_data_sec=0.1,
992996
):
997+
await asyncio.sleep(0.1) # wait for actor to collect data
998+
993999
await requests_channel.new_sender().send(request)
9941000
result_rx = results_channel.new_receiver()
9951001

@@ -1034,7 +1040,6 @@ async def test_power_distributor_invalid_battery_id(
10341040
requests_receiver=requests_channel.new_receiver(),
10351041
results_sender=results_channel.new_sender(),
10361042
component_pool_status_sender=battery_status_channel.new_sender(),
1037-
wait_for_data_sec=0.1,
10381043
):
10391044
await requests_channel.new_sender().send(request)
10401045
result_rx = results_channel.new_receiver()
@@ -1079,8 +1084,9 @@ async def test_power_distributor_one_user_adjust_power_consume(
10791084
requests_receiver=requests_channel.new_receiver(),
10801085
results_sender=results_channel.new_sender(),
10811086
component_pool_status_sender=battery_status_channel.new_sender(),
1082-
wait_for_data_sec=0.1,
10831087
):
1088+
await asyncio.sleep(0.1) # wait for actor to collect data
1089+
10841090
await requests_channel.new_sender().send(request)
10851091
result_rx = results_channel.new_receiver()
10861092

@@ -1126,8 +1132,9 @@ async def test_power_distributor_one_user_adjust_power_supply(
11261132
requests_receiver=requests_channel.new_receiver(),
11271133
results_sender=results_channel.new_sender(),
11281134
component_pool_status_sender=battery_status_channel.new_sender(),
1129-
wait_for_data_sec=0.1,
11301135
):
1136+
await asyncio.sleep(0.1) # wait for actor to collect data
1137+
11311138
await requests_channel.new_sender().send(request)
11321139
result_rx = results_channel.new_receiver()
11331140

@@ -1173,8 +1180,9 @@ async def test_power_distributor_one_user_adjust_power_success(
11731180
requests_receiver=requests_channel.new_receiver(),
11741181
results_sender=results_channel.new_sender(),
11751182
component_pool_status_sender=battery_status_channel.new_sender(),
1176-
wait_for_data_sec=0.1,
11771183
):
1184+
await asyncio.sleep(0.1) # wait for actor to collect data
1185+
11781186
await requests_channel.new_sender().send(request)
11791187
result_rx = results_channel.new_receiver()
11801188

@@ -1213,8 +1221,9 @@ async def test_not_all_batteries_are_working(self, mocker: MockerFixture) -> Non
12131221
requests_receiver=requests_channel.new_receiver(),
12141222
results_sender=results_channel.new_sender(),
12151223
component_pool_status_sender=battery_status_channel.new_sender(),
1216-
wait_for_data_sec=0.1,
12171224
):
1225+
await asyncio.sleep(0.1) # wait for actor to collect data
1226+
12181227
request = Request(
12191228
power=Power.from_kilowatts(1.2),
12201229
component_ids=batteries,
@@ -1267,8 +1276,9 @@ async def test_partial_failure_result(self, mocker: MockerFixture) -> None:
12671276
requests_receiver=requests_channel.new_receiver(),
12681277
results_sender=results_channel.new_sender(),
12691278
component_pool_status_sender=battery_status_channel.new_sender(),
1270-
wait_for_data_sec=0.1,
12711279
):
1280+
await asyncio.sleep(0.1) # wait for actor to collect data
1281+
12721282
request = Request(
12731283
power=Power.from_kilowatts(1.70),
12741284
component_ids=batteries,

tests/timeseries/_battery_pool/test_battery_pool_control_methods.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ async def _patch_battery_pool_status(
8989
If `battery_ids` is not None, the mock will always return `battery_ids`.
9090
Otherwise, it will return the requested batteries.
9191
"""
92+
mocker.patch.object(
93+
timeseries.battery_pool._methods, # pylint: disable=protected-access
94+
"WAIT_FOR_COMPONENT_DATA_SEC",
95+
0.1,
96+
)
9297
if battery_ids:
9398
mock = MagicMock(spec=ComponentPoolStatusTracker)
9499
mock.get_working_components.return_value = battery_ids

tests/timeseries/_ev_charger_pool/test_ev_charger_pool_control_methods.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,6 @@ async def _patch_ev_pool_status(
9696
ComponentPoolStatus(working=set(mocks.microgrid.evc_ids), uncertain=set())
9797
)
9898

99-
async def _patch_data_pipeline(self, mocker: MockerFixture) -> None:
100-
mocker.patch(
101-
"frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._ev_power_wrapper"
102-
"._pd_wait_for_data_sec",
103-
0.1,
104-
)
105-
10699
async def _patch_power_distributing_actor(
107100
self,
108101
mocker: MockerFixture,
@@ -213,7 +206,6 @@ async def test_setting_power(
213206
)
214207

215208
await self._init_ev_chargers(mocks)
216-
await self._patch_data_pipeline(mocker)
217209
ev_charger_pool = microgrid.ev_charger_pool(priority=5)
218210
await self._patch_ev_pool_status(mocks, mocker)
219211
await self._patch_power_distributing_actor(mocker)

tests/timeseries/_pv_pool/test_pv_pool_control_methods.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[_Mocks]:
5252
class TestPVPoolControl:
5353
"""Test control methods for the PVPool."""
5454

55-
async def _patch_data_pipeline(self, mocker: MockerFixture) -> None:
56-
mocker.patch(
57-
"frequenz.sdk.microgrid._data_pipeline._DATA_PIPELINE._pv_power_wrapper"
58-
"._pd_wait_for_data_sec",
59-
0.1,
60-
)
61-
6255
async def _init_pv_inverters(self, mocks: _Mocks) -> None:
6356
now = datetime.now(tz=timezone.utc)
6457
for idx, comp_id in enumerate(mocks.microgrid.pv_inverter_ids):
@@ -138,7 +131,6 @@ async def test_setting_power( # pylint: disable=too-many-statements
138131
)
139132

140133
await self._init_pv_inverters(mocks)
141-
await self._patch_data_pipeline(mocker)
142134
pv_pool = microgrid.pv_pool(priority=5)
143135
bounds_rx = pv_pool.power_status.new_receiver()
144136
await self._recv_reports_until(

0 commit comments

Comments
 (0)