Skip to content

Commit bed6720

Browse files
authored
Control interface for BatteryPool (frequenz-floss#408)
This PR updates the PowerDistributingActor to remove its multi-user interface, make its accessible only through the BatteryPool, with these methods: 1. set_power :: send a PSC power request to the power distributor. 2. charge :: send a charge power (+ve) to the power distributor. 3. discharge :: send a discharge power (+ve) to the power distributor. 4. power_distribution_results :: get a receiver to power distributor responses.
2 parents 776352c + 9d5df88 commit bed6720

File tree

11 files changed

+402
-596
lines changed

11 files changed

+402
-596
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,16 @@
77
import csv
88
import random
99
import timeit
10-
from dataclasses import dataclass
1110
from datetime import timedelta
1211
from typing import Any, Coroutine, Dict, List, Set # pylint: disable=unused-import
1312

14-
from frequenz.channels import Bidirectional, Broadcast
13+
from frequenz.channels import Broadcast
1514

1615
from frequenz.sdk import microgrid
17-
from frequenz.sdk.actor import ResamplerConfig
16+
from frequenz.sdk.actor import ChannelRegistry, ResamplerConfig
1817
from frequenz.sdk.actor.power_distributing import (
1918
BatteryStatus,
2019
Error,
21-
Ignored,
2220
OutOfBound,
2321
PartialFailure,
2422
PowerDistributingActor,
@@ -33,15 +31,7 @@
3331
PORT = 61060
3432

3533

36-
@dataclass
37-
class User:
38-
"""User definition."""
39-
40-
user_id: str
41-
channel: Bidirectional.Handle[Request, Result]
42-
43-
44-
async def run_user(user: User, batteries: Set[int], request_num: int) -> List[Result]:
34+
async def send_requests(batteries: Set[int], request_num: int) -> List[Result]:
4535
"""Send requests to the PowerDistributingActor and wait for the response.
4636
4737
Args:
@@ -55,15 +45,15 @@ async def run_user(user: User, batteries: Set[int], request_num: int) -> List[Re
5545
Returns:
5646
List of the results from the PowerDistributingActor.
5747
"""
48+
battery_pool = microgrid.battery_pool(batteries)
49+
results_rx = battery_pool.power_distribution_results()
5850
result: List[Result] = []
5951
for _ in range(request_num):
60-
await user.channel.send(
61-
Request(power=float(random.randrange(100000, 1000000)), batteries=batteries)
62-
)
52+
await battery_pool.set_power(float(random.randrange(100000, 1000000)))
6353
try:
64-
output = await asyncio.wait_for(user.channel.receive(), timeout=3)
54+
output = await asyncio.wait_for(results_rx.receive(), timeout=3)
6555
if output is None:
66-
raise SystemError(f"Channel for {user.user_id} closed!")
56+
raise SystemError(f"Power response channel for {battery_pool} closed!")
6757
result.append(output)
6858
except asyncio.exceptions.TimeoutError:
6959
print("TIMEOUT ERROR")
@@ -82,7 +72,6 @@ def parse_result(result: List[List[Result]]) -> Dict[str, float]:
8272
"""
8373
result_counts = {
8474
Error: 0,
85-
Ignored: 0,
8675
Success: 0,
8776
PartialFailure: 0,
8877
OutOfBound: 0,
@@ -95,57 +84,45 @@ def parse_result(result: List[List[Result]]) -> Dict[str, float]:
9584
return {
9685
"success_num": result_counts[Success],
9786
"failed_num": result_counts[PartialFailure],
98-
"ignore_num": result_counts[Ignored],
9987
"error_num": result_counts[Error],
10088
"out_of_bound": result_counts[OutOfBound],
10189
}
10290

10391

10492
async def run_test( # pylint: disable=too-many-locals
105-
users_num: int,
106-
requests_per_user: int,
93+
num_requests: int,
10794
batteries: Set[int],
10895
) -> Dict[str, Any]:
10996
"""Run test.
11097
11198
Args:
112-
users_num: Number of users to register
113-
requests_per_user: How many request user should send.
99+
num_requests: Number of requests to send.
114100
batteries: Set of batteries for each request.
115101
116102
Returns:
117103
Dictionary with statistics.
118104
"""
119105
start = timeit.default_timer()
120106

121-
channels: Dict[str, Bidirectional[Request, Result]] = {
122-
str(user_id): Bidirectional[Request, Result](str(user_id), "power_distributor")
123-
for user_id in range(users_num)
124-
}
125-
126-
service_channels = {
127-
user_id: channel.service_handle for user_id, channel in channels.items()
128-
}
129-
107+
power_request_channel = Broadcast[Request]("power-request")
130108
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
131-
109+
channel_registry = ChannelRegistry(name="power_distributor")
132110
distributor = PowerDistributingActor(
133-
service_channels, battery_status_sender=battery_status_channel.new_sender()
111+
channel_registry=channel_registry,
112+
requests_receiver=power_request_channel.new_receiver(),
113+
battery_status_sender=battery_status_channel.new_sender(),
134114
)
135115

136116
tasks: List[Coroutine[Any, Any, List[Result]]] = []
137-
for user_id, channel in channels.items():
138-
user = User(user_id, channel.client_handle)
139-
tasks.append(run_user(user, batteries, requests_per_user))
117+
tasks.append(send_requests(batteries, num_requests))
140118

141119
result = await asyncio.gather(*tasks)
142120
exec_time = timeit.default_timer() - start
143121

144122
await distributor._stop() # type: ignore # pylint: disable=no-member, protected-access
145123

146124
summary = parse_result(result)
147-
summary["users_num"] = users_num
148-
summary["requests_per_user"] = requests_per_user
125+
summary["num_requests"] = num_requests
149126
summary["batteries_num"] = len(batteries)
150127
summary["exec_time"] = exec_time
151128
return summary
@@ -166,12 +143,11 @@ async def run() -> None:
166143
# Take some time to get data from components
167144
await asyncio.sleep(4)
168145
with open("/dev/stdout", "w", encoding="utf-8") as csvfile:
169-
fields = await run_test(0, 0, batteries_ids)
146+
fields = await run_test(0, batteries_ids)
170147
out = csv.DictWriter(csvfile, fields.keys())
171148
out.writeheader()
172-
out.writerow(await run_test(1, 1, batteries_ids))
173-
out.writerow(await run_test(1, 10, batteries_ids))
174-
out.writerow(await run_test(10, 10, batteries_ids))
149+
out.writerow(await run_test(1, batteries_ids))
150+
out.writerow(await run_test(10, batteries_ids))
175151

176152

177153
async def main() -> None:

examples/power_distribution.py

Lines changed: 13 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,14 @@
1414
import logging
1515
from datetime import datetime, timedelta, timezone
1616
from queue import Queue
17-
from typing import List, Optional, Set
17+
from typing import List, Optional
1818

19-
from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender
19+
from frequenz.channels import Broadcast, Receiver, Sender
2020

2121
from frequenz.sdk import actor, microgrid
22-
from frequenz.sdk.actor import (
23-
ChannelRegistry,
24-
ComponentMetricRequest,
25-
ComponentMetricsResamplingActor,
26-
DataSourcingActor,
27-
ResamplerConfig,
28-
)
29-
from frequenz.sdk.actor.power_distributing import (
30-
BatteryStatus,
31-
PowerDistributingActor,
32-
Request,
33-
Result,
34-
Success,
35-
)
36-
from frequenz.sdk.microgrid import connection_manager
37-
from frequenz.sdk.microgrid.component import Component, ComponentCategory
22+
from frequenz.sdk.actor import ResamplerConfig
23+
from frequenz.sdk.actor.power_distributing import Result, Success
3824
from frequenz.sdk.timeseries import Sample
39-
from frequenz.sdk.timeseries.logical_meter import LogicalMeter
4025

4126
_logger = logging.getLogger(__name__)
4227
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
@@ -47,30 +32,26 @@
4732
class DecisionMakingActor:
4833
"""Actor that receives set receives power for given batteries."""
4934

50-
def __init__( # pylint: disable=too-many-arguments
35+
def __init__(
5136
self,
5237
power_channel: Receiver[List[float]],
53-
power_distributor_handle: Bidirectional.Handle[Request, Result],
54-
batteries: Set[int],
5538
) -> None:
5639
"""Create actor instance.
5740
5841
Args:
5942
power_channel: channel where actor receives requests
60-
power_distributor_handle: Channel
61-
for communication with power distributor
62-
batteries: Batteries to charge/discharge
6343
"""
6444
self._power_channel = power_channel
65-
self._power_distributor_handle = power_distributor_handle
66-
self._batteries = batteries
6745

6846
async def run(self) -> None:
6947
"""Run actor.
7048
7149
Raises:
7250
RuntimeError: If any channel was closed unexpectedly
7351
"""
52+
battery_pool = microgrid.battery_pool()
53+
result_rx = battery_pool.power_distribution_results()
54+
7455
while True:
7556
# wait for request with blocking
7657
request: Optional[List[float]] = await self._power_channel.receive()
@@ -88,16 +69,11 @@ async def run(self) -> None:
8869
# Discharge
8970
power_to_set = -10000.0
9071

91-
await self._power_distributor_handle.send(
92-
Request(
93-
power_to_set,
94-
batteries=self._batteries,
95-
request_timeout_sec=2.0,
96-
)
97-
)
72+
await battery_pool.set_power(power_to_set)
9873
try:
9974
result: Optional[Result] = await asyncio.wait_for(
100-
self._power_distributor_handle.receive(), timeout=3
75+
result_rx.receive(),
76+
timeout=3,
10177
)
10278
except asyncio.exceptions.TimeoutError:
10379
_logger.error(
@@ -164,76 +140,22 @@ async def run() -> None:
164140
HOST, PORT, ResamplerConfig(resampling_period=timedelta(seconds=1.0))
165141
)
166142

167-
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
168-
169-
data_source_request_channel = Broadcast[ComponentMetricRequest](
170-
"Data Source Request Channel"
171-
)
172-
173-
resampling_actor_request_channel = Broadcast[ComponentMetricRequest](
174-
"Resampling Actor Request Channel"
175-
)
176-
177-
_ds_actor = DataSourcingActor(
178-
request_receiver=data_source_request_channel.new_receiver(),
179-
registry=channel_registry,
180-
)
181-
182-
_resampling_actor = ComponentMetricsResamplingActor(
183-
channel_registry=channel_registry,
184-
data_sourcing_request_sender=data_source_request_channel.new_sender(),
185-
resampling_request_receiver=resampling_actor_request_channel.new_receiver(),
186-
config=ResamplerConfig(resampling_period=timedelta(seconds=1.0)),
187-
)
188-
189-
logical_meter = LogicalMeter(
190-
channel_registry,
191-
resampling_actor_request_channel.new_sender(),
192-
)
193-
sending_actor_id: str = "SendingActor"
194-
# Bidirectional channel is used for one sender - one receiver communication
195-
power_distributor_channels = {
196-
sending_actor_id: Bidirectional[Request, Result](
197-
client_id=sending_actor_id, service_id="PowerDistributingActor"
198-
)
199-
}
200-
201-
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
202-
203-
power_distributor = PowerDistributingActor(
204-
users_channels={
205-
key: channel.service_handle
206-
for key, channel in power_distributor_channels.items()
207-
},
208-
battery_status_sender=battery_status_channel.new_sender(),
209-
)
210-
143+
logical_meter = microgrid.logical_meter()
211144
# Channel to communicate between actors.
212145
power_dist_req_chan = Broadcast[List[float]](
213146
"power-distribing-req", resend_latest=True
214147
)
215148

216-
# You should get components from ComponentGraph, not from the api.
217-
# It is faster and and non blocking approach.
218-
batteries: Set[Component] = connection_manager.get().component_graph.components(
219-
# component_type=set(ComponentType.BATTERY) in v0.8.0
220-
component_category={ComponentCategory.BATTERY}
221-
)
222-
223149
service_actor = DecisionMakingActor(
224150
power_channel=power_dist_req_chan.new_receiver(),
225-
power_distributor_handle=power_distributor_channels[
226-
sending_actor_id
227-
].client_handle,
228-
batteries={battery.component_id for battery in batteries},
229151
)
230152

231153
client_actor = DataCollectingActor(
232154
request_channel=power_dist_req_chan.new_sender(),
233155
active_power_data=logical_meter.grid_power.new_receiver(),
234156
)
235157

236-
await actor.run(service_actor, client_actor, power_distributor)
158+
await actor.run(service_actor, client_actor)
237159

238160

239161
asyncio.run(run())

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ def get_channel_name(self) -> str:
6060
Returns:
6161
A string denoting a channel name.
6262
"""
63-
return f"{self.component_id}::{self.metric_id.name}::{self.start_time}::{self.namespace}"
63+
return (
64+
f"component-stream::{self.component_id}::{self.metric_id.name}::"
65+
f"{self.start_time}::{self.namespace}"
66+
)
6467

6568

6669
_MeterDataMethods: Dict[ComponentMetricId, Callable[[MeterData], float]] = {

src/frequenz/sdk/actor/power_distributing/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@
1313
from ._battery_pool_status import BatteryStatus
1414
from .power_distributing import PowerDistributingActor
1515
from .request import Request
16-
from .result import Error, Ignored, OutOfBound, PartialFailure, Result, Success
16+
from .result import Error, OutOfBound, PartialFailure, Result, Success
1717

1818
__all__ = [
1919
"PowerDistributingActor",
2020
"Request",
2121
"Result",
2222
"Error",
2323
"Success",
24-
"Ignored",
2524
"OutOfBound",
2625
"PartialFailure",
2726
"BatteryStatus",

src/frequenz/sdk/actor/power_distributing/_battery_pool_status.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import asyncio
88
import logging
9+
from collections import abc
910
from dataclasses import dataclass
1011
from typing import Dict, Set
1112

@@ -28,7 +29,7 @@ class BatteryStatus:
2829
uncertain: Set[int]
2930
"""Set of batteries that should be used only if there are no working batteries."""
3031

31-
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
32+
def get_working_batteries(self, batteries: abc.Set[int]) -> Set[int]:
3233
"""From the given set of batteries return working batteries.
3334
3435
Args:
@@ -203,7 +204,7 @@ async def update_status(
203204
SetPowerResult(succeed_batteries, failed_batteries)
204205
)
205206

206-
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
207+
def get_working_batteries(self, batteries: abc.Set[int]) -> Set[int]:
207208
"""From the given set of batteries get working.
208209
209210
Args:

0 commit comments

Comments
 (0)