1616
1717
1818class ListenQueue (threading .Thread ):
19- def __init__ (self , helper , connection , queue_name , channel , callback ):
19+ def __init__ (self , helper , config , callback ):
2020 threading .Thread .__init__ (self )
21+ self .pika_connection = None
22+ self .channel = None
2123 self .helper = helper
22- self .connection = connection
23- self .channel = channel
2424 self .callback = callback
25- self .queue_name = queue_name
25+ self .uri = config ['uri' ]
26+ self .queue_name = config ['listen' ]
2627
2728 # noinspection PyUnusedLocal
2829 def _process_message (self , channel , method , properties , body ):
2930 json_data = json .loads (body )
3031 thread = threading .Thread (target = self ._data_handler , args = [channel , method , json_data ])
3132 thread .start ()
3233 while thread .is_alive (): # Loop while the thread is processing
33- self .connection .sleep (1.0 )
34+ self .pika_connection .sleep (1.0 )
3435
3536 def _data_handler (self , channel , method , json_data ):
3637 job_id = json_data ['job_id' ] if 'job_id' in json_data else None
@@ -53,13 +54,19 @@ def _data_handler(self, channel, method, json_data):
5354 channel .basic_ack (delivery_tag = method .delivery_tag )
5455
5556 def run (self ):
56- try :
57- logging .info ('Starting consuming listen queue' )
58- self .channel .basic_consume (queue = self .queue_name , on_message_callback = self ._process_message )
59- self .channel .start_consuming ()
60- except :
61- logging .error ('Lost connection to the broker, reconnecting' )
62- self .helper .connect ()
57+ while True :
58+ try :
59+ # Connect the broker
60+ self .pika_connection = pika .BlockingConnection (pika .URLParameters (self .uri ))
61+ self .channel = self .pika_connection .channel ()
62+ self .channel .basic_consume (queue = self .queue_name , on_message_callback = self ._process_message )
63+ self .channel .start_consuming ()
64+ except (KeyboardInterrupt , SystemExit ):
65+ self .helper .log_info ('Connector stop' )
66+ exit (0 )
67+ except Exception as e :
68+ self .helper .log_error (str (e ))
69+ time .sleep (10 )
6370
6471
6572class PingAlive (threading .Thread ):
@@ -121,11 +128,6 @@ def __init__(self, config: dict):
121128 self .connector_id = connector_configuration ['id' ]
122129 self .config = connector_configuration ['config' ]
123130
124- # Connect the broker
125- self .pika_connection = None
126- self .channel = None
127- self .connect ()
128-
129131 # Start ping thread
130132 self .ping = PingAlive (self .connector .id , self .api )
131133 self .ping .start ()
@@ -134,13 +136,8 @@ def __init__(self, config: dict):
134136 self .cache_index = {}
135137 self .cache_added = []
136138
137- def connect (self ):
138- # Connect to the broker
139- self .pika_connection = pika .BlockingConnection (pika .URLParameters (self .config ['uri' ]))
140- self .channel = self .pika_connection .channel ()
141-
142139 def listen (self , message_callback : Callable [[Dict ], List [str ]]) -> None :
143- listen_queue = ListenQueue (self , self .config [ 'listen' ], self . pika_connection , self . channel , message_callback )
140+ listen_queue = ListenQueue (self , self .config , message_callback )
144141 listen_queue .start ()
145142
146143 def get_connector (self ):
@@ -162,11 +159,14 @@ def send_stix2_bundle(self, bundle, entities_types=None):
162159 bundles = self .split_stix2_bundle (bundle )
163160 if len (bundles ) == 0 :
164161 raise ValueError ('Nothing to import' )
162+ pika_connection = pika .BlockingConnection (pika .URLParameters (self .config ['uri' ]))
163+ channel = pika_connection .channel ()
165164 for bundle in bundles :
166- self ._send_bundle (bundle , entities_types )
165+ self ._send_bundle (channel , bundle , entities_types )
166+ channel .close ()
167167 return bundles
168168
169- def _send_bundle (self , bundle , entities_types = None ):
169+ def _send_bundle (self , channel , bundle , entities_types = None ):
170170 """
171171 This method send a STIX2 bundle to RabbitMQ to be consumed by workers
172172 :param bundle: A valid STIX2 bundle
@@ -181,9 +181,6 @@ def _send_bundle(self, bundle, entities_types=None):
181181 else :
182182 job_id = None
183183
184- if self .channel is None or not self .channel .is_open :
185- self .channel = self .pika_connection .channel ()
186-
187184 # Validate the STIX 2 bundle
188185 # validation = validate_string(bundle)
189186 # if not validation.is_valid:
@@ -201,7 +198,7 @@ def _send_bundle(self, bundle, entities_types=None):
201198 # Send the message
202199 try :
203200 routing_key = 'push_routing_' + self .connector_id
204- self . channel .basic_publish (self .config ['push_exchange' ], routing_key , json .dumps (message ))
201+ channel .basic_publish (self .config ['push_exchange' ], routing_key , json .dumps (message ))
205202 logging .info ('Bundle has been sent' )
206203 except (UnroutableError , NackError ) as e :
207204 logging .error ('Unable to send bundle, retry...' , e )
0 commit comments