Skip to content

Commit 1d13c3d

Browse files
committed
fixing enrichment map
1 parent 816e3e3 commit 1d13c3d

File tree

3 files changed

+239
-65
lines changed

3 files changed

+239
-65
lines changed

contentctl/enrichments/attack_enrichment.py

Lines changed: 66 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,44 @@
11
from __future__ import annotations
2-
from attackcti import attack_client
2+
33
import logging
4-
from pydantic import BaseModel
54
from dataclasses import field
6-
from typing import Any
75
from pathlib import Path
6+
from typing import Any, Dict, List, TypedDict, cast
7+
8+
from attackcti import attack_client # type: ignore[reportMissingTypeStubs]
9+
from pydantic import BaseModel
10+
11+
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
12+
from contentctl.objects.config import validate
813
from contentctl.objects.mitre_attack_enrichment import (
914
MitreAttackEnrichment,
1015
MitreTactics,
1116
)
12-
from contentctl.objects.config import validate
13-
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
1417

18+
# Suppress attackcti logging
1519
logging.getLogger("taxii2client").setLevel(logging.CRITICAL)
20+
logging.getLogger("stix2").setLevel(logging.CRITICAL)
21+
22+
23+
class AttackPattern(TypedDict):
24+
id: str
25+
technique_id: str
26+
technique: str
27+
tactic: List[str]
28+
29+
30+
class IntrusionSet(TypedDict):
31+
id: str
32+
group: str
33+
34+
35+
class Relationship(TypedDict):
36+
target_object: str
37+
source_object: str
1638

1739

1840
class AttackEnrichment(BaseModel):
19-
data: dict[str, MitreAttackEnrichment] = field(default_factory=dict)
41+
data: Dict[str, MitreAttackEnrichment] = field(default_factory=dict)
2042
use_enrichment: bool = True
2143

2244
@staticmethod
@@ -42,7 +64,7 @@ def getEnrichmentByMitreID(
4264
)
4365

4466
def addMitreIDViaGroupNames(
45-
self, technique: dict[str, Any], tactics: list[str], groupNames: list[str]
67+
self, technique: Dict[str, Any], tactics: List[str], groupNames: List[str]
4668
) -> None:
4769
technique_id = technique["technique_id"]
4870
technique_obj = technique["technique"]
@@ -62,15 +84,15 @@ def addMitreIDViaGroupNames(
6284

6385
def addMitreIDViaGroupObjects(
6486
self,
65-
technique: dict[str, Any],
66-
tactics: list[MitreTactics],
67-
groupDicts: list[dict[str, Any]],
87+
technique: Dict[str, Any],
88+
tactics: List[MitreTactics],
89+
groupDicts: List[Dict[str, Any]],
6890
) -> None:
6991
technique_id = technique["technique_id"]
7092
technique_obj = technique["technique"]
7193
tactics.sort()
7294

73-
groupNames: list[str] = sorted([group["group"] for group in groupDicts])
95+
groupNames: List[str] = sorted([group["group"] for group in groupDicts])
7496

7597
if technique_id in self.data:
7698
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
@@ -87,8 +109,8 @@ def addMitreIDViaGroupObjects(
87109

88110
def get_attack_lookup(
89111
self, input_path: Path, enrichments: bool = False
90-
) -> dict[str, MitreAttackEnrichment]:
91-
attack_lookup: dict[str, MitreAttackEnrichment] = {}
112+
) -> Dict[str, MitreAttackEnrichment]:
113+
attack_lookup: Dict[str, MitreAttackEnrichment] = {}
92114
if not enrichments:
93115
return attack_lookup
94116

@@ -98,11 +120,6 @@ def get_attack_lookup(
98120
end="",
99121
flush=True,
100122
)
101-
# The existence of the input_path is validated during cli argument validation, but it is
102-
# possible that the repo is in the wrong format. If the following directories do not
103-
# exist, then attack_client will fall back to resolving via REST API. We do not
104-
# want this as it is slow and error prone, so we will force an exception to
105-
# be generated.
106123
enterprise_path = input_path / "enterprise-attack"
107124
mobile_path = input_path / "ics-attack"
108125
ics_path = input_path / "mobile-attack"
@@ -123,36 +140,47 @@ def get_attack_lookup(
123140
}
124141
)
125142

126-
all_enterprise_techniques = lift.get_enterprise_techniques(
127-
stix_format=False
143+
all_enterprise_techniques = cast(
144+
List[AttackPattern], lift.get_enterprise_techniques(stix_format=False)
128145
)
129-
enterprise_relationships = lift.get_enterprise_relationships(
130-
stix_format=False
146+
enterprise_relationships = cast(
147+
List[Relationship], lift.get_enterprise_relationships(stix_format=False)
148+
)
149+
enterprise_groups = cast(
150+
List[IntrusionSet], lift.get_enterprise_groups(stix_format=False)
131151
)
132-
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
133152

134153
for technique in all_enterprise_techniques:
135-
apt_groups: list[dict[str, Any]] = []
154+
apt_groups: List[Dict[str, Any]] = []
136155
for relationship in enterprise_relationships:
137-
if (
138-
relationship["target_object"] == technique["id"]
139-
) and relationship["source_object"].startswith("intrusion-set"):
156+
if relationship["target_object"] == technique[
157+
"id"
158+
] and relationship["source_object"].startswith("intrusion-set"):
140159
for group in enterprise_groups:
141160
if relationship["source_object"] == group["id"]:
142-
apt_groups.append(group)
143-
# apt_groups.append(group['group'])
161+
apt_groups.append(dict(group))
144162

145-
tactics = []
163+
tactics: List[MitreTactics] = []
146164
if "tactic" in technique:
147165
for tactic in technique["tactic"]:
148-
tactics.append(tactic.replace("-", " ").title())
149-
150-
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
151-
attack_lookup[technique["technique_id"]] = {
152-
"technique": technique["technique"],
153-
"tactics": tactics,
154-
"groups": apt_groups,
155-
}
166+
tactics.append(
167+
cast(MitreTactics, tactic.replace("-", " ").title())
168+
)
169+
170+
self.addMitreIDViaGroupObjects(dict(technique), tactics, apt_groups)
171+
attack_lookup[technique["technique_id"]] = (
172+
MitreAttackEnrichment.model_validate(
173+
{
174+
"mitre_attack_id": technique["technique_id"],
175+
"mitre_attack_technique": technique["technique"],
176+
"mitre_attack_tactics": tactics,
177+
"mitre_attack_groups": [
178+
group["group"] for group in apt_groups
179+
],
180+
"mitre_attack_group_objects": apt_groups,
181+
}
182+
)
183+
)
156184

157185
except Exception as err:
158186
raise Exception(f"Error getting MITRE Enrichment: {str(err)}")
Lines changed: 172 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,198 @@
1+
import json
12
import pathlib
2-
from typing import List, Union
3+
from datetime import datetime
4+
from typing import Any, Dict, List, Set, TypedDict, Union
35

46
from contentctl.objects.detection import Detection
5-
from contentctl.output.attack_nav_writer import AttackNavWriter
7+
8+
9+
class TechniqueData(TypedDict):
10+
score: int
11+
file_paths: List[str]
12+
links: List[Dict[str, str]]
13+
14+
15+
class LayerData(TypedDict):
16+
name: str
17+
versions: Dict[str, str]
18+
domain: str
19+
description: str
20+
filters: Dict[str, List[str]]
21+
sorting: int
22+
layout: Dict[str, Union[str, bool]]
23+
hideDisabled: bool
24+
techniques: List[Dict[str, Any]]
25+
gradient: Dict[str, Union[List[str], int]]
26+
legendItems: List[Dict[str, str]]
27+
showTacticRowBackground: bool
28+
tacticRowBackground: str
29+
selectTechniquesAcrossTactics: bool
30+
selectSubtechniquesWithParent: bool
31+
selectVisibleTechniques: bool
32+
metadata: List[Dict[str, str]]
633

734

835
class AttackNavOutput:
36+
def __init__(
37+
self,
38+
layer_name: str = "Splunk Detection Coverage",
39+
layer_description: str = "MITRE ATT&CK coverage for Splunk detections",
40+
layer_domain: str = "enterprise-attack",
41+
):
42+
self.layer_name = layer_name
43+
self.layer_description = layer_description
44+
self.layer_domain = layer_domain
45+
946
def writeObjects(
1047
self, detections: List[Detection], output_path: pathlib.Path
1148
) -> None:
12-
techniques: dict[str, dict[str, Union[List[str], int]]] = {}
49+
"""
50+
Generate MITRE ATT&CK Navigator layer file from detections
51+
Args:
52+
detections: List of Detection objects
53+
output_path: Path to write the layer file
54+
"""
55+
techniques: Dict[str, TechniqueData] = {}
56+
tactic_coverage: Dict[str, Set[str]] = {}
1357

58+
# Process each detection
1459
for detection in detections:
60+
if not hasattr(detection.tags, "mitre_attack_id"):
61+
continue
62+
1563
for tactic in detection.tags.mitre_attack_id:
1664
if tactic not in techniques:
17-
techniques[tactic] = {"score": 0, "file_paths": []}
65+
techniques[tactic] = {"score": 0, "file_paths": [], "links": []}
66+
tactic_coverage[tactic] = set()
1867

1968
detection_type = detection.source
20-
detection_id = detection.id
69+
detection_id = str(detection.id) # Convert UUID to string
70+
detection_url = (
71+
f"https://research.splunk.com/{detection_type}/{detection_id}/"
72+
)
73+
detection_name = detection.name.replace(
74+
"_", " "
75+
).title() # Convert to Title Case
76+
detection_info = f"{detection_name}"
2177

22-
# Store all three pieces of information separately
23-
detection_info = f"{detection_type}|{detection_id}|{detection.name}"
78+
techniques[tactic]["score"] += 1
79+
techniques[tactic]["file_paths"].append(detection_info)
80+
techniques[tactic]["links"].append(
81+
{"label": detection_name, "url": detection_url}
82+
)
83+
tactic_coverage[tactic].add(detection_id)
2484

25-
techniques[tactic]["score"] = techniques[tactic].get("score", 0) + 1
26-
if isinstance(techniques[tactic]["file_paths"], list):
27-
techniques[tactic]["file_paths"].append(detection_info)
85+
# Create the layer file
86+
layer: LayerData = {
87+
"name": self.layer_name,
88+
"versions": {
89+
"attack": "14", # Update as needed
90+
"navigator": "5.1.0",
91+
"layer": "4.5",
92+
},
93+
"domain": self.layer_domain,
94+
"description": self.layer_description,
95+
"filters": {
96+
"platforms": [
97+
"Windows",
98+
"Linux",
99+
"macOS",
100+
"AWS",
101+
"GCP",
102+
"Azure",
103+
"Office 365",
104+
"SaaS",
105+
]
106+
},
107+
"sorting": 0,
108+
"layout": {
109+
"layout": "flat",
110+
"showName": True,
111+
"showID": False,
112+
"showAggregateScores": True,
113+
"countUnscored": True,
114+
"aggregateFunction": "average",
115+
"expandedSubtechniques": "none",
116+
},
117+
"hideDisabled": False,
118+
"techniques": [
119+
{
120+
"techniqueID": tid,
121+
"score": data["score"],
122+
"metadata": [
123+
{
124+
"name": "Detections",
125+
"value": "\n".join(
126+
[f"• {name}" for name in data["file_paths"]]
127+
),
128+
},
129+
{"divider": True},
130+
{
131+
"name": "Links",
132+
"value": "\n".join(
133+
[
134+
f"• [{link['label']}]({link['url']})"
135+
for link in data["links"]
136+
]
137+
),
138+
},
139+
],
140+
"links": data["links"],
141+
}
142+
for tid, data in techniques.items()
143+
],
144+
"gradient": {
145+
"colors": [
146+
"#1a365d", # Dark blue
147+
"#2c5282", # Medium blue
148+
"#4299e1", # Light blue
149+
"#48bb78", # Light green
150+
"#38a169", # Medium green
151+
"#276749", # Dark green
152+
],
153+
"minValue": 0,
154+
"maxValue": 5, # Adjust based on your max detections per technique
155+
},
156+
"legendItems": [
157+
{"label": "1 Detection", "color": "#1a365d"},
158+
{"label": "2 Detections", "color": "#4299e1"},
159+
{"label": "3 Detections", "color": "#48bb78"},
160+
{"label": "4+ Detections", "color": "#276749"},
161+
],
162+
"showTacticRowBackground": True,
163+
"tacticRowBackground": "#dddddd",
164+
"selectTechniquesAcrossTactics": True,
165+
"selectSubtechniquesWithParent": True,
166+
"selectVisibleTechniques": False,
167+
"metadata": [
168+
{"name": "Generated", "value": datetime.now().isoformat()},
169+
{"name": "Total Detections", "value": str(len(detections))},
170+
{"name": "Covered Techniques", "value": str(len(techniques))},
171+
],
172+
}
28173

29-
"""
30-
for detection in objects:
31-
if detection.tags.mitre_attack_enrichments:
32-
for mitre_attack_enrichment in detection.tags.mitre_attack_enrichments:
33-
if not mitre_attack_enrichment.mitre_attack_id in techniques:
34-
techniques[mitre_attack_enrichment.mitre_attack_id] = {
35-
'score': 1,
36-
'file_paths': ['https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)]
37-
}
38-
else:
39-
techniques[mitre_attack_enrichment.mitre_attack_id]['score'] = techniques[mitre_attack_enrichment.mitre_attack_id]['score'] + 1
40-
techniques[mitre_attack_enrichment.mitre_attack_id]['file_paths'].append('https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name))
41-
"""
42-
AttackNavWriter.writeAttackNavFile(techniques, output_path / "coverage.json")
174+
# Write the layer file
175+
output_file = output_path / "coverage.json"
176+
with open(output_file, "w") as f:
177+
json.dump(layer, f, indent=2)
178+
179+
print(f"\n✅ MITRE ATT&CK Navigator layer file written to: {output_file}")
180+
print("📊 Coverage Summary:")
181+
print(f" Total Detections: {len(detections)}")
182+
print(f" Covered Techniques: {len(techniques)}")
183+
print(f" Tactics with Coverage: {len(tactic_coverage)}")
184+
print("\n🗺️ To view the layer:")
185+
print(" 1. Go to https://mitre-attack.github.io/attack-navigator/")
186+
print(" 2. Click 'Open Existing Layer'")
187+
print(f" 3. Select the file: {output_file}")
43188

44-
def convertNameToFileName(self, name: str):
189+
def convertNameToFileName(self, name: str) -> str:
190+
"""Convert a detection name to a valid filename"""
45191
file_name = (
46192
name.replace(" ", "_")
47193
.replace("-", "_")
48194
.replace(".", "_")
49195
.replace("/", "_")
50196
.lower()
51197
)
52-
file_name = file_name + ".yml"
53-
return file_name
198+
return f"{file_name}.yml"

0 commit comments

Comments
 (0)