@@ -117,6 +117,14 @@ def total_threads(self) -> int | None:
117117 def total_threads (self , new : int ):
118118 self .AD .config .total_threads = new
119119
120+ def stop (self ):
121+ """Stop all threads."""
122+ for thread in self .threads .values ():
123+ match thread :
124+ case {"queue" : Queue () as q , "thread" : Thread () as t }:
125+ q .put (None )
126+ t .join (timeout = 1 )
127+
120128 async def get_q_update (self ):
121129 """Updates queue sizes"""
122130 for thread in self .threads :
@@ -233,7 +241,7 @@ async def create_initial_threads(self) -> None:
233241 },
234242 )
235243
236- def get_q (self , thread_id : str ) -> Queue :
244+ def get_q (self , thread_id : str ) -> Queue [ dict [ str , Any ] | None ] :
237245 return self .threads [thread_id ]["queue" ]
238246
239247 @staticmethod
@@ -1016,25 +1024,31 @@ def worker(self): # noqa: C901
10161024 thread_id = threading .current_thread ().name
10171025 q = self .get_q (thread_id )
10181026 while True :
1019- args = q .get ()
1020- _type = args ["type" ]
1021- funcref = args ["function" ]
1022- _id = args ["id" ]
1023- objectid = args ["objectid" ]
1024- name = args ["name" ]
1025- error_logger = logging .getLogger (f"Error.{ name } " )
1026- args ["kwargs" ]["__thread_id" ] = thread_id
1027-
1028- silent = False
1029- if "__silent" in args ["kwargs" ]:
1030- silent = args ["kwargs" ]["__silent" ]
1027+ match args := q .get ():
1028+ case {
1029+ "type" : _type ,
1030+ "function" : funcref ,
1031+ "id" : _id ,
1032+ "objectid" : objectid ,
1033+ "name" : name ,
1034+ "kwargs" : kwargs
1035+ }:
1036+ args ["kwargs" ]["__thread_id" ] = thread_id
1037+ error_logger = logging .getLogger (f"Error.{ name } " )
1038+ silent = kwargs .get ("__silent" , False )
1039+ case None :
1040+ self .logger .debug ("Stopping worker thread %s" , thread_id )
1041+ break
1042+ case _:
1043+ self .logger .warning ("Unknown callback type for %s - discarding" , name )
1044+ return
10311045
10321046 app = self .AD .app_management .get_app_instance (name , objectid )
10331047 if app is not None :
10341048 try :
10351049 pos_args = tuple ()
10361050 kwargs = dict ()
1037- match args [ "type" ] :
1051+ match _type :
10381052 case "scheduler" :
10391053 kwargs = self .AD .sched .sanitize_timer_kwargs (app , args ["kwargs" ])
10401054
@@ -1108,6 +1122,8 @@ def safe_callback():
11081122 self .logger .warning (f"Found stale callback for { name } - discarding" )
11091123 q .task_done ()
11101124
1125+ self .logger .debug ("Shutdown worker thread queue %s" , thread_id )
1126+
11111127 def report_callback_sig (self , name , type , funcref , args ):
11121128 error_logger = logging .getLogger ("Error.{}" .format (name ))
11131129
0 commit comments