2424from typing import Any , Dict , Iterable , List , Optional , Self , Set , Tuple
2525
2626import grpc
27- from frequenz .channels import Bidirectional , Peekable , Receiver , Sender
27+ from frequenz .channels import Peekable , Receiver , Sender
2828from google .protobuf .empty_pb2 import Empty # pylint: disable=no-name-in-module
2929
30- from ..._internal . _asyncio import cancel_and_await
30+ from ...actor import ChannelRegistry
3131from ...actor ._decorator import actor
3232from ...microgrid import ComponentGraph , connection_manager
3333from ...microgrid .client import MicrogridApiClient
@@ -119,7 +119,7 @@ class PowerDistributingActor:
119119 PartialFailure,
120120 Ignored,
121121 )
122- from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender
122+ from frequenz.channels import Broadcast, Receiver, Sender
123123 from datetime import timedelta
124124 from frequenz.sdk import actor
125125
@@ -177,21 +177,31 @@ class PowerDistributingActor:
177177
178178 def __init__ (
179179 self ,
180- users_channels : Dict [str , Bidirectional .Handle [Result , Request ]],
180+ requests_receiver : Receiver [Request ],
181+ channel_registry : ChannelRegistry ,
181182 battery_status_sender : Sender [BatteryStatus ],
182183 wait_for_data_sec : float = 2 ,
183184 ) -> None :
184185 """Create class instance.
185186
186187 Args:
187- users_channels: BidirectionalHandle for each user. Key should be
188- user id and value should be BidirectionalHandle.
188+ requests_receiver: Receiver for receiving power requests from other actors.
189+ channel_registry: Channel registry for creating result channels dynamically
190+ for each request namespace.
189191 battery_status_sender: Channel for sending information which batteries are
190192 working.
191193 wait_for_data_sec: How long actor should wait before processing first
192194 request. It is a time needed to collect first components data.
193195 """
196+ self ._requests_receiver = requests_receiver
197+ self ._channel_registry = channel_registry
194198 self ._wait_for_data_sec = wait_for_data_sec
199+ self ._result_senders : Dict [str , Sender [Result ]] = {}
200+ """Dictionary of result senders for each request namespace.
201+
202+ They are for channels owned by the channel registry, we just hold a reference
203+ to their senders, for fast access.
204+ """
195205
196206 # NOTE: power_distributor_exponent should be received from ConfigManager
197207 self .power_distributor_exponent : float = 1.0
@@ -260,6 +270,20 @@ def _get_lower_bound(self, batteries: abc.Set[int], include_broken: bool) -> flo
260270 for battery , inverter in pairs_data
261271 )
262272
273+ async def _send_result (self , namespace : str , result : Result ) -> None :
274+ """Send result to the user.
275+
276+ Args:
277+ namespace: namespace of the sender, to identify the result channel with.
278+ result: Result to send out.
279+ """
280+ if not namespace in self ._result_senders :
281+ self ._result_senders [namespace ] = self ._channel_registry .new_sender (
282+ namespace
283+ )
284+
285+ await self ._result_senders [namespace ].send (result )
286+
263287 async def run (self ) -> None :
264288 """Run actor main function.
265289
@@ -276,28 +300,36 @@ async def run(self) -> None:
276300 # Wait few seconds to get data from the channels created above.
277301 await asyncio .sleep (self ._wait_for_data_sec )
278302
279- self ._started .set ()
280- while True :
281- request , user = await self ._request_queue .get ()
303+ async for request in self ._requests_receiver :
304+ error = self ._check_request (request )
305+ if error :
306+ await self ._send_result (request .namespace , error )
307+ continue
282308
283309 try :
284310 pairs_data : List [InvBatPair ] = self ._get_components_data (
285311 request .batteries , request .include_broken_batteries
286312 )
287313 except KeyError as err :
288- await user .channel .send (Error (request = request , msg = str (err )))
314+ await self ._send_result (
315+ request .namespace , Error (request = request , msg = str (err ))
316+ )
289317 continue
290318
291319 if not pairs_data and not request .include_broken_batteries :
292320 error_msg = f"No data for the given batteries { str (request .batteries )} "
293- await user .channel .send (Error (request = request , msg = str (error_msg )))
321+ await self ._send_result (
322+ request .namespace , Error (request = request , msg = str (error_msg ))
323+ )
294324 continue
295325
296326 try :
297327 distribution = self ._get_power_distribution (request , pairs_data )
298328 except ValueError as err :
299329 error_msg = f"Couldn't distribute power, error: { str (err )} "
300- await user .channel .send (Error (request = request , msg = str (error_msg )))
330+ await self ._send_result (
331+ request .namespace , Error (request = request , msg = str (error_msg ))
332+ )
301333 continue
302334
303335 distributed_power_value = request .power - distribution .remaining_power
@@ -306,8 +338,7 @@ async def run(self) -> None:
306338 for bat_id , dist in distribution .distribution .items ()
307339 }
308340 _logger .debug (
309- "%s: Distributing power %d between the batteries %s" ,
310- user .user_id ,
341+ "Distributing power %d between the batteries %s" ,
311342 distributed_power_value ,
312343 str (battery_distribution ),
313344 )
@@ -341,7 +372,7 @@ async def run(self) -> None:
341372 self ._all_battery_status .update_status (
342373 succeed_batteries , failed_batteries
343374 ),
344- user . channel . send ( response ),
375+ self . _send_result ( request . namespace , response ),
345376 ]
346377 )
347378
0 commit comments