Skip to content

Commit a056686

Browse files
committed
Add atlas coverage and additional genai protections
1 parent ba44f43 commit a056686

10 files changed

+8119
-21
lines changed

detection_rules/atlas.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
# or more contributor license agreements. Licensed under the Elastic License
3+
# 2.0; you may not use this file except in compliance with the Elastic License
4+
# 2.0.
5+
6+
"""Mitre ATLAS info."""
7+
8+
from collections import OrderedDict
9+
from pathlib import Path
10+
from typing import Any
11+
12+
import requests
13+
import yaml
14+
from semver import Version
15+
16+
from .utils import cached, clear_caches, get_etc_path
17+
18+
ATLAS_FILE = get_etc_path(["ATLAS.yaml"])
19+
20+
# Maps tactic name to tactic ID (e.g., "Collection" -> "AML.TA0009")
21+
tactics_map: dict[str, str] = {}
22+
technique_lookup: dict[str, dict[str, Any]] = {}
23+
matrix: dict[str, list[str]] = {} # Maps tactic name to list of technique IDs
24+
25+
26+
def get_atlas_file_path() -> Path:
27+
"""Get the path to the ATLAS YAML file."""
28+
if not ATLAS_FILE.exists():
29+
# Try to download it if it doesn't exist
30+
download_atlas_data()
31+
return ATLAS_FILE
32+
33+
34+
def download_atlas_data(save: bool = True) -> dict[str, Any] | None:
35+
"""Download ATLAS data from MITRE."""
36+
url = (
37+
"https://raw.githubusercontent.com/mitre-atlas/"
38+
"atlas-data/main/dist/ATLAS.yaml"
39+
)
40+
r = requests.get(url, timeout=30)
41+
r.raise_for_status()
42+
atlas_data = yaml.safe_load(r.text)
43+
44+
if save:
45+
ATLAS_FILE.write_text(r.text)
46+
print(f"Downloaded ATLAS data to {ATLAS_FILE}")
47+
48+
return atlas_data
49+
50+
51+
@cached
52+
def load_atlas_yaml() -> dict[str, Any]:
53+
"""Load ATLAS data from YAML file."""
54+
atlas_file = get_atlas_file_path()
55+
return yaml.safe_load(atlas_file.read_text())
56+
57+
58+
atlas = load_atlas_yaml()
59+
60+
# Extract version
61+
CURRENT_ATLAS_VERSION = atlas.get("version", "unknown")
62+
63+
# Process the ATLAS matrix
64+
if "matrices" in atlas and len(atlas["matrices"]) > 0:
65+
matrix_data = atlas["matrices"][0] # Use the first matrix (usually "ATLAS Matrix")
66+
67+
# Build tactics map
68+
if "tactics" in matrix_data:
69+
for tactic in matrix_data["tactics"]:
70+
tactic_id = tactic["id"]
71+
tactic_name = tactic["name"]
72+
tactics_map[tactic_name] = tactic_id
73+
74+
# Build technique lookup and matrix
75+
if "techniques" in matrix_data:
76+
for technique in matrix_data["techniques"]:
77+
technique_id = technique["id"]
78+
technique_name = technique["name"]
79+
technique_tactics = technique.get("tactics", [])
80+
81+
# Store technique info
82+
technique_lookup[technique_id] = {
83+
"name": technique_name,
84+
"id": technique_id,
85+
"tactics": technique_tactics,
86+
}
87+
88+
# Build matrix: map tactic IDs to technique IDs
89+
for tech_tactic_id in technique_tactics:
90+
# Find tactic name from ID
91+
tech_tactic_name = next(
92+
(name for name, tid in tactics_map.items() if tid == tech_tactic_id),
93+
None
94+
)
95+
if tech_tactic_name:
96+
if tech_tactic_name not in matrix:
97+
matrix[tech_tactic_name] = []
98+
if technique_id not in matrix[tech_tactic_name]:
99+
matrix[tech_tactic_name].append(technique_id)
100+
101+
# Sort matrix values
102+
for val in matrix.values():
103+
val.sort(key=lambda tid: technique_lookup.get(tid, {}).get("name", "").lower())
104+
105+
technique_lookup = OrderedDict(sorted(technique_lookup.items()))
106+
techniques = sorted({v["name"] for _, v in technique_lookup.items()})
107+
technique_id_list = [t for t in technique_lookup if "." not in t]
108+
sub_technique_id_list = [t for t in technique_lookup if "." in t]
109+
tactics = list(tactics_map)
110+
111+
112+
def refresh_atlas_data(save: bool = True) -> dict[str, Any] | None:
113+
"""Refresh ATLAS data from MITRE."""
114+
atlas_file = get_atlas_file_path()
115+
current_version_str = CURRENT_ATLAS_VERSION
116+
117+
try:
118+
current_version = Version.parse(
119+
current_version_str, optional_minor_and_patch=True
120+
)
121+
except (ValueError, TypeError):
122+
# If version parsing fails, download anyway
123+
current_version = Version.parse("0.0.0", optional_minor_and_patch=True)
124+
125+
# Get latest version from GitHub
126+
r = requests.get("https://api.github.com/repos/mitre-atlas/atlas-data/tags", timeout=30)
127+
r.raise_for_status()
128+
releases = r.json()
129+
if not releases:
130+
print("No releases found")
131+
return None
132+
133+
# Find latest version (tags might be like "v5.1.0" or "5.1.0")
134+
latest_release = None
135+
latest_version = current_version
136+
for release in releases:
137+
tag_name = release["name"].lstrip("v")
138+
try:
139+
ver = Version.parse(tag_name, optional_minor_and_patch=True)
140+
if ver > latest_version:
141+
latest_version = ver
142+
latest_release = release
143+
except (ValueError, TypeError):
144+
continue
145+
146+
if latest_release is None:
147+
print(f"No versions newer than the current detected: {current_version_str}")
148+
return None
149+
150+
download = (
151+
f"https://raw.githubusercontent.com/mitre-atlas/atlas-data/"
152+
f"{latest_release['name']}/dist/ATLAS.yaml"
153+
)
154+
r = requests.get(download, timeout=30)
155+
r.raise_for_status()
156+
atlas_data = yaml.safe_load(r.text)
157+
158+
if save:
159+
_ = atlas_file.write_text(r.text)
160+
print(f"Replaced file: {atlas_file} with version {latest_version}")
161+
162+
# Clear cache to reload
163+
clear_caches()
164+
165+
return atlas_data
166+
167+
168+
def build_threat_map_entry(
169+
tactic_name: str, *technique_ids: str
170+
) -> dict[str, Any]:
171+
"""Build rule threat map from ATLAS technique IDs."""
172+
url_base = "https://atlas.mitre.org/{type}/{id}/"
173+
tactic_id = tactics_map.get(tactic_name)
174+
if not tactic_id:
175+
raise ValueError(f"Unknown ATLAS tactic: {tactic_name}")
176+
177+
tech_entries: dict[str, Any] = {}
178+
179+
def make_entry(_id: str) -> dict[str, Any]:
180+
tech_info = technique_lookup.get(_id)
181+
if not tech_info:
182+
raise ValueError(f"Unknown ATLAS technique ID: {_id}")
183+
return {
184+
"id": _id,
185+
"name": tech_info["name"],
186+
"reference": url_base.format(type="techniques", id=_id.replace(".", "/")),
187+
}
188+
189+
for tid in technique_ids:
190+
if tid not in technique_lookup:
191+
raise ValueError(f"Unknown ATLAS technique ID: {tid}")
192+
193+
tech_info = technique_lookup[tid]
194+
tech_tactic_ids = tech_info.get("tactics", [])
195+
if tactic_id not in tech_tactic_ids:
196+
raise ValueError(
197+
f"ATLAS technique ID: {tid} does not fall under "
198+
f"tactic: {tactic_name}"
199+
)
200+
201+
# Handle sub-techniques (e.g., AML.T0000.000)
202+
if "." in tid and tid.count(".") > 1:
203+
# This is a sub-technique
204+
parts = tid.rsplit(".", 1)
205+
parent_technique = parts[0]
206+
tech_entries.setdefault(parent_technique, make_entry(parent_technique))
207+
tech_entries[parent_technique].setdefault("subtechnique", []).append(make_entry(tid))
208+
else:
209+
tech_entries.setdefault(tid, make_entry(tid))
210+
211+
entry: dict[str, Any] = {
212+
"framework": "MITRE ATLAS",
213+
"tactic": {
214+
"id": tactic_id,
215+
"name": tactic_name,
216+
"reference": url_base.format(type="tactics", id=tactic_id),
217+
},
218+
}
219+
220+
if tech_entries:
221+
entry["technique"] = sorted(
222+
tech_entries.values(), key=lambda x: x["id"]
223+
)
224+
225+
return entry

0 commit comments

Comments
 (0)