Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 126 additions & 60 deletions api_app/connectors_manager/connectors/opencti.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from api_app import helpers
from api_app.choices import Classification
from api_app.connectors_manager import classes
from tests.mock_utils import if_mock_connections, patch

INTELOWL_OPENCTI_TYPE_MAP = {
Classification.IP: {
Expand Down Expand Up @@ -77,7 +76,7 @@ def generate_observable_data(self) -> dict:

@property
def organization_id(self) -> str:
# Create author (if not exists); else update
# Idempotent author organization for all OpenCTI objects.
org = pycti.Identity(self.opencti_instance).create(
type="Organization",
name="IntelOwl",
Expand All @@ -89,109 +88,176 @@ def organization_id(self) -> str:
),
update=True, # just in case the description is updated in future
)
return org["id"]
if isinstance(org, dict):
return org.get("id")
return None

@property
def marking_definition_id(self) -> str:
# Create the marking definition (if not exists)
# Idempotent TLP marking used on all OpenCTI objects.
md = pycti.MarkingDefinition(self.opencti_instance).create(
definition_type="TLP",
definition=f"TLP:{self.tlp['type'].upper()}",
x_opencti_color=self.tlp["color"].lower(),
x_opencti_order=self.tlp["x_opencti_order"],
)
return md["id"]
if isinstance(md, dict):
return md.get("id")
return None

def config(self, runtime_configuration: Dict):
super().config(runtime_configuration)
if self.ssl_verify is None:
self.ssl_verify = False

def run(self):
# set up client
self.opencti_instance = pycti.OpenCTIApiClient(
url=self._url_key_name,
token=self._api_key_name,
ssl_verify=self.ssl_verify,
proxies=self.proxies,
)

# Entities in OpenCTI are created only if they don't exist
# create queries will return the existing entity in that case
# use update (default: False) to update the entity if exists

# Create the observable (if not exists with the given type and values)
def _create_observable(self, created, org_id, marking_id):
observable_data = self.generate_observable_data()
observable = pycti.StixCyberObservable(self.opencti_instance, File).create(
observableData=observable_data,
createdBy=self.organization_id,
objectMarking=self.marking_definition_id,
createdBy=org_id,
objectMarking=marking_id,
)
observable_id = observable["id"] if isinstance(observable, dict) and "id" in observable else None
created["observable"] = observable_id
return observable_id

# Create labels from Job tags (if not exists)
def _create_labels(self, created):
label_ids = []
for tag in self._job.tags.all():
label = pycti.Label(self.opencti_instance).create(
value=f"intelowl-tag:{tag.label}",
color=tag.color,
)
label_ids.append(label["id"])
if not isinstance(label, dict) or "id" not in label:
raise ValueError("Invalid response from OpenCTI Label.create")
label_id = label["id"]
created["labels"].append(label_id)
label_ids.append(label_id)
return label_ids

# Create the report
def _create_report(self, created, org_id, marking_id, label_ids):
report = pycti.Report(self.opencti_instance).create(
name=f"IntelOwl Job-{self.job_id}",
description=(
f"This is IntelOwl's analysis report for Job: {self.job_id}."
# comma separate analyzers executed
" Analyzers Executed:"
f" {', '.join(list(self._job.analyzers_to_execute.all().values_list('name', flat=True)))}" # noqa
f" {', '.join(list(self._job.analyzers_to_execute.all().values_list('name', flat=True)))}"
),
published=self._job.received_request_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
report_types=["internal-report"],
createdBy=self.organization_id,
objectMarking=self.marking_definition_id,
createdBy=org_id,
objectMarking=marking_id,
objectLabel=label_ids,
x_opencti_report_status=2, # Analyzed
x_opencti_report_status=2,
)
# Create the external reference
if not isinstance(report, dict) or "id" not in report:
created["report"] = None
raise ValueError("Invalid response from OpenCTI Report.create")
report_id = report["id"]
created["report"] = report_id
return report_id

def _create_external_reference(self, created):
external_reference = pycti.ExternalReference(self.opencti_instance, None).create(
source_name="IntelOwl Analysis",
description="View analysis report on the IntelOwl instance",
url=f"{settings.WEB_CLIENT_URL}/jobs/{self.job_id}",
)
# Add the external reference to the report
pycti.StixDomainObject(self.opencti_instance, File).add_external_reference(
id=report["id"], external_reference_id=external_reference["id"]
)
if not isinstance(external_reference, dict) or "id" not in external_reference:
created["external_reference"] = None
raise ValueError("Invalid response from OpenCTI ExternalReference.create")
external_ref_id = external_reference["id"]
created["external_reference"] = external_ref_id
return external_ref_id

# Link Observable and Report
pycti.Report(self.opencti_instance).add_stix_object_or_stix_relationship(
id=report["id"], stixObjectOrStixRelationshipId=observable["id"]
)
def _link_report_entities(self, report_id, observable_id, external_ref_id):
if report_id is not None and external_ref_id is not None:
pycti.StixDomainObject(self.opencti_instance, File).add_external_reference(
id=report_id, external_reference_id=external_ref_id
)
if report_id is not None and observable_id is not None:
pycti.Report(self.opencti_instance).add_stix_object_or_stix_relationship(
id=report_id, stixObjectOrStixRelationshipId=observable_id
)

return {
"observable": pycti.StixCyberObservable(self.opencti_instance, File).read(id=observable["id"]),
"report": pycti.Report(self.opencti_instance).read(id=report["id"]),
def run(self):
# Initialize OpenCTI client for this run.
self.opencti_instance = pycti.OpenCTIApiClient(
url=self._url_key_name,
token=self._api_key_name,
ssl_verify=self.ssl_verify,
proxies=self.proxies,
)
created = {
"observable": None,
"report": None,
"external_reference": None,
"labels": [],
}

try:
org_id = self.organization_id
marking_id = self.marking_definition_id
observable_id = self._create_observable(created, org_id, marking_id)
label_ids = self._create_labels(created)
report_id = self._create_report(created, org_id, marking_id, label_ids)
external_ref_id = self._create_external_reference(created)
self._link_report_entities(report_id, observable_id, external_ref_id)

# Enforce observable contract once all dependent creations have been attempted.
if observable_id is None:
raise ValueError("Invalid response from OpenCTI StixCyberObservable.create")

# Return a JSON-serializable summary instead of raw SDK responses.
return {
"observable": {"id": observable_id},
"report": {"id": report_id},
}
except Exception as e:
try:
msg = (
f"OpenCTI partial state detected after exception: {type(e).__name__}: {e}. "
f"Created IDs: observable={created['observable']}, "
f"report={created['report']}, "
f"external_reference={created['external_reference']}, "
f"labels={created['labels']}"
)
self.report.errors.append(msg)
except Exception:
pass
raise

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch("pycti.OpenCTIApiClient", return_value=None),
patch("pycti.Identity.create", return_value={"id": 1}),
patch("pycti.MarkingDefinition.create", return_value={"id": 1}),
patch("pycti.StixCyberObservable.create", return_value={"id": 1}),
patch("pycti.Label.create", return_value={"id": 1}),
patch("pycti.Report.create", return_value={"id": 1}),
patch("pycti.ExternalReference.create", return_value={"id": 1}),
patch("pycti.StixDomainObject.add_external_reference", return_value=None),
patch(
"pycti.Report.add_stix_object_or_stix_relationship",
return_value=None,
),
patch("pycti.StixCyberObservable.read", return_value={"id": 1}),
patch("pycti.Report.read", return_value={"id": 1}),
)
]
return super()._monkeypatch(patches=patches)
"""Install pycti stubs when connection mocking is enabled."""
if not getattr(settings, "MOCK_CONNECTIONS", False):
return

def _configure(start_fn):
def inner(self, job_id, runtime_configuration, task_id, *args, **kwargs):
# Avoid real OpenCTI network calls
pycti.OpenCTIApiClient = lambda *a, **k: None

def _fake_create(*_args, **_kwargs):
return {"id": 1}

def _noop(*_args, **_kwargs):
return None

# Ensure core entities always return a dict with an id in CI generic tests.
pycti.Identity.create = _fake_create
pycti.MarkingDefinition.create = _fake_create
pycti.StixCyberObservable.create = _fake_create
pycti.Label.create = _fake_create
pycti.Report.create = _fake_create
pycti.ExternalReference.create = _fake_create

# No-op the linking methods that would otherwise dereference opencti/app_logger.
pycti.StixDomainObject.add_external_reference = _noop
pycti.Report.add_stix_object_or_stix_relationship = _noop

return start_fn(self, job_id, runtime_configuration, task_id, *args, **kwargs)

return inner

return super()._monkeypatch(patches=[_configure])
Loading