@@ -106,6 +106,8 @@ def __init__(self, helper, config: Dict, callback) -> None:
106106 self .user = config ["connection" ]["user" ]
107107 self .password = config ["connection" ]["pass" ]
108108 self .queue_name = config ["listen" ]
109+ self .exit_event = threading .Event ()
110+ self .thread = None
109111
110112 # noinspection PyUnusedLocal
111113 def _process_message (self , channel , method , properties , body ) -> None :
@@ -122,9 +124,9 @@ def _process_message(self, channel, method, properties, body) -> None:
122124 """
123125
124126 json_data = json .loads (body )
125- thread = threading .Thread (target = self ._data_handler , args = [json_data ])
126- thread .start ()
127- while thread .is_alive (): # Loop while the thread is processing
127+ self . thread = threading .Thread (target = self ._data_handler , args = [json_data ])
128+ self . thread .start ()
129+ while self . thread .is_alive (): # Loop while the thread is processing
128130 assert self .pika_connection is not None
129131 self .pika_connection .sleep (1.0 )
130132 logging .info (
@@ -159,7 +161,7 @@ def _data_handler(self, json_data) -> None:
159161 logging .error ("Failing reporting the processing" )
160162
161163 def run (self ) -> None :
162- while True :
164+ while not self . exit_event . is_set () :
163165 try :
164166 # Connect the broker
165167 self .pika_credentials = pika .PlainCredentials (self .user , self .password )
@@ -186,6 +188,11 @@ def run(self) -> None:
186188 self .helper .log_error (str (e ))
187189 time .sleep (10 )
188190
191+ def stop (self ):
192+ self .exit_event .set ()
193+ if self .thread :
194+ self .thread .join ()
195+
189196
190197class PingAlive (threading .Thread ):
191198 def __init__ (self , connector_id , api , get_state , set_state ) -> None :
@@ -195,9 +202,10 @@ def __init__(self, connector_id, api, get_state, set_state) -> None:
195202 self .api = api
196203 self .get_state = get_state
197204 self .set_state = set_state
205+ self .exit_event = threading .Event ()
198206
199207 def ping (self ) -> None :
200- while True :
208+ while not self . exit_event . is_set () :
201209 try :
202210 initial_state = self .get_state ()
203211 result = self .api .connector .ping (self .connector_id , initial_state )
@@ -221,12 +229,16 @@ def ping(self) -> None:
221229 except Exception : # pylint: disable=broad-except
222230 self .in_error = True
223231 logging .error ("Error pinging the API" )
224- time . sleep (40 )
232+ self . exit_event . wait (40 )
225233
226234 def run (self ) -> None :
227235 logging .info ("Starting ping alive thread" )
228236 self .ping ()
229237
238+ def stop (self ) -> None :
239+ logging .info ("Preparing for clean shutdown" )
240+ self .exit_event .set ()
241+
230242
231243class ListenStream (threading .Thread ):
232244 def __init__ (
@@ -239,6 +251,7 @@ def __init__(
239251 self .token = token
240252 self .verify_ssl = verify_ssl
241253 self .start_timestamp = start_timestamp
254+ self .exit_event = threading .Event ()
242255
243256 def run (self ) -> None : # pylint: disable=too-many-branches
244257 current_state = self .helper .get_state ()
@@ -432,6 +445,17 @@ def __init__(self, config: Dict) -> None:
432445 )
433446 self .ping .start ()
434447
448+ # self.listen_stream = None
449+ self .listen_queue = None
450+
451+ def stop (self ) -> None :
452+ if self .listen_queue :
453+ self .listen_queue .stop ()
454+ # if self.listen_stream:
455+ # self.listen_stream.stop()
456+ self .ping .stop ()
457+ self .api .connector .unregister (self .connector_id )
458+
435459 def get_name (self ) -> Optional [Union [bool , int , str ]]:
436460 return self .connect_name
437461
@@ -470,8 +494,8 @@ def listen(self, message_callback: Callable[[Dict], str]) -> None:
470494 :type message_callback: Callable[[Dict], str]
471495 """
472496
473- listen_queue = ListenQueue (self , self .config , message_callback )
474- listen_queue .start ()
497+ self . listen_queue = ListenQueue (self , self .config , message_callback )
498+ self . listen_queue .start ()
475499
476500 def listen_stream (
477501 self ,
@@ -486,10 +510,10 @@ def listen_stream(
486510 :param message_callback: callback function to process messages
487511 """
488512
489- listen_stream = ListenStream (
513+ self . listen_stream = ListenStream (
490514 self , message_callback , url , token , verify_ssl , start_timestamp
491515 )
492- listen_stream .start ()
516+ self . listen_stream .start ()
493517
494518 def get_opencti_url (self ) -> Optional [Union [bool , int , str ]]:
495519 return self .opencti_url
0 commit comments