33
33
SecurityContentProductName ,
34
34
SecurityDomain ,
35
35
)
36
- from contentctl .objects .mitre_attack_enrichment import MitreAttackEnrichment
36
+ from contentctl .objects .mitre_attack_enrichment import (
37
+ MitreAttackEnrichment ,
38
+ MitreAttackGroup ,
39
+ )
37
40
38
41
39
42
class DetectionTags (BaseModel ):
@@ -44,7 +47,7 @@ class DetectionTags(BaseModel):
44
47
asset_type : AssetType = Field (...)
45
48
group : list [str ] = []
46
49
47
- mitre_attack_id : List [MITRE_ATTACK_ID_TYPE ] = []
50
+ mitre_attack_id : list [MITRE_ATTACK_ID_TYPE ] = []
48
51
nist : list [NistCategory ] = []
49
52
50
53
product : list [SecurityContentProductName ] = Field (..., min_length = 1 )
@@ -68,6 +71,15 @@ def kill_chain_phases(self) -> list[KillChainPhase]:
68
71
phases .add (phase )
69
72
return sorted (list (phases ))
70
73
74
+ # We do not want this to be included in serialization. By default, @property
75
+ # objects are not included in dumps
76
+ @property
77
+ def unique_mitre_attack_groups (self ) -> list [MitreAttackGroup ]:
78
+ group_set : set [MitreAttackGroup ] = set ()
79
+ for enrichment in self .mitre_attack_enrichments :
80
+ group_set .update (set (enrichment .mitre_attack_group_objects ))
81
+ return sorted (group_set , key = lambda k : k .group )
82
+
71
83
# enum is intentionally Cis18 even though field is named cis20 for legacy reasons
72
84
@computed_field
73
85
@property
@@ -134,8 +146,8 @@ def addAttackEnrichment(self, info: ValidationInfo):
134
146
135
147
if len (missing_tactics ) > 0 :
136
148
raise ValueError (f"Missing Mitre Attack IDs. { missing_tactics } not found." )
137
- else :
138
- self .mitre_attack_enrichments = mitre_enrichments
149
+
150
+ self .mitre_attack_enrichments = mitre_enrichments
139
151
140
152
return self
141
153
@@ -159,6 +171,44 @@ def addAttackEnrichments(cls, v:list[MitreAttackEnrichment], info:ValidationInfo
159
171
return enrichments
160
172
"""
161
173
174
+ @field_validator ("mitre_attack_id" , mode = "after" )
175
+ @classmethod
176
+ def sameTypeAndSubtypeNotPresent (
177
+ cls , techniques_and_subtechniques : list [MITRE_ATTACK_ID_TYPE ]
178
+ ) -> list [MITRE_ATTACK_ID_TYPE ]:
179
+ techniques : list [str ] = [
180
+ f"{ unknown_technique } ."
181
+ for unknown_technique in techniques_and_subtechniques
182
+ if "." not in unknown_technique
183
+ ]
184
+ subtechniques : list [MITRE_ATTACK_ID_TYPE ] = [
185
+ unknown_technique
186
+ for unknown_technique in techniques_and_subtechniques
187
+ if "." in unknown_technique
188
+ ]
189
+ subtype_and_parent_exist_exceptions : list [ValueError ] = []
190
+
191
+ for subtechnique in subtechniques :
192
+ for technique in techniques :
193
+ if subtechnique .startswith (technique ):
194
+ subtype_and_parent_exist_exceptions .append (
195
+ ValueError (
196
+ f" Technique : { technique .split ('.' )[0 ]} \n "
197
+ f" SubTechnique: { subtechnique } \n "
198
+ )
199
+ )
200
+
201
+ if len (subtype_and_parent_exist_exceptions ):
202
+ error_string = "\n " .join (
203
+ str (e ) for e in subtype_and_parent_exist_exceptions
204
+ )
205
+ raise ValueError (
206
+ "Overlapping MITRE Attack ID Techniques and Subtechniques may not be defined. "
207
+ f"Remove the Technique and keep the Subtechnique:\n { error_string } "
208
+ )
209
+
210
+ return techniques_and_subtechniques
211
+
162
212
@field_validator ("analytic_story" , mode = "before" )
163
213
@classmethod
164
214
def mapStoryNamesToStoryObjects (
@@ -238,3 +288,6 @@ def mapAtomicGuidsToAtomicTests(
238
288
return matched_tests + [
239
289
AtomicTest .AtomicTestWhenTestIsMissing (test ) for test in missing_tests
240
290
]
291
+ return matched_tests + [
292
+ AtomicTest .AtomicTestWhenTestIsMissing (test ) for test in missing_tests
293
+ ]
0 commit comments