44"""The dispatch actor."""
55
66import logging
7+ from dataclasses import dataclass , field
78from datetime import datetime , timedelta , timezone
89from heapq import heappop , heappush
910
@@ -30,6 +31,20 @@ class DispatchingActor(Actor):
3031 dispatches as necessary.
3132 """
3233
34+ @dataclass (order = True )
35+ class QueueItem :
36+ """A queue item for the scheduled events."""
37+
38+ time : datetime
39+ dispatch_id : int
40+ dispatch : Dispatch = field (compare = False )
41+
42+ def __init__ (self , time : datetime , dispatch : Dispatch ) -> None :
43+ """Initialize the queue item."""
44+ self .time = time
45+ self .dispatch_id = dispatch .id
46+ self .dispatch = dispatch
47+
3348 # pylint: disable=too-many-arguments
3449 def __init__ (
3550 self ,
@@ -61,7 +76,7 @@ def __init__(
6176 Interval is chosen arbitrarily, as it will be reset on the first event.
6277 """
6378
64- self ._scheduled_events : list [tuple [ datetime , Dispatch ] ] = []
79+ self ._scheduled_events : list ["DispatchingActor.QueueItem" ] = []
6580 """The scheduled events, sorted by time.
6681
6782 Each event is a tuple of the scheduled time and the dispatch.
@@ -84,9 +99,11 @@ async def _run(self) -> None:
8499 if not self ._scheduled_events :
85100 continue
86101 _logger .debug (
87- "Executing scheduled event: %s" , self ._scheduled_events [0 ][1 ]
102+ "Executing scheduled event: %s" , self ._scheduled_events [0 ].dispatch
103+ )
104+ await self ._execute_scheduled_event (
105+ heappop (self ._scheduled_events ).dispatch
88106 )
89- await self ._execute_scheduled_event (heappop (self ._scheduled_events )[1 ])
90107 elif selected_from (selected , stream ):
91108 _logger .debug ("Received dispatch event: %s" , selected .message )
92109 dispatch = Dispatch (selected .message .dispatch )
@@ -243,9 +260,9 @@ async def _update_dispatch_schedule_and_notify(
243260 def _update_timer (self ) -> None :
244261 """Update the timer to the next event."""
245262 if self ._scheduled_events :
246- due_at : datetime = self ._scheduled_events [0 ][ 0 ]
263+ due_at : datetime = self ._scheduled_events [0 ]. time
247264 self ._next_event_timer .reset (interval = due_at - datetime .now (timezone .utc ))
248- _logger .debug ("Next event scheduled at %s" , self ._scheduled_events [0 ][ 0 ] )
265+ _logger .debug ("Next event scheduled at %s" , self ._scheduled_events [0 ]. time )
249266
250267 def _remove_scheduled (self , dispatch : Dispatch ) -> bool :
251268 """Remove a dispatch from the scheduled events.
@@ -256,8 +273,8 @@ def _remove_scheduled(self, dispatch: Dispatch) -> bool:
256273 Returns:
257274 True if the dispatch was found and removed, False otherwise.
258275 """
259- for idx , ( _ , sched_dispatch ) in enumerate (self ._scheduled_events ):
260- if dispatch .id == sched_dispatch .id :
276+ for idx , item in enumerate (self ._scheduled_events ):
277+ if dispatch .id == item . dispatch .id :
261278 self ._scheduled_events .pop (idx )
262279 return True
263280
@@ -276,7 +293,7 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
276293 # Schedule the next run
277294 try :
278295 if next_run := dispatch .next_run :
279- heappush (self ._scheduled_events , (next_run , dispatch ))
296+ heappush (self ._scheduled_events , self . QueueItem (next_run , dispatch ))
280297 _logger .debug (
281298 "Scheduled dispatch %s to start at %s" , dispatch .id , next_run
282299 )
@@ -295,7 +312,7 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
295312 if dispatch .duration and dispatch .duration > timedelta (seconds = 0 ):
296313 until = dispatch .until
297314 assert until is not None
298- heappush (self ._scheduled_events , (until , dispatch ))
315+ heappush (self ._scheduled_events , self . QueueItem (until , dispatch ))
299316 _logger .debug ("Scheduled dispatch %s to stop at %s" , dispatch , until )
300317
301318 def _update_changed_running_state (
0 commit comments