Skip to content

Commit b76dd76

Browse files
committed
Update enrichments to always
require checkout of mitre/cti repo.
1 parent 3cda211 commit b76dd76

File tree

5 files changed

+94
-85
lines changed

5 files changed

+94
-85
lines changed

.github/workflows/test_against_escu.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ jobs:
5353
poetry install --no-interaction
5454
5555
56-
- name: Clone the AtomicRedTeam Repo (for extended validation)
56+
- name: Clone the AtomicRedTeam Repo and the Mitre/CTI repos for testing enrichments
5757
run: |
5858
cd security_content
59-
git clone --depth 1 https://github.com/redcanaryco/atomic-red-team
59+
git clone --single-branch https://github.com/redcanaryco/atomic-red-team external_repos/atomic-red-team
60+
git clone --single-branch https://github.com/mitre/cti external_repos/cti
6061
6162
6263
# We do not separately run validate and build

contentctl/actions/validate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
1616

1717
director_output_dto = DirectorOutputDto(
1818
AtomicTest.getAtomicTestsFromArtRepo(
19-
repo_path=input_dto.getAtomicRedTeamRepoPath(),
19+
repo_path=input_dto.atomic_red_team_repo_path,
2020
enabled=input_dto.enrichments,
2121
),
2222
AttackEnrichment.getAttackEnrichment(input_dto),

contentctl/contentctl.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ def main():
211211
test_common_func(config)
212212
else:
213213
raise Exception(f"Unknown command line type '{type(config).__name__}'")
214+
except FileNotFoundError as e:
215+
print(e)
216+
sys.exit(1)
214217
except Exception as e:
215218
if config is None:
216219
print("There was a serious issue where the config file could not be created.\n"
Lines changed: 48 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11

22
from __future__ import annotations
3-
import csv
4-
import os
53
import sys
64
from attackcti import attack_client
75
import logging
8-
from pydantic import BaseModel, Field
6+
from pydantic import BaseModel
97
from dataclasses import field
10-
from typing import Annotated,Any
11-
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment
8+
from typing import Any
9+
from pathlib import Path
10+
from contentctl.objects.mitre_attack_enrichment import MitreAttackEnrichment, MitreTactics
1211
from contentctl.objects.config import validate
1312
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
1413
logging.getLogger('taxii2client').setLevel(logging.CRITICAL)
@@ -21,7 +20,7 @@ class AttackEnrichment(BaseModel):
2120
@staticmethod
2221
def getAttackEnrichment(config:validate)->AttackEnrichment:
2322
enrichment = AttackEnrichment(use_enrichment=config.enrichments)
24-
_ = enrichment.get_attack_lookup(str(config.path))
23+
_ = enrichment.get_attack_lookup(config.mitre_cti_repo_path, config.enrichments)
2524
return enrichment
2625

2726
def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnrichment:
@@ -34,71 +33,69 @@ def getEnrichmentByMitreID(self, mitre_id:MITRE_ATTACK_ID_TYPE)->MitreAttackEnri
3433
else:
3534
raise Exception(f"Error, Unable to find Mitre Enrichment for MitreID {mitre_id}")
3635

37-
def addMitreIDViaGroupNames(self, technique:dict, tactics:list[str], groupNames:list[str])->None:
36+
def addMitreIDViaGroupNames(self, technique:dict[str,Any], tactics:list[str], groupNames:list[str])->None:
3837
technique_id = technique['technique_id']
3938
technique_obj = technique['technique']
4039
tactics.sort()
4140

4241
if technique_id in self.data:
4342
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
44-
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
45-
mitre_attack_technique=technique_obj,
46-
mitre_attack_tactics=tactics,
47-
mitre_attack_groups=groupNames,
48-
mitre_attack_group_objects=[])
43+
self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id':technique_id,
44+
'mitre_attack_technique':technique_obj,
45+
'mitre_attack_tactics':tactics,
46+
'mitre_attack_groups':groupNames,
47+
'mitre_attack_group_objects':[]})
4948

50-
def addMitreIDViaGroupObjects(self, technique:dict, tactics:list[str], groupObjects:list[dict[str,Any]])->None:
49+
def addMitreIDViaGroupObjects(self, technique:dict[str,Any], tactics:list[MitreTactics], groupDicts:list[dict[str,Any]])->None:
5150
technique_id = technique['technique_id']
5251
technique_obj = technique['technique']
5352
tactics.sort()
5453

55-
groupNames:list[str] = sorted([group['group'] for group in groupObjects])
54+
groupNames:list[str] = sorted([group['group'] for group in groupDicts])
5655

5756
if technique_id in self.data:
5857
raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'")
59-
self.data[technique_id] = MitreAttackEnrichment(mitre_attack_id=technique_id,
60-
mitre_attack_technique=technique_obj,
61-
mitre_attack_tactics=tactics,
62-
mitre_attack_groups=groupNames,
63-
mitre_attack_group_objects=groupObjects)
58+
59+
self.data[technique_id] = MitreAttackEnrichment.model_validate({'mitre_attack_id': technique_id,
60+
'mitre_attack_technique': technique_obj,
61+
'mitre_attack_tactics': tactics,
62+
'mitre_attack_groups': groupNames,
63+
'mitre_attack_group_objects': groupDicts})
6464

6565

66-
def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cached_or_offline: bool = False, skip_enrichment:bool = False) -> dict:
67-
if not self.use_enrichment:
68-
return {}
69-
print("Getting MITRE Attack Enrichment Data. This may take some time...")
70-
attack_lookup = dict()
71-
file_path = os.path.join(input_path, "app_template", "lookups", "mitre_enrichment.csv")
72-
73-
if skip_enrichment is True:
74-
print("Skipping enrichment")
66+
def get_attack_lookup(self, input_path: Path, enrichments:bool = False) -> dict[str,MitreAttackEnrichment]:
67+
attack_lookup:dict[str,MitreAttackEnrichment] = {}
68+
if not enrichments:
7569
return attack_lookup
70+
7671
try:
77-
78-
if force_cached_or_offline is True:
79-
raise(Exception("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes."))
80-
print(f"\r{'Client'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
81-
lift = attack_client()
82-
print(f"\r{'Client'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
72+
print(f"Performing MITRE Enrichment using the repository at {input_path}...",end="", flush=True)
73+
# The existence of the input_path is validated during cli argument validation, but it is
74+
# possible that the repo is in the wrong format. If the following directories do not
75+
# exist, then attack_client will fall back to resolving via REST API. We do not
76+
# want this as it is slow and error prone, so we will force an exception to
77+
# be generated.
78+
enterprise_path = input_path/"enterprise-attack"
79+
mobile_path = input_path/"ics-attack"
80+
ics_path = input_path/"mobile-attack"
81+
if not (enterprise_path.is_dir() and mobile_path.is_dir() and ics_path.is_dir()):
82+
raise FileNotFoundError("One or more of the following paths does not exist: "
83+
f"{[str(enterprise_path),str(mobile_path),str(ics_path)]}. "
84+
f"Please ensure that the {input_path} directory "
85+
"has been git cloned correctly.")
86+
lift = attack_client(
87+
local_paths= {
88+
"enterprise":str(enterprise_path),
89+
"mobile":str(mobile_path),
90+
"ics":str(ics_path)
91+
}
92+
)
8393

84-
print(f"\r{'Techniques'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
8594
all_enterprise_techniques = lift.get_enterprise_techniques(stix_format=False)
86-
87-
print(f"\r{'Techniques'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
88-
89-
print(f"\r{'Relationships'.rjust(23)}: [{0.0:3.0f}%]...", end="", flush=True)
9095
enterprise_relationships = lift.get_enterprise_relationships(stix_format=False)
91-
print(f"\r{'Relationships'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
92-
93-
print(f"\r{'Groups'.rjust(23)}: [{0:3.0f}%]...", end="", flush=True)
9496
enterprise_groups = lift.get_enterprise_groups(stix_format=False)
95-
print(f"\r{'Groups'.rjust(23)}: [{100:3.0f}%]...Done!", end="\n", flush=True)
96-
9797

98-
for index, technique in enumerate(all_enterprise_techniques):
99-
progress_percent = ((index+1)/len(all_enterprise_techniques)) * 100
100-
if (sys.stdout.isatty() and sys.stdin.isatty() and sys.stderr.isatty()):
101-
print(f"\r\t{'MITRE Technique Progress'.rjust(23)}: [{progress_percent:3.0f}%]...", end="", flush=True)
98+
for technique in all_enterprise_techniques:
10299
apt_groups:list[dict[str,Any]] = []
103100
for relationship in enterprise_relationships:
104101
if (relationship['target_object'] == technique['id']) and relationship['source_object'].startswith('intrusion-set'):
@@ -115,39 +112,10 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
115112
self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
116113
attack_lookup[technique['technique_id']] = {'technique': technique['technique'], 'tactics': tactics, 'groups': apt_groups}
117114

118-
if store_csv:
119-
f = open(file_path, 'w')
120-
writer = csv.writer(f)
121-
writer.writerow(['mitre_id', 'technique', 'tactics' ,'groups'])
122-
for key in attack_lookup.keys():
123-
if len(attack_lookup[key]['groups']) == 0:
124-
groups = 'no'
125-
else:
126-
groups = '|'.join(attack_lookup[key]['groups'])
127-
128-
writer.writerow([
129-
key,
130-
attack_lookup[key]['technique'],
131-
'|'.join(attack_lookup[key]['tactics']),
132-
groups
133-
])
134-
135-
f.close()
136115

116+
137117
except Exception as err:
138-
print(f'\nError: {str(err)}')
139-
print('Use local copy app_template/lookups/mitre_enrichment.csv')
140-
with open(file_path, mode='r') as inp:
141-
reader = csv.reader(inp)
142-
attack_lookup = {rows[0]:{'technique': rows[1], 'tactics': rows[2].split('|'), 'groups': rows[3].split('|')} for rows in reader}
143-
attack_lookup.pop('mitre_id')
144-
for key in attack_lookup.keys():
145-
technique_input = {'technique_id': key , 'technique': attack_lookup[key]['technique'] }
146-
tactics_input = attack_lookup[key]['tactics']
147-
groups_input = attack_lookup[key]['groups']
148-
self.addMitreIDViaGroupNames(technique=technique_input, tactics=tactics_input, groups=groups_input)
149-
150-
151-
118+
raise Exception(f"Error getting MITRE Enrichment: {str(err)}")
119+
152120
print("Done!")
153121
return attack_lookup

contentctl/objects/config.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,45 @@ class validate(Config_Base):
191191
build_api: bool = Field(default=False, description="Should api objects be built and output in the build_path?")
192192
data_source_TA_validation: bool = Field(default=False, description="Validate latest TA information from Splunkbase")
193193

194-
def getAtomicRedTeamRepoPath(self, atomic_red_team_repo_name:str = "atomic-red-team"):
195-
return self.path/atomic_red_team_repo_name
194+
@property
195+
def external_repos_path(self)->pathlib.Path:
196+
return self.path/"external_repos"
197+
198+
@property
199+
def mitre_cti_repo_path(self)->pathlib.Path:
200+
return self.external_repos_path/"cti"
201+
202+
@property
203+
def atomic_red_team_repo_path(self):
204+
return self.external_repos_path/"atomic-red-team"
205+
206+
@model_validator(mode="after")
207+
def ensureEnrichmentReposPresent(self)->Self:
208+
'''
209+
Ensures that the enrichments repos, the atomic red team repo and the
210+
mitre attack enrichment repo, are present at the inded path.
211+
Raises a detailed exception if either of these are not present
212+
when enrichments are enabled.
213+
'''
214+
if not self.enrichments:
215+
return self
216+
# If enrichments are enabled, ensure that all of the
217+
# enrichment directories exist
218+
missing_repos:list[str] = []
219+
if not self.atomic_red_team_repo_path.is_dir():
220+
missing_repos.append(f"https://github.com/redcanaryco/atomic-red-team {self.atomic_red_team_repo_path}")
221+
222+
if not self.mitre_cti_repo_path.is_dir():
223+
missing_repos.append(f"https://github.com/mitre/cti {self.mitre_cti_repo_path}")
224+
225+
if len(missing_repos) > 0:
226+
msg_list = ["The following repositories, which are required for enrichment, have not "
227+
f"been checked out to the {self.external_repos_path} directory. "
228+
"Please check them out using the following commands:"]
229+
msg_list.extend([f"git clone --single-branch {repo_string}" for repo_string in missing_repos])
230+
msg = '\n\t'.join(msg_list)
231+
raise FileNotFoundError(msg)
232+
return self
196233

197234
class report(validate):
198235
#reporting takes no extra args, but we define it here so that it can be a mode on the command line

0 commit comments

Comments
 (0)