1818import grpc .aio
1919from frequenz .channels import Broadcast , Receiver , select , selected_from
2020from frequenz .channels .timer import SkipMissedAndResync , Timer
21+ from frequenz .client .base .streaming import (
22+ StreamFatalError ,
23+ StreamRetrying ,
24+ StreamStarted ,
25+ )
2126from frequenz .client .dispatch import DispatchApiClient
27+ from frequenz .client .dispatch .types import DispatchEvent as ApiDispatchEvent
2228from frequenz .client .dispatch .types import Event
2329from frequenz .sdk .actor import BackgroundService
2430
@@ -230,8 +236,21 @@ async def _run(self) -> None:
230236 ) as next_event_timer :
231237 # Initial fetch
232238 await self ._fetch (next_event_timer )
233- stream = self ._client .stream (microgrid_id = self ._microgrid_id )
234239
240+ # pylint: disable-next=protected-access
241+ streamer = self ._client ._get_stream (microgrid_id = self ._microgrid_id )
242+ stream = streamer .new_receiver (include_events = True )
243+
244+ # We track stream start events linked to retries to avoid re-fetching
245+ # dispatches that were already retrieved during an initial stream start.
246+ # The initial fetch gets all dispatches, and the StreamStarted event
247+ # isn't always reliable due to parallel receiver creation and stream
248+ # task initiation.
249+ # This way we get a deterministic behavior where we only fetch
250+ # dispatches once initially and then only when the stream is restarted.
251+ is_retry_attempt = False
252+
253+ # Streaming updates
235254 async for selected in select (next_event_timer , stream ):
236255 if selected_from (selected , next_event_timer ):
237256 if not self ._scheduled_events :
@@ -240,36 +259,54 @@ async def _run(self) -> None:
240259 heappop (self ._scheduled_events ).dispatch , next_event_timer
241260 )
242261 elif selected_from (selected , stream ):
243- _logger .debug ("Received dispatch event: %s" , selected .message )
244- dispatch = Dispatch (selected .message .dispatch )
245- match selected .message .event :
246- case Event .CREATED :
247- self ._dispatches [dispatch .id ] = dispatch
248- await self ._update_dispatch_schedule_and_notify (
249- dispatch , None , next_event_timer
250- )
251- await self ._lifecycle_events_tx .send (
252- Created (dispatch = dispatch )
253- )
254- case Event .UPDATED :
255- await self ._update_dispatch_schedule_and_notify (
256- dispatch ,
257- self ._dispatches [dispatch .id ],
258- next_event_timer ,
259- )
260- self ._dispatches [dispatch .id ] = dispatch
261- await self ._lifecycle_events_tx .send (
262- Updated (dispatch = dispatch )
263- )
264- case Event .DELETED :
265- self ._dispatches .pop (dispatch .id )
266- await self ._update_dispatch_schedule_and_notify (
267- None , dispatch , next_event_timer
268- )
269-
270- await self ._lifecycle_events_tx .send (
271- Deleted (dispatch = dispatch )
262+ match selected .message :
263+ case ApiDispatchEvent ():
264+ _logger .debug (
265+ "Received dispatch event: %s" , selected .message
272266 )
267+ dispatch = Dispatch (selected .message .dispatch )
268+ match selected .message .event :
269+ case Event .CREATED :
270+ self ._dispatches [dispatch .id ] = dispatch
271+ await self ._update_dispatch_schedule_and_notify (
272+ dispatch , None , next_event_timer
273+ )
274+ await self ._lifecycle_events_tx .send (
275+ Created (dispatch = dispatch )
276+ )
277+ case Event .UPDATED :
278+ await self ._update_dispatch_schedule_and_notify (
279+ dispatch ,
280+ self ._dispatches [dispatch .id ],
281+ next_event_timer ,
282+ )
283+ self ._dispatches [dispatch .id ] = dispatch
284+ await self ._lifecycle_events_tx .send (
285+ Updated (dispatch = dispatch )
286+ )
287+ case Event .DELETED :
288+ self ._dispatches .pop (dispatch .id )
289+ await self ._update_dispatch_schedule_and_notify (
290+ None , dispatch , next_event_timer
291+ )
292+
293+ await self ._lifecycle_events_tx .send (
294+ Deleted (dispatch = dispatch )
295+ )
296+
297+ case StreamRetrying ():
298+ is_retry_attempt = True
299+
300+ case StreamStarted ():
301+ if is_retry_attempt :
302+ _logger .info (
303+ "Dispatch stream restarted, getting dispatches"
304+ )
305+ await self ._fetch (next_event_timer )
306+ is_retry_attempt = False
307+
308+ case StreamFatalError ():
309+ pass
273310
274311 async def _execute_scheduled_event (self , dispatch : Dispatch , timer : Timer ) -> None :
275312 """Execute a scheduled event.
0 commit comments