Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit cd92f43

Browse files
[client] Adapt attack pattern and relationship stix ids
1 parent 10feec6 commit cd92f43

File tree

4 files changed

+49
-13
lines changed

4 files changed

+49
-13
lines changed

pycti/entities/opencti_attack_pattern.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ def generate_id(name, x_mitre_id=None):
232232

233233
@staticmethod
234234
def generate_id_from_data(data):
235-
return AttackPattern.generate_id(data.get("name"), data.get("x_mitre_id"))
235+
external_id = data.get("x_mitre_id") or data.get("x_opencti_external_id")
236+
return AttackPattern.generate_id(data.get("name"), external_id)
236237

237238
"""
238239
List Attack-Pattern objects

pycti/entities/opencti_stix_core_relationship.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ def create(self, **kwargs):
610610
kill_chain_phases = kwargs.get("killChainPhases", None)
611611
granted_refs = kwargs.get("objectOrganization", None)
612612
x_opencti_workflow_id = kwargs.get("x_opencti_workflow_id", None)
613+
x_opencti_stix_ids = kwargs.get("x_opencti_stix_ids", None)
613614
update = kwargs.get("update", False)
614615

615616
self.opencti.app_logger.info(
@@ -653,6 +654,7 @@ def create(self, **kwargs):
653654
"externalReferences": external_references,
654655
"killChainPhases": kill_chain_phases,
655656
"x_opencti_workflow_id": x_opencti_workflow_id,
657+
"x_opencti_stix_ids": x_opencti_stix_ids,
656658
"update": update,
657659
}
658660
},
@@ -1143,6 +1145,10 @@ def import_from_stix2(self, **kwargs):
11431145
default_date = kwargs.get("defaultDate", False)
11441146
if stix_relation is not None:
11451147
# Search in extensions
1148+
if "x_opencti_stix_ids" not in stix_relation:
1149+
stix_relation["x_opencti_stix_ids"] = (
1150+
self.opencti.get_attribute_in_extension("stix_ids", stix_relation)
1151+
)
11461152
if "x_opencti_granted_refs" not in stix_relation:
11471153
stix_relation["x_opencti_granted_refs"] = (
11481154
self.opencti.get_attribute_in_extension(
@@ -1224,6 +1230,11 @@ def import_from_stix2(self, **kwargs):
12241230
if "x_opencti_workflow_id" in stix_relation
12251231
else None
12261232
),
1233+
x_opencti_stix_ids=(
1234+
stix_relation["x_opencti_stix_ids"]
1235+
if "x_opencti_stix_ids" in stix_relation
1236+
else None
1237+
),
12271238
update=update,
12281239
)
12291240
else:

pycti/utils/opencti_stix2.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2412,18 +2412,31 @@ def prepare_bundle_ids(self, bundle, use_json=True, keep_original_id=False):
24122412
helper = stix_helpers.get(item["type"])
24132413
if hasattr(helper, "generate_id_from_data"):
24142414
standard_id = helper.generate_id_from_data(item)
2415-
cache_ids[item["id"]] = standard_id
2415+
if item["id"] != standard_id:
2416+
cache_ids[item["id"]] = standard_id
2417+
2418+
# If no id to rewrite, return the processed bundle
2419+
if not bool(cache_ids):
2420+
# As rewrite can have multiple roundtrip
2421+
# we kept the first original id in a meta attribute
2422+
# final behavior is to rewrite this in the x_opencti_stix_ids
2423+
for item in bundle_data["objects"]:
2424+
if "x_keep_original_id" in item:
2425+
item["x_opencti_stix_ids"] = item.get("x_opencti_stix_ids", []) + [
2426+
item["x_keep_original_id"]
2427+
]
2428+
del item["x_keep_original_id"]
2429+
return json.dumps(bundle_data) if use_json else bundle_data
2430+
24162431
# Second iteration to replace and remap
24172432
for item in bundle_data["objects"]:
24182433
# For entities, try to replace the main id
24192434
# Keep the current one if needed
24202435
if cache_ids.get(item["id"]):
24212436
original_id = item["id"]
24222437
item["id"] = cache_ids[original_id]
2423-
if keep_original_id:
2424-
item["x_opencti_stix_ids"] = item.get("x_opencti_stix_ids", []) + [
2425-
original_id
2426-
]
2438+
if keep_original_id and "x_keep_original_id" not in item:
2439+
item["x_keep_original_id"] = original_id
24272440
# For all elements, replace all refs (source_ref, object_refs, ...)
24282441
ref_keys = list(
24292442
filter(lambda i: i.endswith("_ref") or i.endswith("_refs"), item.keys())
@@ -2436,7 +2449,9 @@ def prepare_bundle_ids(self, bundle, use_json=True, keep_original_id=False):
24362449
else:
24372450
item[ref_key] = cache_ids.get(item[ref_key], item[ref_key])
24382451

2439-
return json.dumps(bundle_data) if use_json else bundle_data
2452+
return self.prepare_bundle_ids(
2453+
bundle_data, False, keep_original_id=keep_original_id
2454+
)
24402455

24412456
def import_item(
24422457
self,

tests/01-unit/stix/test_bundle_ids_rewrite.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ def test_ids_generation():
2323
gen_id = get_cti_helper().generate_standard_id_from_stix
2424
# attack-pattern
2525
assert gen_id({"type": "attack-pattern", "name": "attack"}) =='attack-pattern--25f21617-8de8-5d5e-8cd4-b7e88547ba76'
26-
assert gen_id({"type": "attack-pattern", "name": "attack", "x_mitre_id": 'MITREID'}) == 'attack-pattern--b74cfee2-7b14-585e-862f-fea45e802da9'
26+
assert gen_id({"type": "attack-pattern", "name": "attack", "x_opencti_external_id": 'MITREID'}) == 'attack-pattern--b74cfee2-7b14-585e-862f-fea45e802da9'
27+
assert gen_id({"type": "attack-pattern", "name": "Spear phishing messages with malicious links", "x_mitre_id": 'T1368'}) == 'attack-pattern--a01046cc-192f-5d52-8e75-6e447fae3890'
2728
assert gen_id({"type": "attack-pattern", "x_mitre_id": "MITREID"}) == 'attack-pattern--b74cfee2-7b14-585e-862f-fea45e802da9'
2829
# campaign
2930
assert gen_id({"type": "campaign", "name": "attack"}) == 'campaign--25f21617-8de8-5d5e-8cd4-b7e88547ba76'
@@ -101,6 +102,7 @@ def test_ids_generation():
101102
assert gen_id(base_relationship) == "relationship--0b11fa67-da01-5d34-9864-67d4d71c3740"
102103
assert gen_id({**base_relationship, "start_time": "2022-11-25T19:00:05.000Z"}) == "relationship--c5e1e2ce-14d6-535b-911d-267e92119e01"
103104
assert gen_id({**base_relationship, "start_time": "2022-11-25T19:00:05.000Z", "stop_time": "2022-11-26T19:00:05.000Z"}) == "relationship--a7778a7d-a743-5193-9912-89f88f9ed0b4"
105+
assert gen_id({"type": "relationship", 'relationship_type': 'uses', 'source_ref': 'malware--21c45dbe-54ec-5bb7-b8cd-9f27cc518714', 'start_time': '2020-02-29T22:30:00.000Z', 'stop_time': '2020-02-29T22:30:00.000Z', 'target_ref': 'attack-pattern--fd8179dd-1632-5ec8-8b93-d2ae121e05a4'}) == 'relationship--67f5f01f-6b15-5154-ae31-019a75fedcff'
104106
# sighting
105107
base_sighting = {"type": "sighting", "sighting_of_ref": "from_id", "where_sighted_refs": ["to_id"]}
106108
assert gen_id(base_sighting) == 'sighting--161901df-21bb-527a-b96b-354119279fe2'
@@ -112,18 +114,23 @@ def test_ids_generation():
112114
def test_prepare_bundle_ids_keep_original():
113115
helper = get_cti_helper()
114116
bundle_data = load_test_file()
115-
malware_source = bundle_data["objects"][0]
116-
assert malware_source["id"] == "malware--d650c5b9-4b43-5781-8576-ea52bd6c7ce5"
117-
assert malware_source.get("x_opencti_stix_ids") is None
117+
# malware_source = bundle_data["objects"][0]
118+
# assert malware_source["id"] == "malware--d650c5b9-4b43-5781-8576-ea52bd6c7ce5"
119+
# assert malware_source.get("x_opencti_stix_ids") is None
118120
prepared_bundle = helper.prepare_bundle_ids(
119121
bundle=bundle_data, use_json=False, keep_original_id=True
120122
)
121-
print(json.dumps(prepared_bundle))
122123
malware_target = prepared_bundle["objects"][0]
123124
assert malware_target["id"] == "malware--d650c5b9-4b43-5781-8576-ea52bd6c7ce0"
124125
assert malware_target.get("x_opencti_stix_ids") == [
125126
"malware--d650c5b9-4b43-5781-8576-ea52bd6c7ce5"
126127
]
128+
sighting = prepared_bundle["objects"][5]
129+
assert sighting["id"] == "sighting--287d622a-9ffd-5e7f-bb0b-67f1e320f752"
130+
assert (
131+
sighting["x_opencti_stix_ids"][0]
132+
== "sighting--ee20065d-2555-424f-ad9e-0f8428623c75"
133+
)
127134

128135

129136
def test_prepare_bundle_ids():
@@ -135,7 +142,9 @@ def test_prepare_bundle_ids():
135142
prepared_bundle = helper.prepare_bundle_ids(
136143
bundle=bundle_data, use_json=False, keep_original_id=False
137144
)
138-
print(json.dumps(prepared_bundle))
139145
malware_target = prepared_bundle["objects"][0]
140146
assert malware_target["id"] == "malware--d650c5b9-4b43-5781-8576-ea52bd6c7ce0"
141147
assert malware_target.get("x_opencti_stix_ids") is None
148+
sighting = prepared_bundle["objects"][5]
149+
assert sighting["id"] == "sighting--287d622a-9ffd-5e7f-bb0b-67f1e320f752"
150+
assert ("x_opencti_stix_ids" in sighting) == False

0 commit comments

Comments
 (0)