Skip to content

Commit 15e0b8c

Browse files
committed
lint
1 parent e87c15d commit 15e0b8c

File tree

3 files changed

+15
-33
lines changed

3 files changed

+15
-33
lines changed

detection_rules/atlas.py

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ def get_atlas_file_path() -> Path:
3333

3434
def download_atlas_data(save: bool = True) -> dict[str, Any] | None:
3535
"""Download ATLAS data from MITRE."""
36-
url = (
37-
"https://raw.githubusercontent.com/mitre-atlas/"
38-
"atlas-data/main/dist/ATLAS.yaml"
39-
)
36+
url = "https://raw.githubusercontent.com/mitre-atlas/atlas-data/main/dist/ATLAS.yaml"
4037
r = requests.get(url, timeout=30)
4138
r.raise_for_status()
4239
atlas_data = yaml.safe_load(r.text)
@@ -88,10 +85,7 @@ def load_atlas_yaml() -> dict[str, Any]:
8885
# Build matrix: map tactic IDs to technique IDs
8986
for tech_tactic_id in technique_tactics:
9087
# Find tactic name from ID
91-
tech_tactic_name = next(
92-
(name for name, tid in tactics_map.items() if tid == tech_tactic_id),
93-
None
94-
)
88+
tech_tactic_name = next((name for name, tid in tactics_map.items() if tid == tech_tactic_id), None)
9589
if tech_tactic_name:
9690
if tech_tactic_name not in matrix:
9791
matrix[tech_tactic_name] = []
@@ -115,9 +109,7 @@ def refresh_atlas_data(save: bool = True) -> dict[str, Any] | None:
115109
current_version_str = CURRENT_ATLAS_VERSION
116110

117111
try:
118-
current_version = Version.parse(
119-
current_version_str, optional_minor_and_patch=True
120-
)
112+
current_version = Version.parse(current_version_str, optional_minor_and_patch=True)
121113
except (ValueError, TypeError):
122114
# If version parsing fails, download anyway
123115
current_version = Version.parse("0.0.0", optional_minor_and_patch=True)
@@ -147,10 +139,7 @@ def refresh_atlas_data(save: bool = True) -> dict[str, Any] | None:
147139
print(f"No versions newer than the current detected: {current_version_str}")
148140
return None
149141

150-
download = (
151-
f"https://raw.githubusercontent.com/mitre-atlas/atlas-data/"
152-
f"{latest_release['name']}/dist/ATLAS.yaml"
153-
)
142+
download = f"https://raw.githubusercontent.com/mitre-atlas/atlas-data/{latest_release['name']}/dist/ATLAS.yaml"
154143
r = requests.get(download, timeout=30)
155144
r.raise_for_status()
156145
atlas_data = yaml.safe_load(r.text)
@@ -165,9 +154,7 @@ def refresh_atlas_data(save: bool = True) -> dict[str, Any] | None:
165154
return atlas_data
166155

167156

168-
def build_threat_map_entry(
169-
tactic_name: str, *technique_ids: str
170-
) -> dict[str, Any]:
157+
def build_threat_map_entry(tactic_name: str, *technique_ids: str) -> dict[str, Any]:
171158
"""Build rule threat map from ATLAS technique IDs."""
172159
url_base = "https://atlas.mitre.org/{type}/{id}/"
173160
tactic_id = tactics_map.get(tactic_name)
@@ -193,10 +180,7 @@ def make_entry(_id: str) -> dict[str, Any]:
193180
tech_info = technique_lookup[tid]
194181
tech_tactic_ids = tech_info.get("tactics", [])
195182
if tactic_id not in tech_tactic_ids:
196-
raise ValueError(
197-
f"ATLAS technique ID: {tid} does not fall under "
198-
f"tactic: {tactic_name}"
199-
)
183+
raise ValueError(f"ATLAS technique ID: {tid} does not fall under tactic: {tactic_name}")
200184

201185
# Handle sub-techniques (e.g., AML.T0000.000)
202186
if "." in tid and tid.count(".") > 1:
@@ -218,8 +202,6 @@ def make_entry(_id: str) -> dict[str, Any]:
218202
}
219203

220204
if tech_entries:
221-
entry["technique"] = sorted(
222-
tech_entries.values(), key=lambda x: x["id"]
223-
)
205+
entry["technique"] = sorted(tech_entries.values(), key=lambda x: x["id"])
224206

225207
return entry

detection_rules/schemas/definitions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ def validator_wrapper(value: Any) -> Any:
107107
INTERVAL_PATTERN = r"^\d+[mshd]$"
108108
TACTIC_URL = r"^(https://attack.mitre.org/tactics/TA[0-9]+/|https://atlas.mitre.org/tactics/AML\.TA[0-9]+/)$"
109109
TECHNIQUE_URL = r"^(https://attack.mitre.org/techniques/T[0-9]+/|https://atlas.mitre.org/techniques/AML\.T[0-9]+/)$"
110-
SUBTECHNIQUE_URL = r"^(https://attack.mitre.org/techniques/T[0-9]+/[0-9]+/|https://atlas.mitre.org/techniques/AML\.T[0-9]+\.[0-9]+/)$"
110+
SUBTECHNIQUE_URL = (
111+
r"^(https://attack.mitre.org/techniques/T[0-9]+/[0-9]+/|https://atlas.mitre.org/techniques/AML\.T[0-9]+\.[0-9]+/)$"
112+
)
111113
MACHINE_LEARNING = "machine_learning"
112114
QUERY = "query"
113115
QUERY_FIELD_OP_EXCEPTIONS = ["powershell.file.script_block_text"]

tests/test_all_rules.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,7 @@ def _validate_tactic(self, framework_module, framework_name: str, tactic, rule):
239239
"""Validate tactic mapping and reference."""
240240
# Validate techniques are under the correct tactic
241241
if tactic.name not in framework_module.matrix:
242-
self.fail(
243-
f"Unknown {framework_name} tactic '{tactic.name}' for rule: {self.rule_str(rule)}"
244-
)
242+
self.fail(f"Unknown {framework_name} tactic '{tactic.name}' for rule: {self.rule_str(rule)}")
245243

246244
# Validate tactic ID mapping
247245
expected_tactic = framework_module.tactics_map.get(tactic.name)
@@ -315,9 +313,7 @@ def _validate_subtechnique(self, framework_module, framework_name: str, sub_tech
315313
sub_technique_reference_id = sub_technique.reference.rstrip("/").split("/")[-1]
316314
else:
317315
# ATT&CK sub-technique reference format: https://attack.mitre.org/techniques/T1005/005/
318-
sub_technique_reference_id = ".".join(
319-
sub_technique.reference.rstrip("/").split("/")[-2:]
320-
)
316+
sub_technique_reference_id = ".".join(sub_technique.reference.rstrip("/").split("/")[-2:])
321317
self.assertEqual(
322318
sub_technique.id,
323319
sub_technique_reference_id,
@@ -357,7 +353,9 @@ def test_tactic_to_technique_correlations(self):
357353
# Validate sub-techniques
358354
sub_techniques = technique.subtechnique or []
359355
for sub_technique in sub_techniques:
360-
self._validate_subtechnique(framework_module, framework_name, sub_technique, framework, rule)
356+
self._validate_subtechnique(
357+
framework_module, framework_name, sub_technique, framework, rule
358+
)
361359

362360
def test_duplicated_tactics(self):
363361
"""Check that a tactic is only defined once."""

0 commit comments

Comments
 (0)