@@ -366,8 +366,8 @@ async def _fetch(self, timer: Timer) -> None:
366366 """
367367 self ._initial_fetch_event .clear ()
368368
369- old_dispatches = self ._dispatches
370- self . _dispatches = {}
369+ old_dispatches = set ( self ._dispatches . keys ())
370+ new_dispatches = {}
371371
372372 try :
373373 _logger .debug ("Fetching dispatches for microgrid %s" , self ._microgrid_id )
@@ -381,9 +381,9 @@ async def _fetch(self, timer: Timer) -> None:
381381 continue
382382 dispatch = Dispatch (client_dispatch )
383383
384- self . _dispatches [dispatch .id ] = dispatch
385- old_dispatch = old_dispatches . pop (dispatch .id , None )
386- if not old_dispatch :
384+ new_dispatches [dispatch .id ] = dispatch
385+ old_dispatch = self . _dispatches . get (dispatch .id , None )
386+ if old_dispatch is None :
387387 _logger .debug ("New dispatch: %s" , dispatch )
388388 await self ._update_dispatch_schedule_and_notify (
389389 dispatch , None , timer
@@ -396,23 +396,33 @@ async def _fetch(self, timer: Timer) -> None:
396396 )
397397 await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
398398
399- _logger .debug ("Received %s dispatches" , len (self . _dispatches ))
399+ _logger .debug ("Received %s dispatches" , len (new_dispatches ))
400400
401401 except grpc .aio .AioRpcError as error :
402402 _logger .error ("Error fetching dispatches: %s" , error )
403- self ._dispatches = old_dispatches
404403 return
405404
406- for dispatch in old_dispatches .values ():
407- _logger .debug ("Deleted dispatch: %s" , dispatch )
408- await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
409- await self ._update_dispatch_schedule_and_notify (None , dispatch , timer )
405+ # Delete old dispatches
406+ for dispatch_id in old_dispatches - new_dispatches .keys ():
407+ try :
408+ dispatch = self ._dispatches .pop (dispatch_id )
409+ _logger .debug ("Deleted dispatch: %s" , dispatch )
410+ await self ._update_dispatch_schedule_and_notify (None , dispatch , timer )
411+
412+ # Set deleted only here as it influences the result of dispatch.started
413+ # which is used in the func call above ^
414+ dispatch ._set_deleted () # pylint: disable=protected-access
415+ await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
416+ except KeyError :
417+ _logger .warning (
418+ "Inconsistency in cache detected. Tried to delete non-existing dispatch %s" ,
419+ dispatch_id ,
420+ )
410421
411- # Set deleted only here as it influences the result of dispatch.started
412- # which is used in above in _running_state_change
413- dispatch ._set_deleted () # pylint: disable=protected-access
414- await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
422+ # Update the dispatch list with the dispatches
423+ self ._dispatches .update (new_dispatches )
415424
425+ # Set event to indicate fetch ran at least once
416426 self ._initial_fetch_event .set ()
417427
418428 async def _update_dispatch_schedule_and_notify (
0 commit comments