Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 7511c9b

Browse files
author
Samuel Hassine
committed
[client] Fix channel cross-threads
1 parent b374c33 commit 7511c9b

File tree

1 file changed

+4
-7
lines changed

1 file changed

+4
-7
lines changed

pycti/connector/opencti_connector_helper.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,27 @@ def __init__(self, helper, config, callback):
2828
# noinspection PyUnusedLocal
2929
def _process_message(self, channel, method, properties, body):
3030
json_data = json.loads(body)
31-
thread = threading.Thread(target=self._data_handler, args=[channel, method, json_data])
31+
thread = threading.Thread(target=self._data_handler, args=[json_data])
3232
thread.start()
3333
while thread.is_alive(): # Loop while the thread is processing
3434
self.pika_connection.sleep(1.0)
35+
logging.info('Message (delivery_tag=' + str(method.delivery_tag) + ') processed, thread terminated')
36+
channel.basic_ack(delivery_tag=method.delivery_tag)
3537

36-
def _data_handler(self, channel, method, json_data):
38+
def _data_handler(self, json_data):
3739
job_id = json_data['job_id'] if 'job_id' in json_data else None
3840
try:
3941
work_id = json_data['work_id']
4042
self.helper.current_work_id = work_id
4143
self.helper.api.job.update_job(job_id, 'progress', ['Starting process'])
4244
messages = self.callback(json_data)
4345
self.helper.api.job.update_job(job_id, 'complete', messages)
44-
channel.basic_ack(delivery_tag=method.delivery_tag)
45-
except requests.exceptions.Timeout:
46-
logging.warning('API call timeout, message remains in the queue')
4746
except Exception as e:
4847
logging.exception('Error in message processing, reporting error to API')
4948
try:
5049
self.helper.api.job.update_job(job_id, 'error', [str(e)])
5150
except:
5251
logging.error('Failing reporting the processing')
53-
# We can assume that reprocessing will produce the same error, so ack the message
54-
channel.basic_ack(delivery_tag=method.delivery_tag)
5552

5653
def run(self):
5754
while True:

0 commit comments

Comments
 (0)