@@ -33,16 +33,32 @@ class DispatchScheduler(BackgroundService):
3333
3434 @dataclass (order = True )
3535 class QueueItem :
36- """A queue item for the scheduled events."""
36+ """A queue item for the scheduled events.
37+
38+ This class is used to define the order of the queue items based on the
39+ scheduled time, whether it is a stop event and finally the dispatch ID
40+ (for uniqueness).
41+ """
3742
3843 time : datetime
44+ priority : int
45+ """Sort priority when the time is the same.
46+
47+ Exists to make sure that when two events are scheduled at the same time,
48+ the start event is executed first, allowing filters down the data pipeline
49+ to consider the start event when deciding whether to execute the
50+ stop event.
51+ """
3952 dispatch_id : int
4053 dispatch : Dispatch = field (compare = False )
4154
42- def __init__ (self , time : datetime , dispatch : Dispatch ) -> None :
55+ def __init__ (
56+ self , time : datetime , dispatch : Dispatch , stop_event : bool
57+ ) -> None :
4358 """Initialize the queue item."""
4459 self .time = time
4560 self .dispatch_id = dispatch .id
61+ self .priority = int (stop_event )
4662 self .dispatch = dispatch
4763
4864 # pylint: disable=too-many-arguments
@@ -103,7 +119,7 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
103119 )
104120
105121 async def new_running_state_event_receiver (
106- self , type : str , unify_running_intervals : bool = True
122+ self , type : str , * , unify_running_intervals : bool = True
107123 ) -> Receiver [Dispatch ]:
108124 """Create a new receiver for running state events of the specified type.
109125
@@ -365,7 +381,10 @@ def _schedule_start(self, dispatch: Dispatch) -> None:
365381 # Schedule the next run
366382 try :
367383 if next_run := dispatch .next_run :
368- heappush (self ._scheduled_events , self .QueueItem (next_run , dispatch ))
384+ heappush (
385+ self ._scheduled_events ,
386+ self .QueueItem (next_run , dispatch , stop_event = False ),
387+ )
369388 _logger .debug (
370389 "Scheduled dispatch %s to start at %s" , dispatch .id , next_run
371390 )
@@ -384,7 +403,9 @@ def _schedule_stop(self, dispatch: Dispatch) -> None:
384403 if dispatch .duration and dispatch .duration > timedelta (seconds = 0 ):
385404 until = dispatch .until
386405 assert until is not None
387- heappush (self ._scheduled_events , self .QueueItem (until , dispatch ))
406+ heappush (
407+ self ._scheduled_events , self .QueueItem (until , dispatch , stop_event = True )
408+ )
388409 _logger .debug ("Scheduled dispatch %s to stop at %s" , dispatch , until )
389410
390411 def _update_changed_running_state (
0 commit comments