Skip to content

Commit 498b131

Browse files
committed
Rename .sdk.power_distribution and move actor
Rename frequenz.sdk.power_distribution to frequenz.sdk.power and expose symbols only in this main module. Also moves the PowerDistributor to frequenz.sdk.actor.powerdistributing.PowerDistributingActor, including Request and Result. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 6818406 commit 498b131

File tree

11 files changed

+211
-188
lines changed

11 files changed

+211
-188
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@
1313
import grpc.aio as grpcaio
1414
from frequenz.channels import Bidirectional
1515

16+
from frequenz.sdk.actor.power_distributing import (
17+
PowerDistributingActor,
18+
Request,
19+
Result,
20+
)
1621
from frequenz.sdk.microgrid.client import MicrogridApiClient, MicrogridGrpcClient
1722
from frequenz.sdk.microgrid.component import Component, ComponentCategory
1823
from frequenz.sdk.microgrid.graph import ComponentGraph, _MicrogridComponentGraph
19-
from frequenz.sdk.power_distribution import PowerDistributor, Request, Result
2024

2125
HOST = "157.90.243.180"
2226
PORT = 61060
@@ -31,7 +35,7 @@ class User:
3135

3236

3337
async def run_user(user: User, batteries: Set[int], request_num: int) -> List[Result]:
34-
"""Send requests to the PowerDistributor and wait for the response.
38+
"""Send requests to the PowerDistributingActor and wait for the response.
3539
3640
Args:
3741
user: user that should send request
@@ -42,7 +46,7 @@ async def run_user(user: User, batteries: Set[int], request_num: int) -> List[Re
4246
SystemError: If the channel was closed.
4347
4448
Returns:
45-
List of the results from the PowerDistributor.
49+
List of the results from the PowerDistributingActor.
4650
"""
4751
result: List[Result] = []
4852
for _ in range(request_num):
@@ -101,7 +105,7 @@ async def run_test( # pylint: disable=too-many-locals
101105
users_num: Number of users to register
102106
requests_per_user: How many request user should send.
103107
batteries: Set of batteries for each request.
104-
distributor: PowerDistributor instance.
108+
distributor: PowerDistributingActor instance.
105109
106110
Returns:
107111
Dictionary with statistics.
@@ -117,7 +121,7 @@ async def run_test( # pylint: disable=too-many-locals
117121
user_id: channel.service_handle for user_id, channel in channels.items()
118122
}
119123

120-
distributor = PowerDistributor(api, graph, service_channels)
124+
distributor = PowerDistributingActor(api, graph, service_channels)
121125

122126
tasks: List[Coroutine[Any, Any, List[Result]]] = []
123127
for user_id, channel in channels.items():

examples/sdk_usage_example.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"""Frequenz Python SDK usage examples.
55
66
This example creates two users.
7-
One user sends request with power to apply in PowerDistributor.
7+
One user sends request with power to apply in PowerDistributingActor.
88
Second user receives requests and set that power.
99
"""
1010

@@ -22,13 +22,17 @@
2222
from frequenz.sdk._data_ingestion import MicrogridData
2323
from frequenz.sdk._data_ingestion.formula_calculator import FormulaCalculator
2424
from frequenz.sdk.actor import actor
25+
from frequenz.sdk.actor.power_distributing import (
26+
PowerDistributingActor,
27+
Request,
28+
Result,
29+
)
2530
from frequenz.sdk.microgrid import (
2631
Component,
2732
ComponentCategory,
2833
MicrogridApi,
2934
microgrid_api,
3035
)
31-
from frequenz.sdk.power_distribution import PowerDistributor, Request, Result
3236

3337
_logger = logging.getLogger(__name__)
3438
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
@@ -92,11 +96,11 @@ async def run(self) -> None:
9296
)
9397
except asyncio.exceptions.TimeoutError:
9498
_logger.error(
95-
"Got timeout error when waiting for response from PowerDistributor"
99+
"Got timeout error when waiting for response from PowerDistributingActor"
96100
)
97101
continue
98102
if result is None:
99-
raise RuntimeError("PowerDistributor channel has been closed.")
103+
raise RuntimeError("PowerDistributingActor channel has been closed.")
100104
if result.status != Result.Status.SUCCESS:
101105
_logger.error(
102106
"Could not set %d power. Result: %s", power_to_set, str(result)
@@ -181,11 +185,11 @@ async def run() -> None:
181185
# Bidirectional channel is used for one sender - one receiver communication
182186
power_distributor_channels = {
183187
sending_actor_id: Bidirectional[Request, Result](
184-
client_id=sending_actor_id, service_id="PowerDistributor"
188+
client_id=sending_actor_id, service_id="PowerDistributingActor"
185189
)
186190
}
187191

188-
power_distributor = PowerDistributor(
192+
power_distributor = PowerDistributingActor(
189193
microgrid_api=api.microgrid_api_client,
190194
# microgrid_api=microgrid_api.microgrid_api, in v0.8.0
191195
component_graph=api.component_graph,

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
"""A base class for creating simple composable actors."""
55

6+
from . import power_distributing
67
from ._channel_registry import ChannelRegistry
78
from ._config_managing import ConfigManagingActor
89
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
@@ -16,4 +17,5 @@
1617
"ConfigManagingActor",
1718
"DataSourcingActor",
1819
"actor",
20+
"power_distributing",
1921
]

src/frequenz/sdk/power_distribution/power_distributor.py renamed to src/frequenz/sdk/actor/power_distributing.py

Lines changed: 127 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

4-
"""Tool to distribute power between batteries.
4+
"""Actor to distribute power between batteries.
55
6-
Purpose of this tool is to keep SoC level of each component at the equal level.
6+
When charge/discharge method is called the power should be distributed so that
7+
the SoC in batteries stays at the same level. That way of distribution
8+
prevents using only one battery, increasing temperature, and maximize the total
9+
amount power to charge/discharge.
10+
11+
Purpose of this actor is to keep SoC level of each component at the equal level.
712
"""
813

914
from __future__ import annotations
1015

1116
import asyncio
1217
import logging
1318
from asyncio.tasks import ALL_COMPLETED
19+
from dataclasses import dataclass
1420
from datetime import datetime, timezone
21+
from enum import Enum
1522
from typing import ( # pylint: disable=unused-import
1623
Any,
1724
Dict,
@@ -27,19 +34,127 @@
2734
from frequenz.channels import Bidirectional, Peekable, Receiver
2835
from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module
2936

30-
from ..actor import actor
37+
from ..actor._decorator import actor
3138
from ..microgrid.client import MicrogridApiClient
3239
from ..microgrid.component import Component, ComponentCategory
3340
from ..microgrid.component_data import BatteryData, InverterData
3441
from ..microgrid.graph import ComponentGraph
35-
from .distribution_algorithm import DistributionAlgorithm
36-
from .utils import BrokenComponents, InvBatPair, Request, Result, User
42+
from ..power import DistributionAlgorithm, InvBatPair
3743

3844
_logger = logging.getLogger(__name__)
3945

4046

47+
@dataclass
48+
class _User:
49+
"""User definitions."""
50+
51+
user_id: str
52+
"""The unique identifier for a user of the power distributing actor."""
53+
54+
# Channel for the communication
55+
channel: Bidirectional.Handle[Result, Request]
56+
"""The bidirectional channel to communicate with the user."""
57+
58+
59+
class _BrokenComponents:
60+
"""Store components marked as broken."""
61+
62+
def __init__(self, timeout_sec: float) -> None:
63+
"""Create object instance.
64+
65+
Args:
66+
timeout_sec: How long the component should be marked as broken.
67+
"""
68+
self._broken: Dict[int, datetime] = {}
69+
self._timeout_sec = timeout_sec
70+
71+
def mark_as_broken(self, component_id: int) -> None:
72+
"""Mark component as broken.
73+
74+
After marking component as broken it would be considered as broken for
75+
self._timeout_sec.
76+
77+
Args:
78+
component_id: component id
79+
"""
80+
self._broken[component_id] = datetime.now(timezone.utc)
81+
82+
def update_retry(self, timeout_sec: float) -> None:
83+
"""Change how long the component should be marked as broken.
84+
85+
Args:
86+
timeout_sec: New retry time after sec.
87+
"""
88+
self._timeout_sec = timeout_sec
89+
90+
def is_broken(self, component_id: int) -> bool:
91+
"""Check if component is marked as broken.
92+
93+
Args:
94+
component_id: component id
95+
96+
Returns:
97+
True if component is broken, False otherwise.
98+
"""
99+
if component_id in self._broken:
100+
last_broken = self._broken[component_id]
101+
if (
102+
datetime.now(timezone.utc) - last_broken
103+
).total_seconds() < self._timeout_sec:
104+
return True
105+
106+
del self._broken[component_id]
107+
return False
108+
109+
110+
@dataclass
111+
class Request:
112+
"""Request from the user."""
113+
114+
# How much power to set
115+
power: int
116+
# In which batteries the power should be set
117+
batteries: Set[int]
118+
# Timeout for the server to respond on the request.
119+
request_timeout_sec: float = 5.0
120+
# If True and requested power value is out of bound, then
121+
# PowerDistributor will decrease the power to match the bounds and
122+
# distribute only decreased power.
123+
# If False and the requested power is out of bound, then
124+
# PowerDistributor will not process this request and send result with status
125+
# Result.Status.OUT_OF_BOUND.
126+
adjust_power: bool = True
127+
128+
129+
@dataclass
130+
class Result:
131+
"""Result on distribution request."""
132+
133+
class Status(Enum):
134+
"""Status of the result."""
135+
136+
FAILED = 0 # If any request for any battery didn't succeed for any reason.
137+
SUCCESS = 1 # If all requests for all batteries succeed.
138+
IGNORED = 2 # If request was dispossessed by newer request with the same set
139+
# of batteries.
140+
ERROR = 3 # If any error happened. In this case error_message describes error.
141+
OUT_OF_BOUND = 4 # When Request.adjust_power=False and the requested power was
142+
# out of the bounds for specified batteries.
143+
144+
status: Status # Status of the request.
145+
146+
failed_power: float # How much power failed.
147+
148+
above_upper_bound: float # How much power was not used because it was beyond the
149+
# limits.
150+
151+
error_message: Optional[
152+
str
153+
] = None # error_message filled only when status is ERROR
154+
155+
41156
@actor
42-
class PowerDistributor:
157+
class PowerDistributingActor:
43158
# pylint: disable=too-many-instance-attributes
44159
"""Tool to distribute power between batteries in microgrid.
45160
@@ -142,7 +257,7 @@ def __init__(
142257
self.distribution_algorithm = DistributionAlgorithm(
143258
self.power_distributor_exponent
144259
)
145-
self._broken_components = BrokenComponents(self.broken_component_timeout_sec)
260+
self._broken_components = _BrokenComponents(self.broken_component_timeout_sec)
146261

147262
self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(
148263
component_graph
@@ -156,7 +271,7 @@ def __init__(
156271
# important. It will execute both. And later request will override the previous
157272
# one.
158273
# That is why the queue of maxsize = total number of batteries should be enough.
159-
self._request_queue: asyncio.Queue[Tuple[Request, User]] = asyncio.Queue(
274+
self._request_queue: asyncio.Queue[Tuple[Request, _User]] = asyncio.Queue(
160275
maxsize=len(self._bat_inv_map)
161276
)
162277

@@ -169,7 +284,7 @@ def __init__(
169284
def _create_users_tasks(self) -> None:
170285
"""For each user create a task to wait for request."""
171286
for user, handler in self._users_channels.items():
172-
asyncio.create_task(self._wait_for_request(User(user, handler)))
287+
asyncio.create_task(self._wait_for_request(_User(user, handler)))
173288

174289
def get_upper_bound(self, batteries: Set[int]) -> float:
175290
"""Get total upper bound of power to be set for given batteries.
@@ -317,7 +432,7 @@ def _check_request(self, request: Request) -> Optional[Result]:
317432
return None
318433

319434
def _remove_duplicated_requests(
320-
self, request: Request, user: User
435+
self, request: Request, user: _User
321436
) -> List[asyncio.Task[bool]]:
322437
"""Remove duplicated requests from the queue.
323438
@@ -334,7 +449,7 @@ def _remove_duplicated_requests(
334449
"""
335450
batteries = request.batteries
336451

337-
good_requests: List[Tuple[Request, User]] = []
452+
good_requests: List[Tuple[Request, _User]] = []
338453
to_ignore: List[asyncio.Task[bool]] = []
339454

340455
while not self._request_queue.empty():
@@ -367,7 +482,7 @@ def _remove_duplicated_requests(
367482
self._request_queue.put_nowait(good_request)
368483
return to_ignore
369484

370-
async def _wait_for_request(self, user: User) -> None:
485+
async def _wait_for_request(self, user: _User) -> None:
371486
"""Wait for the request from user.
372487
373488
Check if request is correct. If request is not correct send ERROR response

src/frequenz/sdk/power/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Utilities to manage power in a microgrid."""
5+
6+
from ._distribution_algorithm import (
7+
DistributionAlgorithm,
8+
DistributionResult,
9+
InvBatPair,
10+
)
11+
12+
__all__ = [
13+
"DistributionAlgorithm",
14+
"DistributionResult",
15+
"InvBatPair",
16+
]

0 commit comments

Comments
 (0)