 import grpc.aio
 from frequenz.channels import Broadcast, Receiver, select, selected_from
 from frequenz.channels.timer import SkipMissedAndResync, Timer
+from frequenz.client.base.streaming import (
+    StreamFatalError,
+    StreamRetrying,
+    StreamStarted,
+)
 from frequenz.client.dispatch import DispatchApiClient
+from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent
 from frequenz.client.dispatch.types import Event
 from frequenz.sdk.actor import BackgroundService

@@ -233,7 +239,18 @@ async def _run(self) -> None:
         # Initial fetch
         await self._fetch()

-        stream = self._client.stream(microgrid_id=self._microgrid_id)
+        # pylint: disable-next=protected-access
+        streamer = self._client._get_stream(microgrid_id=self._microgrid_id)
+        stream = streamer.new_receiver(include_events=True)
+
+        # We track stream start events linked to retries to avoid re-fetching
+        # dispatches that were already retrieved during the initial stream start.
+        # The initial fetch gets all dispatches, and the StreamStarted event
+        # isn't always reliable due to parallel receiver creation and stream
+        # task initiation.
+        # This way we get deterministic behavior: we fetch dispatches once
+        # initially and then again only when the stream is restarted.
+        is_retry_attempt = False

         # Streaming updates
         async for selected in select(self._next_event_timer, stream):
@@ -244,25 +261,50 @@ async def _run(self) -> None:
                     heappop(self._scheduled_events).dispatch
                 )
             elif selected_from(selected, stream):
-                _logger.debug("Received dispatch event: %s", selected.message)
-                dispatch = Dispatch(selected.message.dispatch)
-                match selected.message.event:
-                    case Event.CREATED:
-                        self._dispatches[dispatch.id] = dispatch
-                        await self._update_dispatch_schedule_and_notify(dispatch, None)
-                        await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
-                    case Event.UPDATED:
-                        await self._update_dispatch_schedule_and_notify(
-                            dispatch, self._dispatches[dispatch.id]
-                        )
-                        self._dispatches[dispatch.id] = dispatch
-                        await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
-                    case Event.DELETED:
-                        self._dispatches.pop(dispatch.id)
-                        await self._update_dispatch_schedule_and_notify(None, dispatch)
-
-                        await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
-
+                match selected.message:
+                    case ApiDispatchEvent():
+                        _logger.debug("Received dispatch event: %s", selected.message)
+                        dispatch = Dispatch(selected.message.dispatch)
+                        match selected.message.event:
+                            case Event.CREATED:
+                                self._dispatches[dispatch.id] = dispatch
+                                await self._update_dispatch_schedule_and_notify(
+                                    dispatch, None
+                                )
+                                await self._lifecycle_events_tx.send(
+                                    Created(dispatch=dispatch)
+                                )
+                            case Event.UPDATED:
+                                await self._update_dispatch_schedule_and_notify(
+                                    dispatch, self._dispatches[dispatch.id]
+                                )
+                                self._dispatches[dispatch.id] = dispatch
+                                await self._lifecycle_events_tx.send(
+                                    Updated(dispatch=dispatch)
+                                )
+                            case Event.DELETED:
+                                self._dispatches.pop(dispatch.id)
+                                await self._update_dispatch_schedule_and_notify(
+                                    None, dispatch
+                                )
+
+                                await self._lifecycle_events_tx.send(
+                                    Deleted(dispatch=dispatch)
+                                )
+
+                    case StreamRetrying():
+                        is_retry_attempt = True
+
+                    case StreamStarted():
+                        if is_retry_attempt:
+                            _logger.info(
+                                "Dispatch stream restarted, getting dispatches"
+                            )
+                            await self._fetch()
+                            is_retry_attempt = False
+
+                    case StreamFatalError():
+                        pass
     async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
         """Execute a scheduled event.

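For context on the flag logic introduced in `_run()`, here is a minimal, self-contained sketch of the retry-aware re-fetch pattern in isolation. It is not part of this change: the event classes below are hypothetical stand-ins for `StreamStarted`, `StreamRetrying`, and `StreamFatalError` from `frequenz.client.base.streaming`, and the `"re-fetch all dispatches"` action stands in for the actor's `_fetch()` call.

```python
# Sketch only: stand-in event types, not the real frequenz.client.base.streaming API.
from dataclasses import dataclass


@dataclass
class Started:
    """Stand-in for StreamStarted."""


@dataclass
class Retrying:
    """Stand-in for StreamRetrying."""


@dataclass
class FatalError:
    """Stand-in for StreamFatalError."""


@dataclass
class Data:
    """Stand-in for a regular dispatch event (ApiDispatchEvent)."""

    payload: str


def drive(messages: list[object]) -> list[str]:
    """Replay a message sequence and record which actions would be taken."""
    actions: list[str] = []
    is_retry_attempt = False  # Set on Retrying, consumed by the next Started.

    for message in messages:
        match message:
            case Data(payload=payload):
                actions.append(f"handle {payload}")
            case Retrying():
                # The stream dropped and the client is reconnecting; events
                # published while disconnected may have been missed.
                is_retry_attempt = True
            case Started():
                # Only re-fetch after a retry-driven restart. The very first
                # Started is already covered by the initial fetch.
                if is_retry_attempt:
                    actions.append("re-fetch all dispatches")
                    is_retry_attempt = False
            case FatalError():
                actions.append("stream gave up")

    return actions


# The first Started triggers no fetch; only the Started after Retrying does.
assert drive([Started(), Data("a"), Retrying(), Started(), Data("b")]) == [
    "handle a",
    "re-fetch all dispatches",
    "handle b",
]
```

The design point is that the flag makes the fetch count deterministic regardless of when the first `StreamStarted` arrives relative to the initial fetch: exactly one fetch at startup, plus one per stream restart.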