Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit fcd9047

Browse files
[client] Reintroduce blocking picka consuming as current implementation is not thread safe
1 parent d8d70f8 commit fcd9047

File tree

1 file changed

+19
-34
lines changed

1 file changed

+19
-34
lines changed

pycti/connector/opencti_connector_helper.py

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -198,14 +198,15 @@ def __init__(self, helper, config: Dict, connector_config: Dict, callback) -> No
198198
self.user = connector_config["connection"]["user"]
199199
self.password = connector_config["connection"]["pass"]
200200
self.queue_name = connector_config["listen"]
201+
self.thread = None
201202
self.connector_thread = None
202203
self.connector_event_loop = None
203204
self.queue_event_loop = asyncio.new_event_loop()
204205
asyncio.set_event_loop(self.queue_event_loop)
205206
self.run()
206207

207208
# noinspection PyUnusedLocal
208-
async def _process_message(self, channel, method, properties, body) -> None:
209+
def _process_message(self, channel, method, properties, body) -> None:
209210
"""process a message from the rabbit queue
210211
211212
:param channel: channel instance
@@ -217,30 +218,22 @@ async def _process_message(self, channel, method, properties, body) -> None:
217218
:param body: message body (data)
218219
:type body: str or bytes or bytearray
219220
"""
220-
221221
json_data = json.loads(body)
222-
work_id = json_data["internal"]["work_id"]
223-
message_task = self._data_handler(json_data)
222+
self.thread = threading.Thread(target=self._data_handler, args=[json_data])
223+
self.thread.start()
224224
five_minutes = 60 * 5
225225
time_wait = 0
226-
try:
227-
while not message_task.done(): # Loop while the task/thread is processing
228-
if (
229-
self.helper.work_id is not None and time_wait > five_minutes
230-
): # Ping every 5 minutes
231-
self.helper.api.work.ping(self.helper.work_id)
232-
time_wait = 0
233-
else:
234-
time_wait += 1
235-
await asyncio.sleep(1)
236-
if work_id:
237-
self.helper.api.work.to_processed(work_id, message_task.result())
238-
except Exception as e: # pylint: disable=broad-except
239-
logging.exception("Error in message processing, reporting error to API")
240-
if work_id:
241-
self.helper.api.work.to_processed(work_id, str(e), True)
242-
finally:
243-
channel.basic_ack(delivery_tag=method.delivery_tag)
226+
# Wait for end of execution of the _data_handler
227+
while self.thread.is_alive(): # Loop while the thread is processing
228+
if (
229+
self.helper.work_id is not None and time_wait > five_minutes
230+
): # Ping every 5 minutes
231+
self.helper.api.work.ping(self.helper.work_id)
232+
time_wait = 0
233+
else:
234+
time_wait += 1
235+
time.sleep(1)
236+
channel.basic_ack(delivery_tag=method.delivery_tag)
244237
LOGGER.info(
245238
"Message (delivery_tag=%s) processed, thread terminated",
246239
method.delivery_tag,
@@ -280,15 +273,9 @@ def _data_handler(self, json_data) -> None:
280273
self.helper.api.work.to_received(
281274
work_id, "Connector ready to process the operation"
282275
)
283-
if asyncio.iscoroutinefunction(self.callback):
284-
message = asyncio.run_coroutine_threadsafe(
285-
self.callback(json_data["event"]), self.connector_event_loop
286-
)
287-
else:
288-
message = asyncio.get_running_loop().run_in_executor(
289-
None, self.callback, json_data["event"]
290-
)
291-
return message
276+
message = self.callback(json_data["event"])
277+
if work_id:
278+
self.helper.api.work.to_processed(work_id, message)
292279
except Exception as e: # pylint: disable=broad-except
293280
self.helper.metric.inc("error_count")
294281
LOGGER.exception("Error in message processing, reporting error to API")
@@ -356,9 +343,7 @@ def on_channel_open(self, channel):
356343
assert self.channel is not None
357344
self.channel.basic_consume(
358345
queue=self.queue_name,
359-
on_message_callback=lambda *args: asyncio.create_task(
360-
self._process_message(*args)
361-
),
346+
on_message_callback=self._process_message,
362347
)
363348

364349

0 commit comments

Comments
 (0)