77import csv
88import random
99import timeit
10- from dataclasses import dataclass
1110from datetime import timedelta
1211from typing import Any , Coroutine , Dict , List , Set # pylint: disable=unused-import
1312
14- from frequenz .channels import Bidirectional , Broadcast
13+ from frequenz .channels import Broadcast
1514
1615from frequenz .sdk import microgrid
17- from frequenz .sdk .actor import ResamplerConfig
16+ from frequenz .sdk .actor import ChannelRegistry , ResamplerConfig
1817from frequenz .sdk .actor .power_distributing import (
1918 BatteryStatus ,
2019 Error ,
3231PORT = 61060
3332
3433
35- @dataclass
36- class User :
37- """User definition."""
38-
39- user_id : str
40- channel : Bidirectional .Handle [Request , Result ]
41-
42-
43- async def run_user (user : User , batteries : Set [int ], request_num : int ) -> List [Result ]:
34+ async def send_requests (batteries : Set [int ], request_num : int ) -> List [Result ]:
4435 """Send requests to the PowerDistributingActor and wait for the response.
4536
4637 Args:
@@ -54,15 +45,15 @@ async def run_user(user: User, batteries: Set[int], request_num: int) -> List[Re
5445 Returns:
5546 List of the results from the PowerDistributingActor.
5647 """
48+ battery_pool = microgrid .battery_pool (batteries )
49+ results_rx = battery_pool .power_distribution_results ()
5750 result : List [Result ] = []
5851 for _ in range (request_num ):
59- await user .channel .send (
60- Request (power = float (random .randrange (100000 , 1000000 )), batteries = batteries )
61- )
52+ await battery_pool .set_power (float (random .randrange (100000 , 1000000 )))
6253 try :
63- output = await asyncio .wait_for (user . channel .receive (), timeout = 3 )
54+ output = await asyncio .wait_for (results_rx .receive (), timeout = 3 )
6455 if output is None :
65- raise SystemError (f"Channel for { user . user_id } closed!" )
56+ raise SystemError (f"Power response channel for { battery_pool } closed!" )
6657 result .append (output )
6758 except asyncio .exceptions .TimeoutError :
6859 print ("TIMEOUT ERROR" )
@@ -99,50 +90,39 @@ def parse_result(result: List[List[Result]]) -> Dict[str, float]:
9990
10091
10192async def run_test ( # pylint: disable=too-many-locals
102- users_num : int ,
103- requests_per_user : int ,
93+ num_requests : int ,
10494 batteries : Set [int ],
10595) -> Dict [str , Any ]:
10696 """Run test.
10797
10898 Args:
109- users_num: Number of users to register
110- requests_per_user: How many request user should send.
99+ num_requests: Number of requests to send.
111100 batteries: Set of batteries for each request.
112101
113102 Returns:
114103 Dictionary with statistics.
115104 """
116105 start = timeit .default_timer ()
117106
118- channels : Dict [str , Bidirectional [Request , Result ]] = {
119- str (user_id ): Bidirectional [Request , Result ](str (user_id ), "power_distributor" )
120- for user_id in range (users_num )
121- }
122-
123- service_channels = {
124- user_id : channel .service_handle for user_id , channel in channels .items ()
125- }
126-
107+ power_request_channel = Broadcast [Request ]("power-request" )
127108 battery_status_channel = Broadcast [BatteryStatus ]("battery-status" )
128-
109+ channel_registry = ChannelRegistry ( name = "power_distributor" )
129110 distributor = PowerDistributingActor (
130- service_channels , battery_status_sender = battery_status_channel .new_sender ()
111+ channel_registry = channel_registry ,
112+ requests_receiver = power_request_channel .new_receiver (),
113+ battery_status_sender = battery_status_channel .new_sender (),
131114 )
132115
133116 tasks : List [Coroutine [Any , Any , List [Result ]]] = []
134- for user_id , channel in channels .items ():
135- user = User (user_id , channel .client_handle )
136- tasks .append (run_user (user , batteries , requests_per_user ))
117+ tasks .append (send_requests (batteries , num_requests ))
137118
138119 result = await asyncio .gather (* tasks )
139120 exec_time = timeit .default_timer () - start
140121
141122 await distributor ._stop () # type: ignore # pylint: disable=no-member, protected-access
142123
143124 summary = parse_result (result )
144- summary ["users_num" ] = users_num
145- summary ["requests_per_user" ] = requests_per_user
125+ summary ["num_requests" ] = num_requests
146126 summary ["batteries_num" ] = len (batteries )
147127 summary ["exec_time" ] = exec_time
148128 return summary
@@ -163,12 +143,11 @@ async def run() -> None:
163143 # Take some time to get data from components
164144 await asyncio .sleep (4 )
165145 with open ("/dev/stdout" , "w" , encoding = "utf-8" ) as csvfile :
166- fields = await run_test (0 , 0 , batteries_ids )
146+ fields = await run_test (0 , batteries_ids )
167147 out = csv .DictWriter (csvfile , fields .keys ())
168148 out .writeheader ()
169- out .writerow (await run_test (1 , 1 , batteries_ids ))
170- out .writerow (await run_test (1 , 10 , batteries_ids ))
171- out .writerow (await run_test (10 , 10 , batteries_ids ))
149+ out .writerow (await run_test (1 , batteries_ids ))
150+ out .writerow (await run_test (10 , batteries_ids ))
172151
173152
174153async def main () -> None :
0 commit comments