@@ -199,6 +199,8 @@ def __init__(self, inter_delay=1, post_db_delay=1.0, debug=False):
199199 self .previous_action = None
200200 # Queue-level error state (persists after sweep cleanup)
201201 self ._last_error : Optional [QueueError ] = None
202+ # Queue-level killed flag (persists after sweep cleanup)
203+ self ._last_killed = False
202204
203205 def _exec_in_kernel (self , fn ):
204206 """Execute a callable synchronously.
@@ -511,6 +513,8 @@ def start(self, rts=True):
511513
512514 self .log .info ("Starting sweeps" )
513515 self .current_action = self .queue .popleft ()
516+ # Clear kill flag once new work begins
517+ self ._last_killed = False
514518 if isinstance (self .current_action , BaseSweep ):
515519 self .current_sweep = self .current_action
516520 # Ensure metadata shows this sweep was launched by SweepQueue
@@ -592,6 +596,7 @@ def resume(self):
592596 def kill (self ):
593597 """Kills the current sweep. Use kill_all() to also clear the queue."""
594598 if self .current_sweep is not None :
599+ self ._last_killed = True
595600 # Clear current_sweep AND current_action before kill() to prevent
596601 # begin_next() from processing if kill() emits completion synchronously
597602 sweep_to_kill = self .current_sweep
@@ -609,6 +614,11 @@ def kill_all(self):
609614 Note: If a DatabaseEntry or callable is currently executing, it cannot be
610615 interrupted, but the queue will not continue after it completes.
611616 """
617+ had_work = (
618+ self .current_sweep is not None
619+ or self .current_action is not None
620+ or len (self .queue ) > 0
621+ )
612622 # Save reference to sweep before clearing state
613623 sweep_to_kill = self .current_sweep
614624
@@ -625,6 +635,8 @@ def kill_all(self):
625635
626636 # Clear error state for full reset
627637 self ._last_error = None
638+ # Mark queue as killed if there was work to stop
639+ self ._last_killed = had_work
628640
629641 # Now safe to kill - even if completion fires, queue is empty
630642 if sweep_to_kill is not None :
@@ -650,7 +662,7 @@ def status(self):
650662
651663 Returns a dictionary with:
652664 - effective_state: Overall queue state accounting for pending items
653- ("idle", "pending", "running", "paused", "error", "stopped")
665+ ("idle", "pending", "running", "paused", "killed", " error", "stopped")
654666 - current_sweep_state: State name of the currently executing sweep (or None)
655667 - queue_length: Number of items waiting in the queue
656668 - current_sweep_type: Class name of current sweep (or None)
@@ -667,6 +679,7 @@ def status(self):
667679 - "running": Current sweep is actively running/ramping, or a DatabaseEntry/callable
668680 is executing
669681 - "paused": Current sweep is paused
682+ - "killed": Queue stopped due to a kill() or kill_all() call. Call start() to resume.
670683 - "error": Current sweep is in error state (actively erroring)
671684 - "stopped": Queue stopped due to a previous error (check last_error for details).
672685 Call clear_error() and start() to resume.
@@ -704,12 +717,16 @@ def status(self):
704717 effective_state = "running"
705718 elif self ._last_error is not None :
706719 effective_state = "stopped"
720+ elif self ._last_killed :
721+ effective_state = "killed"
707722 elif queue_length > 0 :
708723 effective_state = "pending"
709724 else :
710725 effective_state = "idle"
711726 elif current_sweep_state == SweepState .ERROR :
712727 effective_state = "error"
728+ elif current_sweep_state == SweepState .KILLED :
729+ effective_state = "killed"
713730 elif current_sweep_state == SweepState .PAUSED :
714731 effective_state = "paused"
715732 elif current_sweep_state in (SweepState .RUNNING , SweepState .RAMPING ):
@@ -724,7 +741,7 @@ def status(self):
724741 else :
725742 effective_state = "idle"
726743 else :
727- # KILLED or unexpected states with a current_sweep still set
744+ # Unexpected states with a current_sweep still set
728745 if queue_length > 0 :
729746 effective_state = "pending"
730747 else :
@@ -799,11 +816,12 @@ def begin_next(self):
799816 self ._processing = True
800817 self ._pending_begin_next = False
801818
802- # Guard: if queue is in error state, don't process further
803- # This prevents re-entrancy from draining the queue after an error
804- if self ._last_error is not None :
819+ # Guard: if queue is in error or killed state, don't process further
820+ # This prevents re-entrancy from draining the queue after a stop
821+ if self ._last_error is not None or self . _last_killed :
805822 if self .debug :
806- self .log .debug ("Queue in error state, not processing further" )
823+ state = "error" if self ._last_error is not None else "killed"
824+ self .log .debug ("Queue in %s state, not processing further" , state )
807825 self ._processing = False
808826 return
809827
0 commit comments