Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 37b4587

Browse files
authored
[client] Trying to have enrichment connector resilient to small network issues with rabbitMq (#529)
1 parent d0adc8e commit 37b4587

File tree

1 file changed

+34
-16
lines changed

1 file changed

+34
-16
lines changed

pycti/connector/opencti_connector_helper.py

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import tempfile
1212
import threading
1313
import time
14-
import traceback
1514
import uuid
1615
from queue import Queue
1716
from typing import Callable, Dict, List, Optional, Union
@@ -30,7 +29,6 @@
3029

3130

3231
def killProgramHook(etype, value, tb):
33-
traceback.print_exception(etype, value, tb)
3432
os.kill(os.getpid(), signal.SIGTERM)
3533

3634

@@ -290,8 +288,10 @@ def _data_handler(self, json_data) -> None:
290288
)
291289

292290
def run(self) -> None:
291+
self.helper.connector_logger.info("Starting ListenQueue thread")
293292
while not self.exit_event.is_set():
294293
try:
294+
self.helper.connector_logger.info("ListenQueue connecting to rabbitMq.")
295295
# Connect the broker
296296
self.pika_credentials = pika.PlainCredentials(self.user, self.password)
297297
self.pika_parameters = pika.ConnectionParameters(
@@ -308,28 +308,32 @@ def run(self) -> None:
308308
self.pika_connection = pika.BlockingConnection(self.pika_parameters)
309309
self.channel = self.pika_connection.channel()
310310
try:
311+
# confirm_delivery is only for cluster mode rabbitMQ
312+
# when not in cluster mode this line raise an exception
311313
self.channel.confirm_delivery()
312314
except Exception as err: # pylint: disable=broad-except
313-
self.helper.connector_logger.warning(str(err))
315+
self.helper.connector_logger.debug(str(err))
314316
self.channel.basic_qos(prefetch_count=1)
315317
assert self.channel is not None
316318
self.channel.basic_consume(
317319
queue=self.queue_name, on_message_callback=self._process_message
318320
)
319321
self.channel.start_consuming()
320-
except (KeyboardInterrupt, SystemExit):
321-
self.channel.stop_consuming()
322-
self.pika_connection.close()
323-
self.helper.connector_logger.info("Connector stop")
324-
sys.exit(0)
325322
except Exception as err: # pylint: disable=broad-except
326-
self.pika_connection.close()
323+
try:
324+
self.pika_connection.close()
325+
except Exception as errInException:
326+
self.helper.connector_logger.debug(
327+
type(errInException).__name__, {"reason": str(errInException)}
328+
)
327329
self.helper.connector_logger.error(
328330
type(err).__name__, {"reason": str(err)}
329331
)
330-
sys.exit(1)
332+
# Wait some time and then retry ListenQueue again.
333+
time.sleep(10)
331334

332335
def stop(self):
336+
self.helper.connector_logger.info("Preparing ListenQueue for clean shutdown")
333337
self.exit_event.set()
334338
self.pika_connection.close()
335339
if self.thread:
@@ -353,6 +357,7 @@ def __init__(
353357
def ping(self) -> None:
354358
while not self.exit_event.is_set():
355359
try:
360+
self.connector_logger.debug("PingAlive running.")
356361
initial_state = self.get_state()
357362
result = self.api.connector.ping(self.connector_id, initial_state)
358363
remote_state = (
@@ -378,11 +383,11 @@ def ping(self) -> None:
378383
self.exit_event.wait(40)
379384

380385
def run(self) -> None:
381-
self.connector_logger.info("Starting ping alive thread")
386+
self.connector_logger.info("Starting PingAlive thread")
382387
self.ping()
383388

384389
def stop(self) -> None:
385-
self.connector_logger.info("Preparing for clean shutdown")
390+
self.connector_logger.info("Preparing PingAlive for clean shutdown")
386391
self.exit_event.set()
387392

388393

@@ -395,10 +400,11 @@ def __init__(self, helper, q) -> None:
395400

396401
def run(self) -> None:
397402
try:
398-
self.helper.connector_logger.info("Starting stream alive thread")
403+
self.helper.connector_logger.info("Starting StreamAlive thread")
399404
time_since_last_heartbeat = 0
400405
while not self.exit_event.is_set():
401406
time.sleep(5)
407+
self.helper.connector_logger.debug("StreamAlive running")
402408
try:
403409
self.q.get(block=False)
404410
time_since_last_heartbeat = 0
@@ -409,12 +415,18 @@ def run(self) -> None:
409415
"Time since last heartbeat exceeded 45s, stopping the connector"
410416
)
411417
break
418+
self.helper.connector_logger.info(
419+
"Exit event in StreamAlive loop, stopping process."
420+
)
412421
sys.excepthook(*sys.exc_info())
413-
except:
422+
except Exception as ex:
423+
self.helper.connector_logger.error(
424+
"Error in StreamAlive loop, stopping process.", {"reason": str(ex)}
425+
)
414426
sys.excepthook(*sys.exc_info())
415427

416428
def stop(self) -> None:
417-
self.helper.connector_logger.info("Preparing for clean shutdown")
429+
self.helper.connector_logger.info("Preparing StreamAlive for clean shutdown")
418430
self.exit_event.set()
419431

420432

@@ -449,6 +461,7 @@ def __init__(
449461

450462
def run(self) -> None: # pylint: disable=too-many-branches
451463
try:
464+
self.helper.connector_logger.info("Starting ListenStream thread")
452465
current_state = self.helper.get_state()
453466
start_from = self.start_timestamp
454467
recover_until = self.recover_iso_date
@@ -545,10 +558,14 @@ def run(self) -> None: # pylint: disable=too-many-branches
545558
self.exit = True
546559
state["start_from"] = str(msg.id)
547560
self.helper.set_state(state)
548-
except:
561+
except Exception as ex:
562+
self.helper.connector_logger.error(
563+
"Error in ListenStream loop, exit.", {"reason": str(ex)}
564+
)
549565
sys.excepthook(*sys.exc_info())
550566

551567
def stop(self):
568+
self.helper.connector_logger.info("Preparing ListenStream for clean shutdown")
552569
self.exit_event.set()
553570

554571

@@ -735,6 +752,7 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
735752
self.listen_queue = None
736753

737754
def stop(self) -> None:
755+
self.connector_logger.info("Preparing connector for clean shutdown")
738756
if self.listen_queue:
739757
self.listen_queue.stop()
740758
# if self.listen_stream:

0 commit comments

Comments
 (0)