Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 9bc40df

Browse files
committed
[client] Restore threading / consuming on enrichment
1 parent 65a07f7 commit 9bc40df

File tree

1 file changed

+17
-43
lines changed

1 file changed

+17
-43
lines changed

pycti/connector/opencti_connector_helper.py

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def create_mq_ssl_context(config) -> ssl.SSLContext:
173173
return ssl_context
174174

175175

176-
class ListenQueue:
176+
class ListenQueue(threading.Thread):
177177
"""Main class for the ListenQueue used in OpenCTIConnectorHelper
178178
179179
:param helper: instance of a `OpenCTIConnectorHelper` class
@@ -185,6 +185,7 @@ class ListenQueue:
185185
"""
186186

187187
def __init__(self, helper, config: Dict, connector_config: Dict, callback) -> None:
188+
threading.Thread.__init__(self)
188189
self.pika_credentials = None
189190
self.pika_parameters = None
190191
self.pika_connection = None
@@ -199,12 +200,8 @@ def __init__(self, helper, config: Dict, connector_config: Dict, callback) -> No
199200
self.user = connector_config["connection"]["user"]
200201
self.password = connector_config["connection"]["pass"]
201202
self.queue_name = connector_config["listen"]
203+
self.exit_event = threading.Event()
202204
self.thread = None
203-
self.connector_thread = None
204-
self.connector_event_loop = None
205-
self.queue_event_loop = asyncio.new_event_loop()
206-
asyncio.set_event_loop(self.queue_event_loop)
207-
self.run()
208205

209206
# noinspection PyUnusedLocal
210207
def _process_message(self, channel, method, properties, body) -> None:
@@ -224,6 +221,8 @@ def _process_message(self, channel, method, properties, body) -> None:
224221
# Not ACK the message here may lead to infinite re-deliver if the connector is broken
225222
# Also ACK, will not have any impact on the blocking aspect of the following functions
226223
channel.basic_ack(delivery_tag=method.delivery_tag)
224+
LOGGER.info("Message (delivery_tag=%s) ack", method.delivery_tag)
225+
227226
self.thread = threading.Thread(target=self._data_handler, args=[json_data])
228227
self.thread.start()
229228
five_minutes = 60 * 5
@@ -291,7 +290,7 @@ def _data_handler(self, json_data) -> None:
291290
LOGGER.error("Failing reporting the processing")
292291

293292
def run(self) -> None:
294-
while True:
293+
while not self.exit_event.is_set():
295294
try:
296295
# Connect the broker
297296
self.pika_credentials = pika.PlainCredentials(self.user, self.password)
@@ -306,49 +305,23 @@ def run(self) -> None:
306305
if self.use_ssl
307306
else None,
308307
)
309-
if asyncio.iscoroutinefunction(self.callback):
310-
self.connector_event_loop = asyncio.new_event_loop()
311-
self.connector_thread = threading.Thread(
312-
target=lambda: start_loop(self.connector_event_loop)
313-
)
314-
self.connector_thread.start()
315-
self.pika_connection = AsyncioConnection(
316-
self.pika_parameters,
317-
on_open_callback=self.on_connection_open,
318-
on_open_error_callback=self.on_connection_open_error,
319-
on_close_callback=self.on_connection_closed,
320-
custom_ioloop=self.queue_event_loop,
308+
self.pika_connection = pika.BlockingConnection(self.pika_parameters)
309+
self.channel = self.pika_connection.channel()
310+
assert self.channel is not None
311+
self.channel.basic_consume(
312+
queue=self.queue_name, on_message_callback=self._process_message
321313
)
322-
self.pika_connection.ioloop.run_forever()
323-
# If the connection fails, sleep between reconnect attempts
324-
time.sleep(10)
314+
self.channel.start_consuming()
325315
except (KeyboardInterrupt, SystemExit):
326316
LOGGER.info("Connector stop")
327317
sys.exit(0)
328318
except Exception as err: # pylint: disable=broad-except
329319
LOGGER.error("%s", err)
330320

331-
# noinspection PyUnusedLocal
332-
def on_connection_open(self, _unused_connection):
333-
self.pika_connection.channel(on_open_callback=self.on_channel_open)
334-
335-
# noinspection PyUnusedLocal
336-
def on_connection_open_error(self, _unused_connection, err):
337-
LOGGER.info("Unable to connect to the queue. %s", err)
338-
self.pika_connection.ioloop.stop()
339-
340-
# noinspection PyUnusedLocal
341-
def on_connection_closed(self, _unused_connection, reason):
342-
LOGGER.info("The connection to the queue closed: %s", reason)
343-
self.pika_connection.ioloop.stop()
344-
345-
def on_channel_open(self, channel):
346-
self.channel = channel
347-
assert self.channel is not None
348-
self.channel.basic_consume(
349-
queue=self.queue_name,
350-
on_message_callback=self._process_message,
351-
)
321+
def stop(self):
322+
self.exit_event.set()
323+
if self.thread:
324+
self.thread.join()
352325

353326

354327
class PingAlive(threading.Thread):
@@ -829,6 +802,7 @@ def listen(self, message_callback: Callable[[Dict], str]) -> None:
829802
self.listen_queue = ListenQueue(
830803
self, self.config, self.connector_config, message_callback
831804
)
805+
self.listen_queue.start()
832806

833807
def listen_stream(
834808
self,

0 commit comments

Comments
 (0)