1
1
2
2
from __future__ import annotations
3
- import csv
4
- import os
5
3
import sys
6
4
from attackcti import attack_client
7
5
import logging
8
- from pydantic import BaseModel , Field
6
+ from pydantic import BaseModel
9
7
from dataclasses import field
10
- from typing import Annotated ,Any
11
- from contentctl .objects .mitre_attack_enrichment import MitreAttackEnrichment
8
+ from typing import Any
9
+ from pathlib import Path
10
+ from contentctl .objects .mitre_attack_enrichment import MitreAttackEnrichment , MitreTactics
12
11
from contentctl .objects .config import validate
13
12
from contentctl .objects .annotated_types import MITRE_ATTACK_ID_TYPE
14
13
logging .getLogger ('taxii2client' ).setLevel (logging .CRITICAL )
@@ -21,84 +20,82 @@ class AttackEnrichment(BaseModel):
21
20
@staticmethod
22
21
def getAttackEnrichment (config :validate )-> AttackEnrichment :
23
22
enrichment = AttackEnrichment (use_enrichment = config .enrichments )
24
- _ = enrichment .get_attack_lookup (str ( config .path ) )
23
+ _ = enrichment .get_attack_lookup (config .mitre_cti_repo_path , config . enrichments )
25
24
return enrichment
26
25
27
26
def getEnrichmentByMitreID (self , mitre_id :MITRE_ATTACK_ID_TYPE )-> MitreAttackEnrichment :
28
27
if not self .use_enrichment :
29
- raise Exception (f "Error, trying to add Mitre Enrichment, but use_enrichment was set to False" )
28
+ raise Exception ("Error, trying to add Mitre Enrichment, but use_enrichment was set to False" )
30
29
31
30
enrichment = self .data .get (mitre_id , None )
32
31
if enrichment is not None :
33
32
return enrichment
34
33
else :
35
34
raise Exception (f"Error, Unable to find Mitre Enrichment for MitreID { mitre_id } " )
36
35
37
- def addMitreIDViaGroupNames (self , technique :dict , tactics :list [str ], groupNames :list [str ])-> None :
36
+ def addMitreIDViaGroupNames (self , technique :dict [ str , Any ] , tactics :list [str ], groupNames :list [str ])-> None :
38
37
technique_id = technique ['technique_id' ]
39
38
technique_obj = technique ['technique' ]
40
39
tactics .sort ()
41
40
42
41
if technique_id in self .data :
43
42
raise Exception (f"Error, trying to redefine MITRE ID '{ technique_id } '" )
44
- self .data [technique_id ] = MitreAttackEnrichment ( mitre_attack_id = technique_id ,
45
- mitre_attack_technique = technique_obj ,
46
- mitre_attack_tactics = tactics ,
47
- mitre_attack_groups = groupNames ,
48
- mitre_attack_group_objects = [] )
43
+ self .data [technique_id ] = MitreAttackEnrichment . model_validate ({ ' mitre_attack_id' : technique_id ,
44
+ ' mitre_attack_technique' : technique_obj ,
45
+ ' mitre_attack_tactics' : tactics ,
46
+ ' mitre_attack_groups' : groupNames ,
47
+ ' mitre_attack_group_objects' :[]} )
49
48
50
- def addMitreIDViaGroupObjects (self , technique :dict , tactics :list [str ], groupObjects :list [dict [str ,Any ]])-> None :
49
+ def addMitreIDViaGroupObjects (self , technique :dict [ str , Any ], tactics :list [MitreTactics ], groupDicts :list [dict [str ,Any ]])-> None :
51
50
technique_id = technique ['technique_id' ]
52
51
technique_obj = technique ['technique' ]
53
52
tactics .sort ()
54
53
55
- groupNames :list [str ] = sorted ([group ['group' ] for group in groupObjects ])
54
+ groupNames :list [str ] = sorted ([group ['group' ] for group in groupDicts ])
56
55
57
56
if technique_id in self .data :
58
57
raise Exception (f"Error, trying to redefine MITRE ID '{ technique_id } '" )
59
- self .data [technique_id ] = MitreAttackEnrichment (mitre_attack_id = technique_id ,
60
- mitre_attack_technique = technique_obj ,
61
- mitre_attack_tactics = tactics ,
62
- mitre_attack_groups = groupNames ,
63
- mitre_attack_group_objects = groupObjects )
58
+
59
+ self .data [technique_id ] = MitreAttackEnrichment .model_validate ({'mitre_attack_id' : technique_id ,
60
+ 'mitre_attack_technique' : technique_obj ,
61
+ 'mitre_attack_tactics' : tactics ,
62
+ 'mitre_attack_groups' : groupNames ,
63
+ 'mitre_attack_group_objects' : groupDicts })
64
64
65
65
66
- def get_attack_lookup (self , input_path : str , store_csv : bool = False , force_cached_or_offline : bool = False , skip_enrichment :bool = False ) -> dict :
67
- if not self .use_enrichment :
68
- return {}
69
- print ("Getting MITRE Attack Enrichment Data. This may take some time..." )
70
- attack_lookup = dict ()
71
- file_path = os .path .join (input_path , "app_template" , "lookups" , "mitre_enrichment.csv" )
72
-
73
- if skip_enrichment is True :
74
- print ("Skipping enrichment" )
66
+ def get_attack_lookup (self , input_path : Path , enrichments :bool = False ) -> dict [str ,MitreAttackEnrichment ]:
67
+ attack_lookup :dict [str ,MitreAttackEnrichment ] = {}
68
+ if not enrichments :
75
69
return attack_lookup
70
+
76
71
try :
77
-
78
- if force_cached_or_offline is True :
79
- raise (Exception ("WARNING - Using cached MITRE Attack Enrichment. Attack Enrichment may be out of date. Only use this setting for offline environments and development purposes." ))
80
- print (f"\r { 'Client' .rjust (23 )} : [{ 0 :3.0f} %]..." , end = "" , flush = True )
81
- lift = attack_client ()
82
- print (f"\r { 'Client' .rjust (23 )} : [{ 100 :3.0f} %]...Done!" , end = "\n " , flush = True )
72
+ print (f"Performing MITRE Enrichment using the repository at { input_path } ..." ,end = "" , flush = True )
73
+ # The existence of the input_path is validated during cli argument validation, but it is
74
+ # possible that the repo is in the wrong format. If the following directories do not
75
+ # exist, then attack_client will fall back to resolving via REST API. We do not
76
+ # want this as it is slow and error prone, so we will force an exception to
77
+ # be generated.
78
+ enterprise_path = input_path / "enterprise-attack"
79
+ mobile_path = input_path / "ics-attack"
80
+ ics_path = input_path / "mobile-attack"
81
+ if not (enterprise_path .is_dir () and mobile_path .is_dir () and ics_path .is_dir ()):
82
+ raise FileNotFoundError ("One or more of the following paths does not exist: "
83
+ f"{ [str (enterprise_path ),str (mobile_path ),str (ics_path )]} . "
84
+ f"Please ensure that the { input_path } directory "
85
+ "has been git cloned correctly." )
86
+ lift = attack_client (
87
+ local_paths = {
88
+ "enterprise" :str (enterprise_path ),
89
+ "mobile" :str (mobile_path ),
90
+ "ics" :str (ics_path )
91
+ }
92
+ )
83
93
84
- print (f"\r { 'Techniques' .rjust (23 )} : [{ 0.0 :3.0f} %]..." , end = "" , flush = True )
85
94
all_enterprise_techniques = lift .get_enterprise_techniques (stix_format = False )
86
-
87
- print (f"\r { 'Techniques' .rjust (23 )} : [{ 100 :3.0f} %]...Done!" , end = "\n " , flush = True )
88
-
89
- print (f"\r { 'Relationships' .rjust (23 )} : [{ 0.0 :3.0f} %]..." , end = "" , flush = True )
90
95
enterprise_relationships = lift .get_enterprise_relationships (stix_format = False )
91
- print (f"\r { 'Relationships' .rjust (23 )} : [{ 100 :3.0f} %]...Done!" , end = "\n " , flush = True )
92
-
93
- print (f"\r { 'Groups' .rjust (23 )} : [{ 0 :3.0f} %]..." , end = "" , flush = True )
94
96
enterprise_groups = lift .get_enterprise_groups (stix_format = False )
95
- print (f"\r { 'Groups' .rjust (23 )} : [{ 100 :3.0f} %]...Done!" , end = "\n " , flush = True )
96
-
97
97
98
- for index , technique in enumerate (all_enterprise_techniques ):
99
- progress_percent = ((index + 1 )/ len (all_enterprise_techniques )) * 100
100
- if (sys .stdout .isatty () and sys .stdin .isatty () and sys .stderr .isatty ()):
101
- print (f"\r \t { 'MITRE Technique Progress' .rjust (23 )} : [{ progress_percent :3.0f} %]..." , end = "" , flush = True )
98
+ for technique in all_enterprise_techniques :
102
99
apt_groups :list [dict [str ,Any ]] = []
103
100
for relationship in enterprise_relationships :
104
101
if (relationship ['target_object' ] == technique ['id' ]) and relationship ['source_object' ].startswith ('intrusion-set' ):
@@ -115,39 +112,10 @@ def get_attack_lookup(self, input_path: str, store_csv: bool = False, force_cach
115
112
self .addMitreIDViaGroupObjects (technique , tactics , apt_groups )
116
113
attack_lookup [technique ['technique_id' ]] = {'technique' : technique ['technique' ], 'tactics' : tactics , 'groups' : apt_groups }
117
114
118
- if store_csv :
119
- f = open (file_path , 'w' )
120
- writer = csv .writer (f )
121
- writer .writerow (['mitre_id' , 'technique' , 'tactics' ,'groups' ])
122
- for key in attack_lookup .keys ():
123
- if len (attack_lookup [key ]['groups' ]) == 0 :
124
- groups = 'no'
125
- else :
126
- groups = '|' .join (attack_lookup [key ]['groups' ])
127
-
128
- writer .writerow ([
129
- key ,
130
- attack_lookup [key ]['technique' ],
131
- '|' .join (attack_lookup [key ]['tactics' ]),
132
- groups
133
- ])
134
-
135
- f .close ()
136
115
116
+
137
117
except Exception as err :
138
- print (f'\n Error: { str (err )} ' )
139
- print ('Use local copy app_template/lookups/mitre_enrichment.csv' )
140
- with open (file_path , mode = 'r' ) as inp :
141
- reader = csv .reader (inp )
142
- attack_lookup = {rows [0 ]:{'technique' : rows [1 ], 'tactics' : rows [2 ].split ('|' ), 'groups' : rows [3 ].split ('|' )} for rows in reader }
143
- attack_lookup .pop ('mitre_id' )
144
- for key in attack_lookup .keys ():
145
- technique_input = {'technique_id' : key , 'technique' : attack_lookup [key ]['technique' ] }
146
- tactics_input = attack_lookup [key ]['tactics' ]
147
- groups_input = attack_lookup [key ]['groups' ]
148
- self .addMitreIDViaGroupNames (technique = technique_input , tactics = tactics_input , groups = groups_input )
149
-
150
-
151
-
118
+ raise Exception (f"Error getting MITRE Enrichment: { str (err )} " )
119
+
152
120
print ("Done!" )
153
121
return attack_lookup
0 commit comments