99from heapq import heappop , heappush
1010
1111import grpc .aio
12- from frequenz .channels import Sender , select , selected_from
12+ from frequenz .channels import Broadcast , Receiver , select , selected_from
1313from frequenz .channels .timer import SkipMissedAndResync , Timer
1414from frequenz .client .dispatch import Client
1515from frequenz .client .dispatch .types import Event
2222"""The logger for this module."""
2323
2424
25+ # pylint: disable=too-many-instance-attributes
2526class DispatchingActor (Actor ):
2627 """Dispatch actor.
2728
@@ -50,24 +51,28 @@ def __init__(
5051 self ,
5152 microgrid_id : int ,
5253 client : Client ,
53- lifecycle_updates_sender : Sender [DispatchEvent ],
54- running_state_change_sender : Sender [Dispatch ],
5554 ) -> None :
5655 """Initialize the actor.
5756
5857 Args:
5958 microgrid_id: The microgrid ID to handle dispatches for.
6059 client: The client to use for fetching dispatches.
61- lifecycle_updates_sender: A sender for dispatch lifecycle events.
62- running_state_change_sender: A sender for dispatch running state changes.
6360 """
6461 super ().__init__ (name = "dispatch" )
6562
6663 self ._client = client
6764 self ._dispatches : dict [int , Dispatch ] = {}
6865 self ._microgrid_id = microgrid_id
69- self ._lifecycle_updates_sender = lifecycle_updates_sender
70- self ._running_state_change_sender = running_state_change_sender
66+
67+ self ._lifecycle_events_channel = Broadcast [DispatchEvent ](
68+ name = "lifecycle_events"
69+ )
70+ self ._lifecycle_events_tx = self ._lifecycle_events_channel .new_sender ()
71+ self ._running_state_status_channel = Broadcast [Dispatch ](
72+ name = "running_state_status"
73+ )
74+
75+ self ._running_state_status_tx = self ._running_state_status_channel .new_sender ()
7176 self ._next_event_timer = Timer (
7277 timedelta (seconds = 100 ), SkipMissedAndResync (), auto_start = False
7378 )
@@ -84,6 +89,47 @@ def __init__(
8489 always at index 0.
8590 """
8691
92+ # pylint: disable=redefined-builtin
93+ def new_lifecycle_events_receiver (self , type : str ) -> Receiver [DispatchEvent ]:
94+ """Create a new receiver for lifecycle events.
95+
96+ Args:
97+ type: The type of events to receive.
98+
99+ Returns:
100+ A new receiver for lifecycle events.
101+ """
102+ return self ._lifecycle_events_channel .new_receiver ().filter (
103+ lambda event : event .dispatch .type == type
104+ )
105+
106+ async def new_running_state_event_receiver (self , type : str ) -> Receiver [Dispatch ]:
107+ """Create a new receiver for running state events.
108+
109+ Args:
110+ type: The type of events to receive.
111+
112+ Returns:
113+ A new receiver for running state status.
114+ """
115+ # Find all matching dispatches based on the type and collect them
116+ dispatches = [
117+ dispatch for dispatch in self ._dispatches .values () if dispatch .type == type
118+ ]
119+
120+ # Create receiver with enough capacity to hold all matching dispatches
121+ receiver = self ._running_state_status_channel .new_receiver (
122+ limit = max (1 , len (dispatches ))
123+ ).filter (lambda dispatch : dispatch .type == type )
124+
125+ # Send all matching dispatches to the receiver
126+ for dispatch in dispatches :
127+ await self ._send_running_state_change (dispatch )
128+
129+ return receiver
130+
131+ # pylint: enable=redefined-builtin
132+
87133 async def _run (self ) -> None :
88134 """Run the actor."""
89135 _logger .info ("Starting dispatch actor for microgrid %s" , self ._microgrid_id )
@@ -111,24 +157,18 @@ async def _run(self) -> None:
111157 case Event .CREATED :
112158 self ._dispatches [dispatch .id ] = dispatch
113159 await self ._update_dispatch_schedule_and_notify (dispatch , None )
114- await self ._lifecycle_updates_sender .send (
115- Created (dispatch = dispatch )
116- )
160+ await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
117161 case Event .UPDATED :
118162 await self ._update_dispatch_schedule_and_notify (
119163 dispatch , self ._dispatches [dispatch .id ]
120164 )
121165 self ._dispatches [dispatch .id ] = dispatch
122- await self ._lifecycle_updates_sender .send (
123- Updated (dispatch = dispatch )
124- )
166+ await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
125167 case Event .DELETED :
126168 self ._dispatches .pop (dispatch .id )
127169 await self ._update_dispatch_schedule_and_notify (None , dispatch )
128170
129- await self ._lifecycle_updates_sender .send (
130- Deleted (dispatch = dispatch )
131- )
171+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
132172
133173 async def _execute_scheduled_event (self , dispatch : Dispatch ) -> None :
134174 """Execute a scheduled event.
@@ -170,17 +210,13 @@ async def _fetch(self) -> None:
170210 if not old_dispatch :
171211 _logger .info ("New dispatch: %s" , dispatch )
172212 await self ._update_dispatch_schedule_and_notify (dispatch , None )
173- await self ._lifecycle_updates_sender .send (
174- Created (dispatch = dispatch )
175- )
213+ await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
176214 elif dispatch .update_time != old_dispatch .update_time :
177215 _logger .info ("Updated dispatch: %s" , dispatch )
178216 await self ._update_dispatch_schedule_and_notify (
179217 dispatch , old_dispatch
180218 )
181- await self ._lifecycle_updates_sender .send (
182- Updated (dispatch = dispatch )
183- )
219+ await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
184220
185221 except grpc .aio .AioRpcError as error :
186222 _logger .error ("Error fetching dispatches: %s" , error )
@@ -189,13 +225,13 @@ async def _fetch(self) -> None:
189225
190226 for dispatch in old_dispatches .values ():
191227 _logger .info ("Deleted dispatch: %s" , dispatch )
192- await self ._lifecycle_updates_sender .send (Deleted (dispatch = dispatch ))
228+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
193229 await self ._update_dispatch_schedule_and_notify (None , dispatch )
194230
195231 # Set deleted only here as it influences the result of dispatch.started
196232 # which is used in above in _running_state_change
197233 dispatch ._set_deleted () # pylint: disable=protected-access
198- await self ._lifecycle_updates_sender .send (Deleted (dispatch = dispatch ))
234+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
199235
200236 async def _update_dispatch_schedule_and_notify (
201237 self , dispatch : Dispatch | None , old_dispatch : Dispatch | None
@@ -359,4 +395,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None:
359395 Args:
360396 dispatch: The dispatch that changed.
361397 """
362- await self ._running_state_change_sender .send (dispatch )
398+ await self ._running_state_status_tx .send (dispatch )
0 commit comments