Skip to content

Commit aaa5465

Browse files
Define way to create object with async constructor (#145)
2 parents 1401ff2 + 97cef26 commit aaa5465

File tree

7 files changed

+184
-144
lines changed

7 files changed

+184
-144
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ python -m pip install -e .
2121
You can also use `nox` to run the tests and other checks:
2222

2323
```sh
24-
python -m pip install nox
24+
python -m pip install nox toml tomli
2525
nox
2626
```
2727

src/frequenz/sdk/_internal/asyncio.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""General purpose async tools."""
55

66
import asyncio
7+
from abc import ABC
78

89

910
async def cancel_and_await(task: asyncio.Task) -> None:
@@ -19,3 +20,22 @@ async def cancel_and_await(task: asyncio.Task) -> None:
1920
await task
2021
except asyncio.CancelledError:
2122
pass
23+
24+
25+
class NotSyncConstructible(AssertionError):
26+
"""Raised when object with async constructor is created in sync way."""
27+
28+
29+
class AsyncConstructible(ABC):
30+
"""Parent class for classes where part of the constructor is async."""
31+
32+
def __init__(self) -> None:
33+
"""Raise error when object is created in sync way.
34+
35+
Raises:
36+
NotSyncConstructible: If this method is called.
37+
"""
38+
raise NotSyncConstructible(
39+
"This object shouldn't be created with default constructor. ",
40+
"Check class documentation for more information.",
41+
)

src/frequenz/sdk/actor/power_distributing/_battery_pool_status.py

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,39 @@
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33
"""Class that stores pool of batteries and manage them."""
44

5+
from __future__ import annotations
6+
57
import asyncio
68
import logging
7-
from typing import Set
9+
from typing import Dict, Set
810

11+
from ..._internal.asyncio import AsyncConstructible
912
from ...microgrid._battery import BatteryStatus, StatusTracker
1013
from .result import PartialFailure, Result, Success
1114

1215
_logger = logging.getLogger(__name__)
1316

1417

15-
class BatteryPoolStatus:
16-
"""Holds pool of batteries and returns data from them."""
18+
class BatteryPoolStatus(AsyncConstructible):
19+
"""Return status of batteries in the pool.
20+
21+
To create an instance of this class you should use `async_new` class method.
22+
Standard constructor (__init__) is not supported and using it will raise
23+
`NotSyncConstructible` error.
24+
"""
1725

18-
def __init__(
19-
self,
26+
# This is instance attribute.
27+
# Don't assign default value, because then it becomes class attribute.
28+
_batteries: Dict[int, StatusTracker]
29+
30+
@classmethod
31+
async def async_new(
32+
cls,
2033
battery_ids: Set[int],
2134
max_data_age_sec: float,
2235
max_blocking_duration_sec: float,
23-
) -> None:
24-
"""Create partially initialized object instance.
25-
26-
Note:
27-
Please call `async_init` method to fully initialize BatteryPoolStatus. Otherwise
28-
it is not possible to use BatteryPoolStatus.
36+
) -> BatteryPoolStatus:
37+
"""Create BatteryPoolStatus instance.
2938
3039
Args:
3140
battery_ids: set of batteries ids that should be stored in pool.
@@ -35,20 +44,24 @@ def __init__(
3544
data.
3645
max_blocking_duration_sec: This value tell what should be the maximum
3746
timeout used for blocking failing component.
47+
48+
Raises:
49+
RuntimeError: If any battery has no adjacent inverter.
50+
51+
Returns:
52+
New instance of this class.
3853
"""
39-
self._batteries = {
40-
id: StatusTracker(id, max_data_age_sec, max_blocking_duration_sec)
54+
self: BatteryPoolStatus = BatteryPoolStatus.__new__(cls)
55+
56+
tasks = [
57+
StatusTracker.async_new(id, max_data_age_sec, max_blocking_duration_sec)
4158
for id in battery_ids
42-
}
43-
self._init_method_called: bool = False
44-
45-
async def async_init(self) -> None:
46-
"""Init battery pool."""
47-
await asyncio.gather(
48-
*[bat.async_init() for bat in self._batteries.values()],
49-
return_exceptions=True,
50-
)
51-
self._init_method_called = True
59+
]
60+
61+
trackers = await asyncio.gather(*tasks)
62+
self._batteries = {tracker.battery_id: tracker for tracker in trackers}
63+
64+
return self
5265

5366
def get_working_batteries(self, battery_ids: Set[int]) -> Set[int]:
5467
"""Get subset of battery_ids with working batteries.
@@ -64,11 +77,6 @@ def get_working_batteries(self, battery_ids: Set[int]) -> Set[int]:
6477
Returns:
6578
Subset of given batteries with working batteries.
6679
"""
67-
if not self._init_method_called:
68-
raise RuntimeError(
69-
"`async_init` method not called or not awaited. Run it before first use"
70-
)
71-
7280
working: Set[int] = set()
7381
uncertain: Set[int] = set()
7482
for bat_id in battery_ids:
@@ -101,11 +109,6 @@ def update_last_request_status(self, result: Result):
101109
RuntimeError: If `async_init` method was not called at the beginning to
102110
initialize object.
103111
"""
104-
if not self._init_method_called:
105-
raise RuntimeError(
106-
"`async_init` method not called or not awaited. Run it before first use"
107-
)
108-
109112
if isinstance(result, Success):
110113
for bat_id in result.used_batteries:
111114
self._batteries[bat_id].unblock()

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,9 @@ def __init__(
161161
self.power_distributor_exponent
162162
)
163163

164-
graph = microgrid.get().component_graph
165-
batteries = graph.components(component_category={ComponentCategory.BATTERY})
166-
167-
self._battery_pool = BatteryPoolStatus(
168-
battery_ids={battery.component_id for battery in batteries},
169-
max_blocking_duration_sec=30.0,
170-
max_data_age_sec=10.0,
164+
self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(
165+
microgrid.get().component_graph
171166
)
172-
173-
self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(graph)
174167
self._battery_receivers: Dict[int, Peekable[BatteryData]] = {}
175168
self._inverter_receivers: Dict[int, Peekable[InverterData]] = {}
176169

@@ -251,18 +244,24 @@ async def run(self) -> None:
251244
as broken for some time.
252245
"""
253246
await self._create_channels()
254-
await self._battery_pool.async_init()
247+
255248
api = microgrid.get().api_client
249+
battery_pool = await BatteryPoolStatus.async_new(
250+
battery_ids=set(self._bat_inv_map.keys()),
251+
max_blocking_duration_sec=30.0,
252+
max_data_age_sec=10.0,
253+
)
256254

257255
# Wait few seconds to get data from the channels created above.
258256
await asyncio.sleep(self._wait_for_data_sec)
257+
259258
self._started.set()
260259
while True:
261260
request, user = await self._request_queue.get()
262261

263262
try:
264263
pairs_data: List[InvBatPair] = self._get_components_data(
265-
request.batteries
264+
battery_pool.get_working_batteries(request.batteries)
266265
)
267266
except KeyError as err:
268267
await user.channel.send(Error(request, str(err)))
@@ -294,14 +293,10 @@ async def run(self) -> None:
294293
str(battery_distribution),
295294
)
296295

297-
tasks = await self._set_distributed_power(
296+
failed_power, failed_batteries = await self._set_distributed_power(
298297
api, distribution, request.request_timeout_sec
299298
)
300299

301-
failed_power, failed_batteries = self._parse_result(
302-
tasks, distribution.distribution, request.request_timeout_sec
303-
)
304-
305300
if len(failed_batteries) > 0:
306301
succeed_batteries = set(battery_distribution.keys()) - failed_batteries
307302
response = PartialFailure(
@@ -320,15 +315,15 @@ async def run(self) -> None:
320315
excess_power=distribution.remaining_power,
321316
)
322317

323-
self._battery_pool.update_last_request_status(response)
318+
battery_pool.update_last_request_status(response)
324319
await user.channel.send(response)
325320

326321
async def _set_distributed_power(
327322
self,
328323
api: MicrogridApiClient,
329324
distribution: DistributionResult,
330325
timeout_sec: float,
331-
) -> Dict[int, asyncio.Task[Empty]]:
326+
) -> Tuple[int, Set[int]]:
332327
"""Send distributed power to the inverters.
333328
334329
Args:
@@ -337,7 +332,8 @@ async def _set_distributed_power(
337332
timeout_sec: How long wait for the response
338333
339334
Returns:
340-
Dict with finished or cancelled task for each inverter.
335+
Tuple where first element is total failed power, and the second element
336+
set of batteries that failed.
341337
"""
342338
tasks = {
343339
inverter_id: asyncio.create_task(api.set_power(inverter_id, power))
@@ -351,7 +347,8 @@ async def _set_distributed_power(
351347
)
352348

353349
await self._cancel_tasks(pending)
354-
return tasks
350+
351+
return self._parse_result(tasks, distribution.distribution, timeout_sec)
355352

356353
def _check_request(self, request: Request) -> Optional[Result]:
357354
"""Check whether the given request if correct.
@@ -539,7 +536,7 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
539536
"""
540537
pairs_data: List[InvBatPair] = []
541538

542-
for battery_id in self._battery_pool.get_working_batteries(batteries):
539+
for battery_id in batteries:
543540
if battery_id not in self._battery_receivers:
544541
raise KeyError(
545542
f"No battery {battery_id}, "

0 commit comments

Comments
 (0)