Skip to content

Commit ff526d9

Browse files
committed
Set api request timeout from microgrid.initialize
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 933bc61 commit ff526d9

File tree

8 files changed

+72
-15
lines changed

8 files changed

+72
-15
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ async def run_test( # pylint: disable=too-many-locals
120120
requests_receiver=power_request_channel.new_receiver(),
121121
results_sender=power_result_channel.new_sender(),
122122
component_pool_status_sender=battery_status_channel.new_sender(),
123+
api_power_request_timeout=timedelta(seconds=5.0),
123124
):
124125
tasks: list[Coroutine[Any, Any, list[Result]]] = []
125126
tasks.append(send_requests(batteries, num_requests))

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,15 @@ def _add(key: str, value: dict[int, set[int]] | None) -> None:
125125
return mapping
126126

127127

128-
class BatteryManager(ComponentManager):
128+
class BatteryManager(ComponentManager): # pylint: disable=too-many-instance-attributes
129129
"""Class to manage the data streams for batteries."""
130130

131131
@override
132132
def __init__(
133133
self,
134134
component_pool_status_sender: Sender[ComponentPoolStatus],
135135
results_sender: Sender[Result],
136+
api_power_request_timeout: timedelta,
136137
):
137138
"""Initialize this instance.
138139
@@ -142,8 +143,11 @@ def __init__(
142143
streams, to dynamically adjust the values based on the health of the
143144
individual batteries.
144145
results_sender: Channel sender to send the power distribution results to.
146+
api_power_request_timeout: Timeout to use when making power requests to
147+
the microgrid API.
145148
"""
146149
self._results_sender = results_sender
150+
self._api_power_request_timeout = api_power_request_timeout
147151
self._batteries = connection_manager.get().component_graph.components(
148152
component_categories={ComponentCategory.BATTERY}
149153
)
@@ -277,7 +281,7 @@ async def _distribute_power(
277281
)
278282

279283
failed_power, failed_batteries = await self._set_distributed_power(
280-
distribution, request.request_timeout
284+
distribution, self._api_power_request_timeout
281285
)
282286

283287
response: Success | PartialFailure

src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232

3333
_logger = logging.getLogger(__name__)
3434

35-
_DEFAULT_API_REQUEST_TIMEOUT = timedelta(seconds=5.0)
36-
3735

3836
class EVChargerManager(ComponentManager):
3937
"""Manage ev chargers for the power distributor."""
@@ -43,15 +41,19 @@ def __init__(
4341
self,
4442
component_pool_status_sender: Sender[ComponentPoolStatus],
4543
results_sender: Sender[Result],
44+
api_power_request_timeout: timedelta,
4645
):
4746
"""Initialize the ev charger data manager.
4847
4948
Args:
5049
component_pool_status_sender: Channel for sending information about which
5150
components are expected to be working.
5251
results_sender: Channel for sending results of power distribution.
52+
api_power_request_timeout: Timeout to use when making power requests to
53+
the microgrid API.
5354
"""
5455
self._results_sender = results_sender
56+
self._api_power_request_timeout = api_power_request_timeout
5557
self._ev_charger_ids = self._get_ev_charger_ids()
5658
self._evc_states = EvcStates()
5759
self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache(
@@ -226,7 +228,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
226228
*[await api.ev_charger_data(evc_id) for evc_id in self._ev_charger_ids]
227229
)
228230
target_power_rx = self._target_power_channel.new_receiver()
229-
api_request_timeout = _DEFAULT_API_REQUEST_TIMEOUT
230231
latest_target_powers: dict[int, Power] = {}
231232
async for selected in select(ev_charger_data_rx, target_power_rx):
232233
target_power_changes = {}
@@ -260,7 +261,6 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
260261

261262
elif selected_from(selected, target_power_rx):
262263
self._latest_request = selected.message
263-
api_request_timeout = selected.message.request_timeout
264264
self._target_power = selected.message.power
265265
_logger.debug("New target power: %s", self._target_power)
266266
used_power = self._evc_states.get_ev_total_used_power()
@@ -288,7 +288,7 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
288288

289289
latest_target_powers.update(target_power_changes)
290290
result = await self._set_api_power(
291-
api, target_power_changes, api_request_timeout
291+
api, target_power_changes, self._api_power_request_timeout
292292
)
293293
await self._results_sender.send(result)
294294

src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,19 @@ def __init__(
3838
self,
3939
component_pool_status_sender: Sender[ComponentPoolStatus],
4040
results_sender: Sender[Result],
41+
api_power_request_timeout: timedelta,
4142
) -> None:
4243
"""Initialize this instance.
4344
4445
Args:
4546
component_pool_status_sender: Channel for sending information about which
4647
components are expected to be working.
4748
results_sender: Channel for sending results of power distribution.
49+
api_power_request_timeout: Timeout to use when making power requests to
50+
the microgrid API.
4851
"""
4952
self._results_sender = results_sender
53+
self._api_power_request_timeout = api_power_request_timeout
5054
self._pv_inverter_ids = self._get_pv_inverter_ids()
5155

5256
self._component_pool_status_tracker = (
@@ -177,7 +181,7 @@ async def _set_api_power( # pylint: disable=too-many-locals
177181
)
178182
_, pending = await asyncio.wait(
179183
tasks.values(),
180-
timeout=request.request_timeout.total_seconds(),
184+
timeout=self._api_power_request_timeout.total_seconds(),
181185
return_when=asyncio.ALL_COMPLETED,
182186
)
183187
# collect the timed out tasks and cancel them while keeping the

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
"""
1313

1414

15+
from datetime import timedelta
16+
1517
from frequenz.channels import Receiver, Sender
1618
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
1719
from typing_extensions import override
@@ -60,6 +62,7 @@ def __init__( # pylint: disable=too-many-arguments
6062
results_sender: Sender[Result],
6163
component_pool_status_sender: Sender[ComponentPoolStatus],
6264
*,
65+
api_power_request_timeout: timedelta,
6366
component_category: ComponentCategory,
6467
component_type: ComponentType | None = None,
6568
name: str | None = None,
@@ -72,6 +75,8 @@ def __init__( # pylint: disable=too-many-arguments
7275
results_sender: Sender for sending results to the power manager.
7376
component_pool_status_sender: Channel for sending information about which
7477
components are expected to be working.
78+
api_power_request_timeout: Timeout to use when making power requests to
79+
the microgrid API.
7580
component_category: The category of the components that this actor is
7681
responsible for.
7782
component_type: The type of the component of the given category that this
@@ -92,22 +97,23 @@ def __init__( # pylint: disable=too-many-arguments
9297
self._component_type = component_type
9398
self._requests_receiver = requests_receiver
9499
self._result_sender = results_sender
100+
self._api_power_request_timeout = api_power_request_timeout
95101

96102
self._component_manager: ComponentManager
97103
if component_category == ComponentCategory.BATTERY:
98104
self._component_manager = BatteryManager(
99-
component_pool_status_sender, results_sender
105+
component_pool_status_sender, results_sender, api_power_request_timeout
100106
)
101107
elif component_category == ComponentCategory.EV_CHARGER:
102108
self._component_manager = EVChargerManager(
103-
component_pool_status_sender, results_sender
109+
component_pool_status_sender, results_sender, api_power_request_timeout
104110
)
105111
elif (
106112
component_category == ComponentCategory.INVERTER
107113
and component_type == InverterType.SOLAR
108114
):
109115
self._component_manager = PVManager(
110-
component_pool_status_sender, results_sender
116+
component_pool_status_sender, results_sender, api_power_request_timeout
111117
)
112118
else:
113119
raise ValueError(

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import typing
1515
from collections import abc
1616
from dataclasses import dataclass
17+
from datetime import timedelta
1718

1819
from frequenz.channels import Broadcast, Sender
1920
from frequenz.client.microgrid import ComponentCategory, InverterType
@@ -79,11 +80,14 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes
7980
def __init__(
8081
self,
8182
resampler_config: ResamplerConfig,
83+
api_power_request_timeout: timedelta = timedelta(seconds=5.0),
8284
) -> None:
8385
"""Create a `DataPipeline` instance.
8486
8587
Args:
8688
resampler_config: Config to pass on to the resampler.
89+
api_power_request_timeout: Timeout to use when making power requests to
90+
the microgrid API.
8791
"""
8892
from ..actor import ChannelRegistry
8993

@@ -97,13 +101,18 @@ def __init__(
97101
self._resampling_actor: _ActorInfo | None = None
98102

99103
self._battery_power_wrapper = PowerWrapper(
100-
self._channel_registry, component_category=ComponentCategory.BATTERY
104+
self._channel_registry,
105+
api_power_request_timeout=api_power_request_timeout,
106+
component_category=ComponentCategory.BATTERY,
101107
)
102108
self._ev_power_wrapper = PowerWrapper(
103-
self._channel_registry, component_category=ComponentCategory.EV_CHARGER
109+
self._channel_registry,
110+
api_power_request_timeout=api_power_request_timeout,
111+
component_category=ComponentCategory.EV_CHARGER,
104112
)
105113
self._pv_power_wrapper = PowerWrapper(
106114
self._channel_registry,
115+
api_power_request_timeout=api_power_request_timeout,
107116
component_category=ComponentCategory.INVERTER,
108117
component_type=InverterType.SOLAR,
109118
)
@@ -485,11 +494,18 @@ async def _stop(self) -> None:
485494
_DATA_PIPELINE: _DataPipeline | None = None
486495

487496

488-
async def initialize(resampler_config: ResamplerConfig) -> None:
497+
async def initialize(
498+
resampler_config: ResamplerConfig,
499+
api_power_request_timeout: timedelta = timedelta(seconds=5.0),
500+
) -> None:
489501
"""Initialize a `DataPipeline` instance.
490502
491503
Args:
492504
resampler_config: Config to pass on to the resampler.
505+
api_power_request_timeout: Timeout to use when making power requests to
506+
the microgrid API. When requests to components timeout, they will
507+
be marked as blocked for a short duration, during which time they
508+
will be unavailable from the corresponding component pools.
493509
494510
Raises:
495511
RuntimeError: if the DataPipeline is already initialized.
@@ -498,7 +514,7 @@ async def initialize(resampler_config: ResamplerConfig) -> None:
498514

499515
if _DATA_PIPELINE is not None:
500516
raise RuntimeError("DataPipeline is already initialized.")
501-
_DATA_PIPELINE = _DataPipeline(resampler_config)
517+
_DATA_PIPELINE = _DataPipeline(resampler_config, api_power_request_timeout)
502518

503519

504520
def frequency() -> GridFrequency:

src/frequenz/sdk/microgrid/_power_wrapper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import logging
99
import typing
10+
from datetime import timedelta
1011

1112
from frequenz.channels import Broadcast
1213

@@ -38,13 +39,16 @@ def __init__(
3839
self,
3940
channel_registry: ChannelRegistry,
4041
*,
42+
api_power_request_timeout: timedelta,
4143
component_category: ComponentCategory,
4244
component_type: ComponentType | None = None,
4345
):
4446
"""Initialize the power control.
4547
4648
Args:
4749
channel_registry: A channel registry for use in the actors.
50+
api_power_request_timeout: Timeout to use when making power requests to
51+
the microgrid API.
4852
component_category: The category of the components that actors started by
4953
this instance of the PowerWrapper will be responsible for.
5054
component_type: The type of the component of the given category that this
@@ -58,6 +62,7 @@ def __init__(
5862
self._component_category = component_category
5963
self._component_type = component_type
6064
self._channel_registry = channel_registry
65+
self._api_power_request_timeout = api_power_request_timeout
6166

6267
self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast(
6368
name="Component Status Channel", resend_latest=True
@@ -145,6 +150,7 @@ def _start_power_distributing_actor(self) -> None:
145150
self._power_distributing_actor = PowerDistributingActor(
146151
component_category=self._component_category,
147152
component_type=self._component_type,
153+
api_power_request_timeout=self._api_power_request_timeout,
148154
requests_receiver=self._power_distribution_requests_channel.new_receiver(),
149155
results_sender=self._power_distribution_results_channel.new_sender(),
150156
component_pool_status_sender=self.status_channel.new_sender(),

0 commit comments

Comments
 (0)