Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 40a0924

Browse files
address review feedback
1 parent a343e96 commit 40a0924

File tree

2 files changed

+97
-8
lines changed

2 files changed

+97
-8
lines changed

pycti/api/opencti_api_client.py

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
# coding: utf-8
2+
import atexit
23
import base64
34
import datetime
45
import io
56
import json
67
import os
8+
import shutil
9+
import signal
710
import tempfile
811
from typing import Dict, Tuple, Union
912

@@ -168,9 +171,18 @@ def __init__(
168171
self.app_logger = self.logger_class("api")
169172
self.admin_logger = self.logger_class("admin")
170173

174+
# Initialize temp certificate directory tracker
175+
self.temp_cert_dir = None
176+
171177
# Setup proxy certificates if provided
172178
self._setup_proxy_certificates()
173179

180+
# Register cleanup handlers for temp certificates
181+
if self.temp_cert_dir:
182+
atexit.register(self._cleanup_temp_certificates)
183+
signal.signal(signal.SIGTERM, self._signal_handler)
184+
signal.signal(signal.SIGINT, self._signal_handler)
185+
174186
# Define API
175187
self.api_token = token
176188
self.api_url = url + "/graphql"
@@ -268,6 +280,7 @@ def _setup_proxy_certificates(self):
268280
try:
269281
# Create secure temporary directory
270282
cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_")
283+
self.temp_cert_dir = cert_dir
271284

272285
# Determine if HTTPS_CA_CERTIFICATES contains inline content or file path
273286
cert_content = self._get_certificate_content(https_ca_certificates)
@@ -323,9 +336,10 @@ def _setup_proxy_certificates(self):
323336
)
324337

325338
except Exception as e:
326-
self.app_logger.warning(
339+
self.app_logger.error(
327340
"Failed to setup proxy certificates", {"error": str(e)}
328341
)
342+
raise
329343

330344
def _get_certificate_content(self, https_ca_certificates):
331345
"""Extract certificate content from environment variable.
@@ -337,16 +351,19 @@ def _get_certificate_content(self, https_ca_certificates):
337351
:return: Certificate content in PEM format or None if invalid
338352
:rtype: str or None
339353
"""
354+
# Strip whitespace once at the beginning
355+
stripped_https_ca_certificates = https_ca_certificates.strip()
356+
340357
# Check if it's inline certificate content (starts with PEM header)
341-
if https_ca_certificates.strip().startswith("-----BEGIN CERTIFICATE-----"):
358+
if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"):
342359
self.app_logger.debug(
343360
"HTTPS_CA_CERTIFICATES contains inline certificate content"
344361
)
345362
return https_ca_certificates
346363

347364
# Check if it's a file path
348-
if os.path.isfile(https_ca_certificates.strip()):
349-
cert_file_path = https_ca_certificates.strip()
365+
if os.path.isfile(stripped_https_ca_certificates):
366+
cert_file_path = stripped_https_ca_certificates
350367
try:
351368
with open(cert_file_path, "r") as f:
352369
cert_content = f.read()
@@ -373,6 +390,43 @@ def _get_certificate_content(self, https_ca_certificates):
373390
# Neither inline content nor valid file path
374391
return None
375392

393+
def _cleanup_temp_certificates(self):
394+
"""Clean up temporary certificate directory.
395+
396+
This method is called on normal program exit via atexit
397+
or when receiving termination signals (SIGTERM/SIGINT).
398+
"""
399+
if self.temp_cert_dir and os.path.exists(self.temp_cert_dir):
400+
try:
401+
shutil.rmtree(self.temp_cert_dir)
402+
self.app_logger.debug(
403+
"Cleaned up temporary certificates",
404+
{"cert_dir": self.temp_cert_dir}
405+
)
406+
except Exception as e:
407+
self.app_logger.warning(
408+
"Failed to cleanup temporary certificates",
409+
{"cert_dir": self.temp_cert_dir, "error": str(e)}
410+
)
411+
finally:
412+
self.temp_cert_dir = None
413+
414+
def _signal_handler(self, signum, frame):
415+
"""Handle termination signals (SIGTERM/SIGINT).
416+
417+
Performs cleanup and then raises SystemExit to allow
418+
normal shutdown procedures to complete.
419+
420+
:param signum: Signal number
421+
:param frame: Current stack frame
422+
"""
423+
self.app_logger.info(
424+
"Received termination signal, cleaning up",
425+
{"signal": signum}
426+
)
427+
self._cleanup_temp_certificates()
428+
raise SystemExit(0)
429+
376430
def set_applicant_id_header(self, applicant_id):
377431
self.request_headers["opencti-applicant-id"] = applicant_id
378432

tests/01-unit/api/test_opencti_api_client-proxy.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,47 @@ def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_clie
177177
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
178178

179179
def test_setup_proxy_certificates_exception_handling(self, api_client):
180-
"""Test _setup_proxy_certificates handles exceptions gracefully."""
180+
"""Test _setup_proxy_certificates raises exception on error."""
181181
with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}):
182182
with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")):
183-
api_client._setup_proxy_certificates()
183+
with pytest.raises(Exception, match="Mock error"):
184+
api_client._setup_proxy_certificates()
184185

185-
# Should log warning and continue
186-
api_client.app_logger.warning.assert_called_with(
186+
# Should log error before raising
187+
api_client.app_logger.error.assert_called_with(
187188
"Failed to setup proxy certificates", {"error": "Mock error"}
188189
)
190+
191+
def test_cleanup_temp_certificates_successful(self, api_client):
192+
"""Test _cleanup_temp_certificates successfully removes temporary directory."""
193+
# Create a real temporary directory
194+
temp_dir = tempfile.mkdtemp(prefix="opencti_test_")
195+
api_client.temp_cert_dir = temp_dir
196+
197+
# Call cleanup
198+
api_client._cleanup_temp_certificates()
199+
200+
# Verify directory was removed and temp_cert_dir reset
201+
assert not os.path.exists(temp_dir)
202+
assert api_client.temp_cert_dir is None
203+
204+
@patch("shutil.rmtree")
205+
@patch("os.path.exists")
206+
def test_cleanup_temp_certificates_with_error(
207+
self, mock_exists, mock_rmtree, api_client
208+
):
209+
"""Test _cleanup_temp_certificates handles errors during removal."""
210+
temp_dir = "/tmp/opencti_test_certs"
211+
api_client.temp_cert_dir = temp_dir
212+
mock_exists.return_value = True
213+
mock_rmtree.side_effect = OSError("Permission denied")
214+
215+
# Call cleanup - should not raise exception
216+
api_client._cleanup_temp_certificates()
217+
218+
# Should log warning and reset temp_cert_dir
219+
api_client.app_logger.warning.assert_called_with(
220+
"Failed to cleanup temporary certificates",
221+
{"cert_dir": temp_dir, "error": "Permission denied"},
222+
)
223+
assert api_client.temp_cert_dir is None

0 commit comments

Comments
 (0)