Skip to content

Commit 8bb2533

Browse files
Add fine-grained Result types for the PowerDistributingActor
Previously Result was one class with many fields. Now each result has its own class that derives from Result parent class. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 5693da1 commit 8bb2533

File tree

7 files changed

+356
-174
lines changed

7 files changed

+356
-174
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
1010

11+
- Add fine-grained Result types for the PowerDistributingActor.
12+
Previously Result was one class with many fields. Now each result has its own class
13+
that derives from Result parent class.
14+
1115
## New Features
1216

1317
<!-- Here goes the main new features and examples or instructions on how to use them -->

benchmarks/power_distribution/power_distributor.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@
1414
from frequenz.channels import Bidirectional
1515

1616
from frequenz.sdk.actor.power_distributing import (
17+
Error,
18+
Ignored,
19+
OutOfBound,
20+
PartialFailure,
1721
PowerDistributingActor,
1822
Request,
1923
Result,
24+
Success,
2025
)
2126
from frequenz.sdk.microgrid import ComponentGraph
2227
from frequenz.sdk.microgrid._graph import _MicrogridComponentGraph
@@ -75,21 +80,23 @@ def parse_result(result: List[List[Result]]) -> Dict[str, float]:
7580
Number of each result.
7681
"""
7782
result_counts = {
78-
Result.Status.ERROR: 0,
79-
Result.Status.IGNORED: 0,
80-
Result.Status.SUCCESS: 0,
81-
Result.Status.FAILED: 0,
83+
Error: 0,
84+
Ignored: 0,
85+
Success: 0,
86+
PartialFailure: 0,
87+
OutOfBound: 0,
8288
}
8389

8490
for result_list in result:
8591
for item in result_list:
86-
result_counts[item.status] += 1
92+
result_counts[type(item)] += 1
8793

8894
return {
89-
"success_num": result_counts[Result.Status.SUCCESS],
90-
"failed_num": result_counts[Result.Status.FAILED],
91-
"ignore_num": result_counts[Result.Status.IGNORED],
92-
"error_num": result_counts[Result.Status.ERROR],
95+
"success_num": result_counts[Success],
96+
"failed_num": result_counts[PartialFailure],
97+
"ignore_num": result_counts[Ignored],
98+
"error_num": result_counts[Error],
99+
"out_of_bound": result_counts[OutOfBound],
93100
}
94101

95102

examples/power_distribution.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
PowerDistributingActor,
2828
Request,
2929
Result,
30+
Success,
3031
)
3132
from frequenz.sdk.microgrid.component import Component, ComponentCategory
3233

@@ -97,14 +98,12 @@ async def run(self) -> None:
9798
continue
9899
if result is None:
99100
raise RuntimeError("PowerDistributingActor channel has been closed.")
100-
if result.status != Result.Status.SUCCESS:
101+
if not isinstance(result, Success):
101102
_logger.error(
102-
"Could not set %d power. Result: %s", power_to_set, str(result)
103+
"Could not set %d power. Result: %s", power_to_set, type(result)
103104
)
104105
else:
105-
_logger.info(
106-
"Set power with %d succeed, result: %s", power_to_set, str(result)
107-
)
106+
_logger.info("Set power with %d succeed.", power_to_set)
108107

109108

110109
@actor

src/frequenz/sdk/actor/power_distributing/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,17 @@
1010
PowerDistributingActor and send requests for charging or discharging power.
1111
"""
1212

13-
from .power_distributing import PowerDistributingActor, Result
13+
from .power_distributing import PowerDistributingActor
1414
from .request import Request
15+
from .result import Error, Ignored, OutOfBound, PartialFailure, Result, Success
1516

16-
__all__ = ["PowerDistributingActor", "Request", "Result"]
17+
__all__ = [
18+
"PowerDistributingActor",
19+
"Request",
20+
"Result",
21+
"Error",
22+
"Success",
23+
"Ignored",
24+
"OutOfBound",
25+
"PartialFailure",
26+
]

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 66 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from asyncio.tasks import ALL_COMPLETED
1919
from dataclasses import dataclass
2020
from datetime import datetime, timezone
21-
from enum import Enum
2221
from math import ceil, floor
2322
from typing import ( # pylint: disable=unused-import
2423
Any,
@@ -46,6 +45,7 @@
4645
)
4746
from ...power import DistributionAlgorithm, InvBatPair
4847
from .request import Request
48+
from .result import Error, Ignored, OutOfBound, PartialFailure, Result, Success
4949

5050
_logger = logging.getLogger(__name__)
5151

@@ -141,33 +141,6 @@ def is_broken(self, component_id: int) -> bool:
141141
return False
142142

143143

144-
@dataclass
145-
class Result:
146-
"""Result on distribution request."""
147-
148-
class Status(Enum):
149-
"""Status of the result."""
150-
151-
FAILED = 0 # If any request for any battery didn't succeed for any reason.
152-
SUCCESS = 1 # If all requests for all batteries succeed.
153-
IGNORED = 2 # If request was dispossessed by newer request with the same set
154-
# of batteries.
155-
ERROR = 3 # If any error happened. In this case error_message describes error.
156-
OUT_OF_BOUND = 4 # When Request.adjust_power=False and the requested power was
157-
# out of the bounds for specified batteries.
158-
159-
status: Status # Status of the request.
160-
161-
failed_power: float # How much power failed.
162-
163-
above_upper_bound: float # How much power was not used because it was beyond the
164-
# limits.
165-
166-
error_message: Optional[
167-
str
168-
] = None # error_message filled only when status is ERROR
169-
170-
171144
@actor
172145
class PowerDistributingActor:
173146
# pylint: disable=too-many-instance-attributes
@@ -197,10 +170,14 @@ class PowerDistributingActor:
197170
198171
from frequenz.sdk.microgrid.graph import _MicrogridComponentGraph
199172
from frequenz.sdk.microgrid.component import ComponentCategory
200-
from frequenz.sdk.power_distribution import (
173+
from frequenz.sdk.actor.power_distribution import (
201174
PowerDistributor,
202175
Request,
203176
Result,
177+
Success,
178+
Error,
179+
PartialFailure,
180+
Ignored,
204181
)
205182
206183
@@ -228,15 +205,16 @@ class PowerDistributingActor:
228205
# It is recommended to use timeout when waiting for the response!
229206
result: Result = await asyncio.wait_for(client_handle.receive(), timeout=10)
230207
231-
if result.status == Result.Status.SUCCESS:
208+
if isinstance(result, Success):
232209
print("Command succeed")
233-
elif result.status == Result.Status.FAILED:
210+
elif isinstance(result, PartialFailure):
234211
print(
235-
f"Some batteries failed, total failed power: {result.failed_power}")
236-
elif result.status == Result.Status.IGNORED:
237-
print(f"Request was ignored, because of newer command")
238-
elif result.status == Result.Status.ERROR:
239-
print(f"Request failed with error: {request.error_message}")
212+
f"Batteries {result.failed_batteries} failed, total failed power" \
213+
f"{result.failed_power}")
214+
elif isinstance(result, Ignored):
215+
print(f"Request was ignored, because of newer request")
216+
elif isinstance(result, Error):
217+
print(f"Request failed with error: {result.msg}")
240218
```
241219
"""
242220

@@ -339,19 +317,6 @@ def _get_lower_bound(self, batteries: Set[int]) -> int:
339317
)
340318
return ceil(bound)
341319

342-
def _within_bounds(self, request: Request) -> bool:
343-
"""Check whether the requested power is withing the bounds.
344-
345-
Args:
346-
request: request
347-
348-
Returns:
349-
True if power is between the bounds, False otherwise.
350-
"""
351-
power = request.power
352-
lower_bound = self._get_lower_bound(request.batteries)
353-
return lower_bound <= power <= self._get_upper_bound(request.batteries)
354-
355320
async def run(self) -> None:
356321
"""Run actor main function.
357322
@@ -374,33 +339,33 @@ async def run(self) -> None:
374339
request.batteries
375340
)
376341
except KeyError as err:
377-
await user.channel.send(
378-
Result(Result.Status.ERROR, request.power, 0, str(err))
379-
)
342+
await user.channel.send(Error(request, str(err)))
380343
continue
344+
381345
if len(pairs_data) == 0:
382346
error_msg = f"No data for the given batteries {str(request.batteries)}"
383-
await user.channel.send(
384-
Result(Result.Status.ERROR, request.power, 0, str(error_msg))
385-
)
347+
await user.channel.send(Error(request, str(error_msg)))
386348
continue
349+
387350
try:
388351
distribution = self.distribution_algorithm.distribute_power(
389352
request.power, pairs_data
390353
)
391354
except ValueError as err:
392355
error_msg = f"Couldn't distribute power, error: {str(err)}"
393-
await user.channel.send(
394-
Result(Result.Status.ERROR, request.power, 0, error_msg)
395-
)
356+
await user.channel.send(Error(request, str(error_msg)))
396357
continue
397358

398359
distributed_power_value = request.power - distribution.remaining_power
360+
battery_distribution = {
361+
self._inv_bat_map[bat_id]: dist
362+
for bat_id, dist in distribution.distribution.items()
363+
}
399364
_logger.debug(
400-
"%s: Distributing power %d between the inverters %s",
365+
"%s: Distributing power %d between the batteries %s",
401366
user.user_id,
402367
distributed_power_value,
403-
str(distribution.distribution),
368+
str(battery_distribution),
404369
)
405370

406371
tasks = {
@@ -417,14 +382,31 @@ async def run(self) -> None:
417382
)
418383

419384
await self._cancel_tasks(pending)
420-
any_fail, failed_power = self._parse_result(
385+
failed_power, failed_batteries = self._parse_result(
421386
tasks, distribution.distribution, request.request_timeout_sec
422387
)
423388

424-
status = Result.Status.FAILED if any_fail else Result.Status.SUCCESS
425-
await user.channel.send(
426-
Result(status, failed_power, distribution.remaining_power)
427-
)
389+
if len(failed_batteries) > 0:
390+
succeed_batteries = set(battery_distribution.keys()) - failed_batteries
391+
await user.channel.send(
392+
PartialFailure(
393+
request=request,
394+
succeed_power=distributed_power_value,
395+
succeed_batteries=succeed_batteries,
396+
failed_power=failed_power,
397+
failed_batteries=failed_batteries,
398+
excess_power=distribution.remaining_power,
399+
)
400+
)
401+
else:
402+
await user.channel.send(
403+
Success(
404+
request=request,
405+
succeed_power=distributed_power_value,
406+
used_batteries=set(battery_distribution.keys()),
407+
excess_power=distribution.remaining_power,
408+
)
409+
)
428410

429411
def _check_request(self, request: Request) -> Optional[Result]:
430412
"""Check whether the given request if correct.
@@ -441,10 +423,17 @@ def _check_request(self, request: Request) -> Optional[Result]:
441423
f"No battery {battery}, available batteries: "
442424
f"{list(self._battery_receivers.keys())}"
443425
)
444-
return Result(Result.Status.ERROR, request.power, 0, error_message=msg)
426+
return Error(request, msg)
445427

446-
if not request.adjust_power and not self._within_bounds(request):
447-
return Result(Result.Status.OUT_OF_BOUND, request.power, 0)
428+
if not request.adjust_power:
429+
if request.power < 0:
430+
bound = self._get_lower_bound(request.batteries)
431+
if request.power < bound:
432+
return OutOfBound(request, bound)
433+
else:
434+
bound = self._get_upper_bound(request.batteries)
435+
if request.power > bound:
436+
return OutOfBound(request, bound)
448437

449438
return None
450439

@@ -474,9 +463,7 @@ def _remove_duplicated_requests(
474463
# Generators seems to be the fastest
475464
if prev_request.batteries == batteries:
476465
task = asyncio.create_task(
477-
prev_user.channel.send(
478-
Result(Result.Status.IGNORED, prev_request.power, 0)
479-
)
466+
prev_user.channel.send(Ignored(prev_request))
480467
)
481468
to_ignore.append(task)
482469
# Use generators as generators seems to be the fastest.
@@ -546,9 +533,7 @@ async def _wait_for_request(self, user: _User) -> None:
546533
"Consider increasing size of the queue."
547534
)
548535
_logger.error(msg)
549-
await user.channel.send(
550-
Result(Result.Status.ERROR, request.power, 0, msg)
551-
)
536+
await user.channel.send(Error(request, str(msg)))
552537
else:
553538
self._request_queue.put_nowait((request, user))
554539
await asyncio.gather(*tasks)
@@ -686,7 +671,7 @@ def _parse_result(
686671
tasks, # type: Dict[int, asyncio.Task[Empty]]
687672
distribution: Dict[int, int],
688673
request_timeout_sec: float,
689-
) -> Tuple[bool, int]:
674+
) -> Tuple[int, Set[int]]:
690675
"""Parse result of `set_power` requests.
691676
692677
Check if any task failed and why. If any task didn't success, then corresponding
@@ -700,19 +685,19 @@ def _parse_result(
700685
request_timeout_sec: timeout which has been used for request.
701686
702687
Returns:
703-
Tuple where first element tells if any task didn't succeed, and the
704-
second element is total amount of power that failed.
688+
Tuple where first element is total failed power, and the second element
689+
set of batteries that failed.
705690
"""
706-
any_fail: bool = False
707691
failed_power: int = 0
692+
failed_batteries: Set[int] = set()
708693

709694
for inverter_id, aws in tasks.items():
710695
battery_id = self._inv_bat_map[inverter_id]
711696
try:
712697
aws.result()
713698
except grpc.aio.AioRpcError as err:
714-
any_fail = True
715699
failed_power += distribution[inverter_id]
700+
failed_batteries.add(battery_id)
716701
if err.code() == grpc.StatusCode.OUT_OF_RANGE:
717702
_logger.debug(
718703
"Set power for battery %d failed, error %s",
@@ -727,16 +712,16 @@ def _parse_result(
727712
)
728713
self._broken_components.mark_as_broken(battery_id)
729714
except asyncio.exceptions.CancelledError:
730-
any_fail = True
731715
failed_power += distribution[inverter_id]
716+
failed_batteries.add(battery_id)
732717
_logger.warning(
733718
"Battery %d didn't respond in %f sec. Mark it as broken.",
734719
battery_id,
735720
request_timeout_sec,
736721
)
737722
self._broken_components.mark_as_broken(battery_id)
738723

739-
return any_fail, failed_power
724+
return failed_power, failed_batteries
740725

741726
async def _cancel_tasks(self, tasks: Iterable[asyncio.Task[Any]]) -> None:
742727
"""Cancel given asyncio tasks and wait for them.

0 commit comments

Comments
 (0)