1616
1717
1818class ListenQueue (threading .Thread ):
19- def __init__ (self , helper , queue_name , channel , callback ):
19+ def __init__ (self , helper , connection , queue_name , channel , callback ):
2020 threading .Thread .__init__ (self )
2121 self .helper = helper
22+ self .connection = connection
2223 self .channel = channel
2324 self .callback = callback
2425 self .queue_name = queue_name
2526
2627 # noinspection PyUnusedLocal
2728 def _process_message (self , channel , method , properties , body ):
2829 json_data = json .loads (body )
30+ thread = threading .Thread (target = self ._data_handler , args = [channel , method , json_data ])
31+ thread .start ()
32+ while thread .is_alive (): # Loop while the thread is processing
33+ self .connection .sleep (1.0 )
34+
35+ def _data_handler (self , channel , method , json_data ):
2936 job_id = json_data ['job_id' ] if 'job_id' in json_data else None
3037 try :
3138 work_id = json_data ['work_id' ]
@@ -46,9 +53,13 @@ def _process_message(self, channel, method, properties, body):
4653 channel .basic_ack (delivery_tag = method .delivery_tag )
4754
4855 def run (self ):
49- logging .info ('Starting consuming listen queue' )
50- self .channel .basic_consume (queue = self .queue_name , on_message_callback = self ._process_message )
51- self .channel .start_consuming ()
56+ try :
57+ logging .info ('Starting consuming listen queue' )
58+ self .channel .basic_consume (queue = self .queue_name , on_message_callback = self ._process_message )
59+ self .channel .start_consuming ()
60+ except :
61+ logging .error ('Lost connection to the broker, reconnecting' )
62+ self .helper .connect ()
5263
5364
5465class PingAlive (threading .Thread ):
@@ -110,9 +121,10 @@ def __init__(self, config: dict):
110121 self .connector_id = connector_configuration ['id' ]
111122 self .config = connector_configuration ['config' ]
112123
113- # Connect to the broker
114- self .pika_connection = pika .BlockingConnection (pika .URLParameters (self .config ['uri' ]))
115- self .channel = self .pika_connection .channel ()
124+ # Connect the broker
125+ self .pika_connection = None
126+ self .channel = None
127+ self .connect ()
116128
117129 # Start ping thread
118130 self .ping = PingAlive (self .connector .id , self .api )
@@ -122,8 +134,13 @@ def __init__(self, config: dict):
122134 self .cache_index = {}
123135 self .cache_added = []
124136
137+ def connect (self ):
138+ # Connect to the broker
139+ self .pika_connection = pika .BlockingConnection (pika .URLParameters (self .config ['uri' ]))
140+ self .channel = self .pika_connection .channel ()
141+
125142 def listen (self , message_callback : Callable [[Dict ], List [str ]]) -> None :
126- listen_queue = ListenQueue (self , self .config ['listen' ], self .channel , message_callback )
143+ listen_queue = ListenQueue (self , self .config ['listen' ], self .pika_connection , self . channel , message_callback )
127144 listen_queue .start ()
128145
129146 def get_connector (self ):
@@ -173,7 +190,7 @@ def _send_bundle(self, bundle, entities_types=None):
173190 # raise ValueError('The bundle is not a valid STIX2 JSON')
174191
175192 # Prepare the message
176- #if self.current_work_id is None:
193+ # if self.current_work_id is None:
177194 # raise ValueError('The job id must be specified')
178195 message = {
179196 'job_id' : job_id ,
0 commit comments