Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 285a4bc

Browse files
[client] Introduce security assessment (opencti/#11707)
1 parent e1adae4 commit 285a4bc

File tree

5 files changed

+226
-16
lines changed

5 files changed

+226
-16
lines changed

pycti/api/opencti_api_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from pycti.entities.opencti_opinion import Opinion
5353
from pycti.entities.opencti_report import Report
5454
from pycti.entities.opencti_role import Role
55+
from pycti.entities.opencti_security_assessment import SecurityAssessment
5556
from pycti.entities.opencti_settings import Settings
5657
from pycti.entities.opencti_stix import Stix
5758
from pycti.entities.opencti_stix_core_object import StixCoreObject
@@ -223,6 +224,7 @@ def __init__(
223224
self.narrative = Narrative(self)
224225
self.language = Language(self)
225226
self.vulnerability = Vulnerability(self)
227+
self.security_assessment = SecurityAssessment(self)
226228
self.attack_pattern = AttackPattern(self)
227229
self.course_of_action = CourseOfAction(self)
228230
self.data_component = DataComponent(self)

pycti/connector/opencti_connector.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def __init__(
4343
auto: bool,
4444
only_contextual: bool,
4545
playbook_compatible: bool,
46+
auto_update: bool,
47+
enrichment_resolution: str,
4648
listen_callback_uri=None,
4749
):
4850
self.id = connector_id
@@ -55,6 +57,8 @@ def __init__(
5557
else:
5658
self.scope = []
5759
self.auto = auto
60+
self.auto_update = auto_update
61+
self.enrichment_resolution = enrichment_resolution
5862
self.only_contextual = only_contextual
5963
self.playbook_compatible = playbook_compatible
6064
self.listen_callback_uri = listen_callback_uri
@@ -72,6 +76,8 @@ def to_input(self) -> dict:
7276
"type": self.type.name,
7377
"scope": self.scope,
7478
"auto": self.auto,
79+
"auto_update": self.auto_update,
80+
"enrichment_resolution": self.enrichment_resolution,
7581
"only_contextual": self.only_contextual,
7682
"playbook_compatible": self.playbook_compatible,
7783
"listen_callback_uri": self.listen_callback_uri,

pycti/connector/opencti_connector_helper.py

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,16 @@ def _data_handler(self, json_data) -> None:
365365
event_data = json_data["event"]
366366
entity_id = event_data.get("entity_id")
367367
entity_type = event_data.get("entity_type")
368+
stix_entity = (
369+
json.loads(event_data.get("stix_entity"))
370+
if event_data.get("stix_entity")
371+
else None
372+
)
373+
stix_objects = (
374+
json.loads(event_data.get("stix_objects"))
375+
if event_data.get("stix_objects")
376+
else None
377+
)
368378
validation_mode = event_data.get("validation_mode", "workbench")
369379
force_validation = event_data.get("force_validation", False)
370380
# Set the API headers
@@ -430,15 +440,16 @@ def _data_handler(self, json_data) -> None:
430440
else:
431441
# If not playbook but enrichment, compute object on enrichment_entity
432442
opencti_entity = event_data["enrichment_entity"]
433-
stix_objects = self.helper.api.stix2.prepare_export(
434-
entity=self.helper.api.stix2.generate_export(
435-
copy.copy(opencti_entity)
443+
if stix_objects is None:
444+
stix_objects = self.helper.api.stix2.prepare_export(
445+
entity=self.helper.api.stix2.generate_export(
446+
copy.copy(opencti_entity)
447+
)
436448
)
437-
)
438-
stix_entity = [
439-
e
440-
for e in stix_objects
441-
if e["id"] == opencti_entity["standard_id"]
449+
stix_entity = [
450+
e
451+
for e in stix_objects
452+
if e["id"] == opencti_entity["standard_id"]
442453
or e["id"] == "x-opencti-" + opencti_entity["standard_id"]
443454
][0]
444455
event_data["stix_objects"] = stix_objects
@@ -1116,6 +1127,15 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
11161127
self.connect_auto = get_config_variable(
11171128
"CONNECTOR_AUTO", ["connector", "auto"], config, default=False
11181129
)
1130+
self.connect_auto_update = get_config_variable(
1131+
"CONNECTOR_AUTO_UPDATE", ["connector", "auto_update"], config, default=False
1132+
)
1133+
self.connect_enrichment_resolution = get_config_variable(
1134+
"CONNECTOR_ENRICHMENT_RESOLUTION",
1135+
["connector", "enrichment_resolution"],
1136+
config,
1137+
default="none",
1138+
)
11191139
self.bundle_send_to_queue = get_config_variable(
11201140
"CONNECTOR_SEND_TO_QUEUE",
11211141
["connector", "send_to_queue"],
@@ -1231,14 +1251,16 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
12311251
)
12321252
# Register the connector in OpenCTI
12331253
self.connector = OpenCTIConnector(
1234-
self.connect_id,
1235-
self.connect_name,
1236-
self.connect_type,
1237-
self.connect_scope,
1238-
self.connect_auto,
1239-
self.connect_only_contextual,
1240-
playbook_compatible,
1241-
(
1254+
connector_id=self.connect_id,
1255+
connector_name=self.connect_name,
1256+
connector_type=self.connect_type,
1257+
scope=self.connect_scope,
1258+
auto=self.connect_auto,
1259+
only_contextual=self.connect_only_contextual,
1260+
playbook_compatible=playbook_compatible,
1261+
auto_update=self.connect_auto_update,
1262+
enrichment_resolution=self.connect_enrichment_resolution,
1263+
listen_callback_uri=(
12421264
self.listen_protocol_api_uri + self.listen_protocol_api_path
12431265
if self.listen_protocol == "API"
12441266
else None
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# coding: utf-8
2+
3+
import json
4+
import uuid
5+
6+
from stix2.canonicalization.Canonicalize import canonicalize
7+
8+
9+
class SecurityAssessment:
10+
def __init__(self, opencti):
11+
self.opencti = opencti
12+
self.properties = """
13+
id
14+
standard_id
15+
entity_type
16+
parent_types
17+
spec_version
18+
created_at
19+
updated_at
20+
objectAssess {
21+
id
22+
}
23+
objectMarking {
24+
id
25+
standard_id
26+
entity_type
27+
definition_type
28+
definition
29+
created
30+
modified
31+
x_opencti_order
32+
x_opencti_color
33+
}
34+
"""
35+
36+
@staticmethod
37+
def generate_id(name):
38+
name = name.lower().strip()
39+
data = {"name": name}
40+
data = canonicalize(data, utf8=False)
41+
id = str(uuid.uuid5(uuid.UUID("00abedb4-aa42-466c-9c01-fed23315a9b7"), data))
42+
return "securityAssessment--" + id
43+
44+
@staticmethod
45+
def generate_id_from_data(data):
46+
return SecurityAssessment.generate_id(data["name"])
47+
48+
"""
49+
List SecurityAssessment objects
50+
51+
:param filters: the filters to apply
52+
:param search: the search keyword
53+
:param first: return the first n rows from the after ID (or the beginning if not set)
54+
:param after: ID of the first row for pagination
55+
:return List of SecurityAssessment objects
56+
"""
57+
58+
def list(self, **kwargs):
59+
filters = kwargs.get("filters", None)
60+
search = kwargs.get("search", None)
61+
first = kwargs.get("first", 100)
62+
after = kwargs.get("after", None)
63+
order_by = kwargs.get("orderBy", None)
64+
order_mode = kwargs.get("orderMode", None)
65+
custom_attributes = kwargs.get("customAttributes", None)
66+
get_all = kwargs.get("getAll", False)
67+
with_pagination = kwargs.get("withPagination", False)
68+
69+
self.opencti.app_logger.info(
70+
"Listing SecurityAssessment with filters", {"filters": json.dumps(filters)}
71+
)
72+
query = (
73+
"""
74+
query SecurityAssessment($filters: FilterGroup, $search: String, $first: Int, $after: ID, $orderBy: SecurityAssessmentOrdering, $orderMode: OrderingMode) {
75+
securityAssessments(filters: $filters, search: $search, first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode) {
76+
edges {
77+
node {
78+
"""
79+
+ (custom_attributes if custom_attributes is not None else self.properties)
80+
+ """
81+
}
82+
}
83+
pageInfo {
84+
startCursor
85+
endCursor
86+
hasNextPage
87+
hasPreviousPage
88+
globalCount
89+
}
90+
}
91+
}
92+
"""
93+
)
94+
result = self.opencti.query(
95+
query,
96+
{
97+
"filters": filters,
98+
"search": search,
99+
"first": first,
100+
"after": after,
101+
"orderBy": order_by,
102+
"orderMode": order_mode,
103+
},
104+
)
105+
106+
if get_all:
107+
final_data = []
108+
data = self.opencti.process_multiple(result["data"]["securityAssessments"])
109+
final_data = final_data + data
110+
while result["data"]["securityAssessments"]["pageInfo"]["hasNextPage"]:
111+
after = result["data"]["securityAssessments"]["pageInfo"]["endCursor"]
112+
self.opencti.app_logger.info(
113+
"Listing SecurityAssessment", {"after": after}
114+
)
115+
result = self.opencti.query(
116+
query,
117+
{
118+
"filters": filters,
119+
"search": search,
120+
"first": first,
121+
"after": after,
122+
"orderBy": order_by,
123+
"orderMode": order_mode,
124+
},
125+
)
126+
data = self.opencti.process_multiple(
127+
result["data"]["securityAssessments"]
128+
)
129+
final_data = final_data + data
130+
return final_data
131+
else:
132+
return self.opencti.process_multiple(
133+
result["data"]["securityAssessments"], with_pagination
134+
)
135+
136+
"""
137+
Read a SecurityAssessment object
138+
139+
:param id: the id of the SecurityAssessment
140+
:param filters: the filters to apply if no id provided
141+
:return SecurityAssessment object
142+
"""
143+
144+
def read(self, **kwargs):
145+
id = kwargs.get("id", None)
146+
filters = kwargs.get("filters", None)
147+
custom_attributes = kwargs.get("customAttributes", None)
148+
if id is not None:
149+
self.opencti.app_logger.info("Reading SecurityAssessment", {"id": id})
150+
query = (
151+
"""
152+
query SecurityAssessment($id: String!) {
153+
securityAssessment(id: $id) {
154+
"""
155+
+ (
156+
custom_attributes
157+
if custom_attributes is not None
158+
else self.properties
159+
)
160+
+ """
161+
}
162+
}
163+
"""
164+
)
165+
result = self.opencti.query(query, {"id": id})
166+
return self.opencti.process_multiple_fields(
167+
result["data"]["securityAssessment"]
168+
)
169+
elif filters is not None:
170+
result = self.list(filters=filters)
171+
if len(result) > 0:
172+
return result[0]
173+
else:
174+
return None
175+
else:
176+
self.opencti.app_logger.error(
177+
"[opencti_tool] Missing parameters: id or filters"
178+
)
179+
return None

pycti/utils/opencti_stix2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,7 @@ def get_readers(self):
885885
"Tool": self.opencti.tool.read,
886886
"Vocabulary": self.opencti.vocabulary.read,
887887
"Vulnerability": self.opencti.vulnerability.read,
888+
"SecurityAssessment": self.opencti.security_assessment.read,
888889
}
889890

890891
def get_reader(self, entity_type: str):

0 commit comments

Comments
 (0)