@@ -114,13 +114,6 @@ def __init__(
114114 )
115115
116116 self ._running_state_status_tx = self ._running_state_status_channel .new_sender ()
117- self ._next_event_timer = Timer (
118- timedelta (seconds = 100 ), SkipMissedAndResync (), auto_start = False
119- )
120- """The timer to schedule the next event.
121-
122- Interval is chosen arbitrarily, as it will be reset on the first event.
123- """
124117
125118 self ._scheduled_events : list ["DispatchScheduler.QueueItem" ] = []
126119 """The scheduled events, sorted by time.
@@ -235,19 +228,20 @@ async def _run(self) -> None:
235228 self ._microgrid_id ,
236229 )
237230
238- # Initial fetch
239- await self ._fetch ()
240-
241- stream = self ._client .stream (microgrid_id = self ._microgrid_id )
242-
243231 # Streaming updates
244- with closing (self ._next_event_timer ) as next_event_timer :
232+ with closing (
233+ Timer (timedelta (seconds = 100 ), SkipMissedAndResync (), auto_start = False )
234+ ) as next_event_timer :
235+ # Initial fetch
236+ await self ._fetch (next_event_timer )
237+ stream = self ._client .stream (microgrid_id = self ._microgrid_id )
238+
245239 async for selected in select (next_event_timer , stream ):
246240 if selected_from (selected , next_event_timer ):
247241 if not self ._scheduled_events :
248242 continue
249243 await self ._execute_scheduled_event (
250- heappop (self ._scheduled_events ).dispatch
244+ heappop (self ._scheduled_events ).dispatch , next_event_timer
251245 )
252246 elif selected_from (selected , stream ):
253247 _logger .debug ("Received dispatch event: %s" , selected .message )
@@ -256,14 +250,16 @@ async def _run(self) -> None:
256250 case Event .CREATED :
257251 self ._dispatches [dispatch .id ] = dispatch
258252 await self ._update_dispatch_schedule_and_notify (
259- dispatch , None
253+ dispatch , None , next_event_timer
260254 )
261255 await self ._lifecycle_events_tx .send (
262256 Created (dispatch = dispatch )
263257 )
264258 case Event .UPDATED :
265259 await self ._update_dispatch_schedule_and_notify (
266- dispatch , self ._dispatches [dispatch .id ]
260+ dispatch ,
261+ self ._dispatches [dispatch .id ],
262+ next_event_timer ,
267263 )
268264 self ._dispatches [dispatch .id ] = dispatch
269265 await self ._lifecycle_events_tx .send (
@@ -272,18 +268,19 @@ async def _run(self) -> None:
272268 case Event .DELETED :
273269 self ._dispatches .pop (dispatch .id )
274270 await self ._update_dispatch_schedule_and_notify (
275- None , dispatch
271+ None , dispatch , next_event_timer
276272 )
277273
278274 await self ._lifecycle_events_tx .send (
279275 Deleted (dispatch = dispatch )
280276 )
281277
282- async def _execute_scheduled_event (self , dispatch : Dispatch ) -> None :
278+ async def _execute_scheduled_event (self , dispatch : Dispatch , timer : Timer ) -> None :
283279 """Execute a scheduled event.
284280
285281 Args:
286282 dispatch: The dispatch to execute.
283+ timer: The timer to use for scheduling the next event.
287284 """
288285 _logger .debug ("Executing scheduled event: %s (%s)" , dispatch , dispatch .started )
289286 await self ._send_running_state_change (dispatch )
@@ -298,9 +295,9 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
298295 else :
299296 self ._schedule_start (dispatch )
300297
301- self ._update_timer ()
298+ self ._update_timer (timer )
302299
303- async def _fetch (self ) -> None :
300+ async def _fetch (self , timer : Timer ) -> None :
304301 """Fetch all relevant dispatches using list.
305302
306303 This is used for the initial fetch and for re-fetching all dispatches
@@ -321,12 +318,14 @@ async def _fetch(self) -> None:
321318 old_dispatch = old_dispatches .pop (dispatch .id , None )
322319 if not old_dispatch :
323320 _logger .debug ("New dispatch: %s" , dispatch )
324- await self ._update_dispatch_schedule_and_notify (dispatch , None )
321+ await self ._update_dispatch_schedule_and_notify (
322+ dispatch , None , timer
323+ )
325324 await self ._lifecycle_events_tx .send (Created (dispatch = dispatch ))
326325 elif dispatch .update_time != old_dispatch .update_time :
327326 _logger .debug ("Updated dispatch: %s" , dispatch )
328327 await self ._update_dispatch_schedule_and_notify (
329- dispatch , old_dispatch
328+ dispatch , old_dispatch , timer
330329 )
331330 await self ._lifecycle_events_tx .send (Updated (dispatch = dispatch ))
332331
@@ -340,7 +339,7 @@ async def _fetch(self) -> None:
340339 for dispatch in old_dispatches .values ():
341340 _logger .debug ("Deleted dispatch: %s" , dispatch )
342341 await self ._lifecycle_events_tx .send (Deleted (dispatch = dispatch ))
343- await self ._update_dispatch_schedule_and_notify (None , dispatch )
342+ await self ._update_dispatch_schedule_and_notify (None , dispatch , timer )
344343
345344 # Set deleted only here as it influences the result of dispatch.started
346345 # which is used in above in _running_state_change
@@ -350,7 +349,7 @@ async def _fetch(self) -> None:
350349 self ._initial_fetch_event .set ()
351350
352351 async def _update_dispatch_schedule_and_notify (
353- self , dispatch : Dispatch | None , old_dispatch : Dispatch | None
352+ self , dispatch : Dispatch | None , old_dispatch : Dispatch | None , timer : Timer
354353 ) -> None :
355354 """Update the schedule for a dispatch.
356355
@@ -408,13 +407,13 @@ async def _update_dispatch_schedule_and_notify(
408407 self ._schedule_start (dispatch )
409408
410409 # We modified the schedule, so we need to reset the timer
411- self ._update_timer ()
410+ self ._update_timer (timer )
412411
413- def _update_timer (self ) -> None :
412+ def _update_timer (self , timer : Timer ) -> None :
414413 """Update the timer to the next event."""
415414 if self ._scheduled_events :
416415 due_at : datetime = self ._scheduled_events [0 ].time
417- self . _next_event_timer .reset (interval = due_at - datetime .now (timezone .utc ))
416+ timer .reset (interval = due_at - datetime .now (timezone .utc ))
418417 _logger .debug ("Next event scheduled at %s" , self ._scheduled_events [0 ].time )
419418
420419 def _remove_scheduled (self , dispatch : Dispatch ) -> bool :
0 commit comments