Skip to content

Commit b6b7fcd

Browse files
authored
Merge pull request #413 from splunk/TR-3506_mitre_update
TR-3506 MITRE MAP Update
2 parents 8877ca0 + dba378a commit b6b7fcd

File tree

3 files changed

+230
-58
lines changed

3 files changed

+230
-58
lines changed

contentctl/enrichments/attack_enrichment.py

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,40 @@
11
from __future__ import annotations
2-
from attackcti import attack_client
2+
33
import logging
4-
from pydantic import BaseModel
54
from dataclasses import field
6-
from typing import Any
75
from pathlib import Path
6+
from typing import Any, TypedDict, cast
7+
8+
from attackcti import attack_client # type: ignore[reportMissingTypeStubs]
9+
from pydantic import BaseModel
10+
11+
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
12+
from contentctl.objects.config import validate
813
from contentctl.objects.mitre_attack_enrichment import (
914
MitreAttackEnrichment,
1015
MitreTactics,
1116
)
12-
from contentctl.objects.config import validate
13-
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
1417

18+
# Suppress attackcti logging
1519
logging.getLogger("taxii2client").setLevel(logging.CRITICAL)
20+
logging.getLogger("stix2").setLevel(logging.CRITICAL)
21+
22+
23+
class AttackPattern(TypedDict):
24+
id: str
25+
technique_id: str
26+
technique: str
27+
tactic: list[str]
28+
29+
30+
class IntrusionSet(TypedDict):
31+
id: str
32+
group: str
33+
34+
35+
class Relationship(TypedDict):
36+
target_object: str
37+
source_object: str
1638

1739

1840
class AttackEnrichment(BaseModel):
@@ -98,11 +120,6 @@ def get_attack_lookup(
98120
end="",
99121
flush=True,
100122
)
101-
# The existence of the input_path is validated during cli argument validation, but it is
102-
# possible that the repo is in the wrong format. If the following directories do not
103-
# exist, then attack_client will fall back to resolving via REST API. We do not
104-
# want this as it is slow and error prone, so we will force an exception to
105-
# be generated.
106123
enterprise_path = input_path / "enterprise-attack"
107124
mobile_path = input_path / "ics-attack"
108125
ics_path = input_path / "mobile-attack"
@@ -123,36 +140,47 @@ def get_attack_lookup(
123140
}
124141
)
125142

126-
all_enterprise_techniques = lift.get_enterprise_techniques(
127-
stix_format=False
143+
all_enterprise_techniques = cast(
144+
list[AttackPattern], lift.get_enterprise_techniques(stix_format=False)
128145
)
129-
enterprise_relationships = lift.get_enterprise_relationships(
130-
stix_format=False
146+
enterprise_relationships = cast(
147+
list[Relationship], lift.get_enterprise_relationships(stix_format=False)
148+
)
149+
enterprise_groups = cast(
150+
list[IntrusionSet], lift.get_enterprise_groups(stix_format=False)
131151
)
132-
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
133152

134153
for technique in all_enterprise_techniques:
135154
apt_groups: list[dict[str, Any]] = []
136155
for relationship in enterprise_relationships:
137-
if (
138-
relationship["target_object"] == technique["id"]
139-
) and relationship["source_object"].startswith("intrusion-set"):
156+
if relationship["target_object"] == technique[
157+
"id"
158+
] and relationship["source_object"].startswith("intrusion-set"):
140159
for group in enterprise_groups:
141160
if relationship["source_object"] == group["id"]:
142-
apt_groups.append(group)
143-
# apt_groups.append(group['group'])
161+
apt_groups.append(dict(group))
144162

145-
tactics = []
163+
tactics: list[MitreTactics] = []
146164
if "tactic" in technique:
147165
for tactic in technique["tactic"]:
148-
tactics.append(tactic.replace("-", " ").title())
149-
150-
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
151-
attack_lookup[technique["technique_id"]] = {
152-
"technique": technique["technique"],
153-
"tactics": tactics,
154-
"groups": apt_groups,
155-
}
166+
tactics.append(
167+
cast(MitreTactics, tactic.replace("-", " ").title())
168+
)
169+
170+
self.addMitreIDViaGroupObjects(dict(technique), tactics, apt_groups)
171+
attack_lookup[technique["technique_id"]] = (
172+
MitreAttackEnrichment.model_validate(
173+
{
174+
"mitre_attack_id": technique["technique_id"],
175+
"mitre_attack_technique": technique["technique"],
176+
"mitre_attack_tactics": tactics,
177+
"mitre_attack_groups": [
178+
group["group"] for group in apt_groups
179+
],
180+
"mitre_attack_group_objects": apt_groups,
181+
}
182+
)
183+
)
156184

157185
except Exception as err:
158186
raise Exception(f"Error getting MITRE Enrichment: {str(err)}")
Lines changed: 172 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,197 @@
1+
# Standard library imports
2+
import json
13
import pathlib
2-
from typing import List, Union
4+
from datetime import datetime
5+
from typing import Any, TypedDict
36

7+
# Third-party imports
48
from contentctl.objects.detection import Detection
5-
from contentctl.output.attack_nav_writer import AttackNavWriter
9+
10+
11+
class TechniqueData(TypedDict):
12+
score: int
13+
file_paths: list[str]
14+
links: list[dict[str, str]]
15+
16+
17+
class LayerData(TypedDict):
18+
name: str
19+
versions: dict[str, str]
20+
domain: str
21+
description: str
22+
filters: dict[str, list[str]]
23+
sorting: int
24+
layout: dict[str, str | bool]
25+
hideDisabled: bool
26+
techniques: list[dict[str, Any]]
27+
gradient: dict[str, list[str] | int]
28+
legendItems: list[dict[str, str]]
29+
showTacticRowBackground: bool
30+
tacticRowBackground: str
31+
selectTechniquesAcrossTactics: bool
32+
selectSubtechniquesWithParent: bool
33+
selectVisibleTechniques: bool
34+
metadata: list[dict[str, str]]
635

736

837
class AttackNavOutput:
38+
def __init__(
39+
self,
40+
layer_name: str = "Splunk Detection Coverage",
41+
layer_description: str = "MITRE ATT&CK coverage for Splunk detections",
42+
layer_domain: str = "enterprise-attack",
43+
):
44+
self.layer_name = layer_name
45+
self.layer_description = layer_description
46+
self.layer_domain = layer_domain
47+
948
def writeObjects(
10-
self, detections: List[Detection], output_path: pathlib.Path
49+
self, detections: list[Detection], output_path: pathlib.Path
1150
) -> None:
12-
techniques: dict[str, dict[str, Union[List[str], int]]] = {}
51+
"""
52+
Generate MITRE ATT&CK Navigator layer file from detections
53+
Args:
54+
detections: List of Detection objects
55+
output_path: Path to write the layer file
56+
"""
57+
techniques: dict[str, TechniqueData] = {}
58+
tactic_coverage: dict[str, set[str]] = {}
1359

60+
# Process each detection
1461
for detection in detections:
62+
if not hasattr(detection.tags, "mitre_attack_id"):
63+
continue
64+
1565
for tactic in detection.tags.mitre_attack_id:
1666
if tactic not in techniques:
17-
techniques[tactic] = {"score": 0, "file_paths": []}
67+
techniques[tactic] = {"score": 0, "file_paths": [], "links": []}
68+
tactic_coverage[tactic] = set()
1869

1970
detection_type = detection.source
20-
detection_id = detection.id
71+
detection_id = str(detection.id) # Convert UUID to string
72+
detection_url = (
73+
f"https://research.splunk.com/{detection_type}/{detection_id}/"
74+
)
75+
detection_name = detection.name.replace(
76+
"_", " "
77+
).title() # Convert to Title Case
78+
detection_info = f"{detection_name}"
2179

22-
# Store all three pieces of information separately
23-
detection_info = f"{detection_type}|{detection_id}|{detection.name}"
80+
techniques[tactic]["score"] += 1
81+
techniques[tactic]["file_paths"].append(detection_info)
82+
techniques[tactic]["links"].append(
83+
{"label": detection_name, "url": detection_url}
84+
)
85+
tactic_coverage[tactic].add(detection_id)
2486

25-
techniques[tactic]["score"] = techniques[tactic].get("score", 0) + 1
26-
if isinstance(techniques[tactic]["file_paths"], list):
27-
techniques[tactic]["file_paths"].append(detection_info)
87+
# Create the layer file
88+
layer: LayerData = {
89+
"name": self.layer_name,
90+
"versions": {
91+
"attack": "14", # Update as needed
92+
"navigator": "5.1.0",
93+
"layer": "4.5",
94+
},
95+
"domain": self.layer_domain,
96+
"description": self.layer_description,
97+
"filters": {
98+
"platforms": [
99+
"Windows",
100+
"Linux",
101+
"macOS",
102+
"AWS",
103+
"GCP",
104+
"Azure",
105+
"Office 365",
106+
"SaaS",
107+
]
108+
},
109+
"sorting": 0,
110+
"layout": {
111+
"layout": "flat",
112+
"showName": True,
113+
"showID": False,
114+
"showAggregateScores": True,
115+
"countUnscored": True,
116+
"aggregateFunction": "average",
117+
"expandedSubtechniques": "none",
118+
},
119+
"hideDisabled": False,
120+
"techniques": [
121+
{
122+
"techniqueID": tid,
123+
"score": data["score"],
124+
"metadata": [
125+
{"name": "Detection", "value": name, "divider": False}
126+
for name in data["file_paths"]
127+
]
128+
+ [
129+
{
130+
"name": "Link",
131+
"value": f"[View Detection]({link['url']})",
132+
"divider": False,
133+
}
134+
for link in data["links"]
135+
],
136+
"links": [
137+
{"label": link["label"], "url": link["url"]}
138+
for link in data["links"]
139+
],
140+
}
141+
for tid, data in techniques.items()
142+
],
143+
"gradient": {
144+
"colors": [
145+
"#1a365d", # Dark blue
146+
"#2c5282", # Medium blue
147+
"#4299e1", # Light blue
148+
"#48bb78", # Light green
149+
"#38a169", # Medium green
150+
"#276749", # Dark green
151+
],
152+
"minValue": 0,
153+
"maxValue": 5, # Adjust based on your max detections per technique
154+
},
155+
"legendItems": [
156+
{"label": "1 Detection", "color": "#1a365d"},
157+
{"label": "2 Detections", "color": "#4299e1"},
158+
{"label": "3 Detections", "color": "#48bb78"},
159+
{"label": "4+ Detections", "color": "#276749"},
160+
],
161+
"showTacticRowBackground": True,
162+
"tacticRowBackground": "#dddddd",
163+
"selectTechniquesAcrossTactics": True,
164+
"selectSubtechniquesWithParent": True,
165+
"selectVisibleTechniques": False,
166+
"metadata": [
167+
{"name": "Generated", "value": datetime.now().isoformat()},
168+
{"name": "Total Detections", "value": str(len(detections))},
169+
{"name": "Covered Techniques", "value": str(len(techniques))},
170+
],
171+
}
28172

29-
"""
30-
for detection in objects:
31-
if detection.tags.mitre_attack_enrichments:
32-
for mitre_attack_enrichment in detection.tags.mitre_attack_enrichments:
33-
if not mitre_attack_enrichment.mitre_attack_id in techniques:
34-
techniques[mitre_attack_enrichment.mitre_attack_id] = {
35-
'score': 1,
36-
'file_paths': ['https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)]
37-
}
38-
else:
39-
techniques[mitre_attack_enrichment.mitre_attack_id]['score'] = techniques[mitre_attack_enrichment.mitre_attack_id]['score'] + 1
40-
techniques[mitre_attack_enrichment.mitre_attack_id]['file_paths'].append('https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name))
41-
"""
42-
AttackNavWriter.writeAttackNavFile(techniques, output_path / "coverage.json")
173+
# Write the layer file
174+
output_file = output_path / "coverage.json"
175+
with open(output_file, "w") as f:
176+
json.dump(layer, f, indent=2)
177+
178+
print(f"\n✅ MITRE ATT&CK Navigator layer file written to: {output_file}")
179+
print("📊 Coverage Summary:")
180+
print(f" Total Detections: {len(detections)}")
181+
print(f" Covered Techniques: {len(techniques)}")
182+
print(f" Tactics with Coverage: {len(tactic_coverage)}")
183+
print("\n🗺️ To view the layer:")
184+
print(" 1. Go to https://mitre-attack.github.io/attack-navigator/")
185+
print(" 2. Click 'Open Existing Layer'")
186+
print(f" 3. Select the file: {output_file}")
43187

44-
def convertNameToFileName(self, name: str):
188+
def convertNameToFileName(self, name: str) -> str:
189+
"""Convert a detection name to a valid filename"""
45190
file_name = (
46191
name.replace(" ", "_")
47192
.replace("-", "_")
48193
.replace(".", "_")
49194
.replace("/", "_")
50195
.lower()
51196
)
52-
file_name = file_name + ".yml"
53-
return file_name
197+
return f"{file_name}.yml"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[tool.poetry]
22
name = "contentctl"
33

4-
version = "5.5.3"
4+
version = "5.5.4"
55

66
description = "Splunk Content Control Tool"
77
authors = ["STRT <[email protected]>"]

0 commit comments

Comments
 (0)