Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 2d869e5

Browse files
committed
[client] Ability to reconnect to RabbitMQ in case of errors
1 parent 9831b4f commit 2d869e5

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,6 @@ $ pytest ./tests/02-integration/
8585

8686
## About
8787

88-
OpenCTI is a product powered by the collaboration of the private company [Filigran](https://www.filigran.io), the [French national cybersecurity agency (ANSSI)](https://ssi.gouv.fr), the [CERT-EU](https://cert.europa.eu) and the [Luatix](https://www.luatix.org) non-profit organization.
88+
OpenCTI is a product designed and developed by the company [Filigran](https://www.filigran.io).
89+
90+
<a href="https://www.filigran.io" alt="Filigran"><img src="https://www.filigran.io/wp-content/uploads/2022/08/filigran_text_horizontal_dense_margin.png" width="230" /></a>

pycti/connector/opencti_connector_helper.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def killProgramHook(etype, value, tb):
3636
os.kill(os.getpid(), signal.SIGTERM)
3737

3838

39-
def run_loop(loop):
39+
def start_loop(loop):
4040
asyncio.set_event_loop(loop)
4141
loop.run_forever()
4242

@@ -128,7 +128,6 @@ def __init__(self, helper, config: Dict, callback) -> None:
128128
self.user = config["connection"]["user"]
129129
self.password = config["connection"]["pass"]
130130
self.queue_name = config["listen"]
131-
self.exit_event = threading.Event()
132131
self.connector_thread = None
133132
self.connector_event_loop = None
134133
self.queue_event_loop = asyncio.new_event_loop()
@@ -207,7 +206,7 @@ def _data_handler(self, json_data) -> None:
207206
LOGGER.error("Failing reporting the processing")
208207

209208
def run(self) -> None:
210-
while not self.exit_event.is_set():
209+
while True:
211210
try:
212211
# Connect the broker
213212
self.pika_credentials = pika.PlainCredentials(self.user, self.password)
@@ -223,25 +222,39 @@ def run(self) -> None:
223222
if asyncio.iscoroutinefunction(self.callback):
224223
self.connector_event_loop = asyncio.new_event_loop()
225224
self.connector_thread = threading.Thread(
226-
target=lambda: run_loop(self.connector_event_loop)
227-
).start()
225+
target=lambda: start_loop(self.connector_event_loop)
226+
)
227+
self.connector_thread.start()
228228
self.pika_connection = AsyncioConnection(
229229
self.pika_parameters,
230230
on_open_callback=self.on_connection_open,
231+
on_open_error_callback=self.on_connection_open_error,
232+
on_close_callback=self.on_connection_closed,
231233
custom_ioloop=self.queue_event_loop,
232234
)
233235
self.pika_connection.ioloop.run_forever()
236+
# If the connection fails, sleep between reconnect attempts
237+
time.sleep(10)
234238
except (KeyboardInterrupt, SystemExit):
235239
LOGGER.info("Connector stop")
236240
sys.exit(0)
237-
except Exception as e: # pylint: disable=broad-except
238-
LOGGER.error("%s", e)
239-
time.sleep(10)
241+
except Exception as err: # pylint: disable=broad-except
242+
LOGGER.error("%s", err)
240243

241244
# noinspection PyUnusedLocal
242245
def on_connection_open(self, _unused_connection):
243246
self.pika_connection.channel(on_open_callback=self.on_channel_open)
244247

248+
# noinspection PyUnusedLocal
249+
def on_connection_open_error(self, _unused_connection, err):
250+
LOGGER.info("Unable to connect to the queue. %s", err)
251+
self.pika_connection.ioloop.stop()
252+
253+
# noinspection PyUnusedLocal
254+
def on_connection_closed(self, _unused_connection, reason):
255+
LOGGER.info("The connection to the queue closed: %s", reason)
256+
self.pika_connection.ioloop.stop()
257+
245258
def on_channel_open(self, channel):
246259
self.channel = channel
247260
assert self.channel is not None
@@ -252,13 +265,6 @@ def on_channel_open(self, channel):
252265
),
253266
)
254267

255-
def stop(self):
256-
self.queue_event_loop.stop()
257-
self.exit_event.set()
258-
if self.connector_thread:
259-
self.connector_event_loop.stop()
260-
self.connector_thread.join()
261-
262268

263269
class PingAlive(threading.Thread):
264270
def __init__(self, connector_id, api, get_state, set_state) -> None:

0 commit comments

Comments
 (0)