@@ -366,8 +366,7 @@ async def _fetch(self, timer: Timer) -> None:
366366 """
367367 self ._initial_fetch_event .clear ()
368368
369- old_dispatches = self ._dispatches
370- self ._dispatches = {}
369+ new_dispatches = {}
371370
372371 try :
373372 _logger .debug ("Fetching dispatches for microgrid %s" , self ._microgrid_id )
@@ -381,9 +380,9 @@ async def _fetch(self, timer: Timer) -> None:
381380 continue
382381 dispatch = Dispatch (client_dispatch )
383382
384- self . _dispatches [dispatch .id ] = dispatch
385- old_dispatch = old_dispatches . pop (dispatch .id , None )
386- if not old_dispatch :
383+ new_dispatches [dispatch .id ] = dispatch
384+ old_dispatch = self . _dispatches . get (dispatch .id , None )
385+ if old_dispatch is None :
387386 _logger .debug ("New dispatch: %s" , dispatch )
388387 await self ._update_dispatch_schedule_and_notify (
389388 dispatch , None , timer
@@ -396,23 +395,40 @@ async def _fetch(self, timer: Timer) -> None:
396395 )
397396 await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
398397
399- _logger .debug ("Received %s dispatches" , len (self . _dispatches ))
398+ _logger .debug ("Received %s dispatches" , len (new_dispatches ))
400399
401400 except grpc .aio .AioRpcError as error :
402401 _logger .error ("Error fetching dispatches: %s" , error )
403- self ._dispatches = old_dispatches
404402 return
405403
406- for dispatch in old_dispatches .values ():
404+ # We make a copy because we mutate self._dispatches.keys() inside the loop
405+ for dispatch_id in frozenset (self ._dispatches .keys () - new_dispatches .keys ()):
406+ # Use try/except as the `self._dispatches` cache can be mutated by
407+ # stream delete events while we're iterating
408+ try :
409+ dispatch = self ._dispatches .pop (dispatch_id )
410+ except KeyError as error :
411+ _logger .warning (
412+ "Inconsistency in cache detected. "
413+ + "Tried to delete non-existing dispatch %s (%s)" ,
414+ dispatch_id ,
415+ error ,
416+ )
417+ continue
418+
407419 _logger .debug ("Deleted dispatch: %s" , dispatch )
408- await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
409420 await self ._update_dispatch_schedule_and_notify (None , dispatch , timer )
410421
411- # Set deleted only here as it influences the result of dispatch.started
412- # which is used in above in _running_state_change
422+ # Set deleted only here as it influences the result of
423+ # dispatch.started, which is used in
424+ # _update_dispatch_schedule_and_notify above.
413425 dispatch ._set_deleted () # pylint: disable=protected-access
414426 await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
415427
428+ # Update the dispatch list with the dispatches
429+ self ._dispatches .update (new_dispatches )
430+
431+ # Set event to indicate fetch ran at least once
416432 self ._initial_fetch_event .set ()
417433
418434 async def _update_dispatch_schedule_and_notify (
0 commit comments