5
5
from pydantic import ValidationError
6
6
from uuid import UUID
7
7
from contentctl .input .yml_reader import YmlReader
8
-
9
-
10
-
8
+
9
+
11
10
from contentctl .objects .detection import Detection
12
11
from contentctl .objects .story import Story
13
12
28
27
from contentctl .objects .config import validate
29
28
30
29
31
-
32
- @dataclass ()
30
+ @dataclass
33
31
class DirectorOutputDto :
34
- # Atomic Tests are first because parsing them
35
- # is far quicker than attack_enrichment
36
- atomic_tests : Union [list [AtomicTest ],None ]
37
- attack_enrichment : AttackEnrichment
38
- cve_enrichment : CveEnrichment
39
- detections : list [Detection ]
40
- stories : list [Story ]
41
- baselines : list [Baseline ]
42
- investigations : list [Investigation ]
43
- playbooks : list [Playbook ]
44
- macros : list [Macro ]
45
- lookups : list [Lookup ]
46
- deployments : list [Deployment ]
47
- ssa_detections : list [SSADetection ]
48
-
49
-
50
- name_to_content_map : dict [str , SecurityContentObject ] = field (default_factory = dict )
51
- uuid_to_content_map : dict [UUID , SecurityContentObject ] = field (default_factory = dict )
52
-
32
+ # Atomic Tests are first because parsing them
33
+ # is far quicker than attack_enrichment
34
+ atomic_tests : Union [list [AtomicTest ],None ]
35
+ attack_enrichment : AttackEnrichment
36
+ cve_enrichment : CveEnrichment
37
+ detections : list [Detection ]
38
+ stories : list [Story ]
39
+ baselines : list [Baseline ]
40
+ investigations : list [Investigation ]
41
+ playbooks : list [Playbook ]
42
+ macros : list [Macro ]
43
+ lookups : list [Lookup ]
44
+ deployments : list [Deployment ]
45
+ ssa_detections : list [SSADetection ]
46
+
47
+ name_to_content_map : dict [str , SecurityContentObject ] = field (default_factory = dict )
48
+ uuid_to_content_map : dict [UUID , SecurityContentObject ] = field (default_factory = dict )
49
+
50
+ def addContentToDictMappings (self , content : SecurityContentObject ):
51
+ content_name = content .name
52
+ if isinstance (content , SSADetection ):
53
+ # Since SSA detections may have the same name as ESCU detection,
54
+ # for this function we prepend 'SSA ' to the name.
55
+ content_name = f"SSA { content_name } "
56
+ if content_name in self .name_to_content_map :
57
+ raise ValueError (
58
+ f"Duplicate name '{ content_name } ' with paths:\n "
59
+ f" - { content .file_path } \n "
60
+ f" - { self .name_to_content_map [content_name ].file_path } "
61
+ )
62
+ elif content .id in self .uuid_to_content_map :
63
+ raise ValueError (
64
+ f"Duplicate id '{ content .id } ' with paths:\n "
65
+ f" - { content .file_path } \n "
66
+ f" - { self .name_to_content_map [content_name ].file_path } "
67
+ )
68
+
69
+ if isinstance (content , Lookup ):
70
+ self .lookups .append (content )
71
+ elif isinstance (content , Macro ):
72
+ self .macros .append (content )
73
+ elif isinstance (content , Deployment ):
74
+ self .deployments .append (content )
75
+ elif isinstance (content , Playbook ):
76
+ self .playbooks .append (content )
77
+ elif isinstance (content , Baseline ):
78
+ self .baselines .append (content )
79
+ elif isinstance (content , Investigation ):
80
+ self .investigations .append (content )
81
+ elif isinstance (content , Story ):
82
+ self .stories .append (content )
83
+ elif isinstance (content , Detection ):
84
+ self .detections .append (content )
85
+ elif isinstance (content , SSADetection ):
86
+ self .ssa_detections .append (content )
87
+ else :
88
+ raise Exception (f"Unknown security content type: { type (content )} " )
53
89
54
90
91
+ self .name_to_content_map [content_name ] = content
92
+ self .uuid_to_content_map [content .id ] = content
55
93
56
94
57
95
from contentctl .input .ssa_detection_builder import SSADetectionBuilder
@@ -61,13 +99,6 @@ class DirectorOutputDto:
61
99
from contentctl .helper .utils import Utils
62
100
63
101
64
-
65
-
66
-
67
-
68
-
69
-
70
-
71
102
class Director ():
72
103
input_dto : validate
73
104
output_dto : DirectorOutputDto
@@ -78,27 +109,7 @@ class Director():
78
109
def __init__ (self , output_dto : DirectorOutputDto ) -> None :
79
110
self .output_dto = output_dto
80
111
self .ssa_detection_builder = SSADetectionBuilder ()
81
-
82
- def addContentToDictMappings (self , content :SecurityContentObject ):
83
- content_name = content .name
84
- if isinstance (content ,SSADetection ):
85
- # Since SSA detections may have the same name as ESCU detection,
86
- # for this function we prepend 'SSA ' to the name.
87
- content_name = f"SSA { content_name } "
88
- if content_name in self .output_dto .name_to_content_map :
89
- raise ValueError (f"Duplicate name '{ content_name } ' with paths:\n "
90
- f" - { content .file_path } \n "
91
- f" - { self .output_dto .name_to_content_map [content_name ].file_path } " )
92
- elif content .id in self .output_dto .uuid_to_content_map :
93
- raise ValueError (f"Duplicate id '{ content .id } ' with paths:\n "
94
- f" - { content .file_path } \n "
95
- f" - { self .output_dto .name_to_content_map [content_name ].file_path } " )
96
-
97
- self .output_dto .name_to_content_map [content_name ] = content
98
- self .output_dto .uuid_to_content_map [content .id ] = content
99
-
100
112
101
-
102
113
def execute (self , input_dto : validate ) -> None :
103
114
self .input_dto = input_dto
104
115
@@ -147,50 +158,41 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
147
158
148
159
if contentType == SecurityContentType .lookups :
149
160
lookup = Lookup .model_validate (modelDict ,context = {"output_dto" :self .output_dto , "config" :self .input_dto })
150
- self .output_dto .lookups .append (lookup )
151
- self .addContentToDictMappings (lookup )
161
+ self .output_dto .addContentToDictMappings (lookup )
152
162
153
163
elif contentType == SecurityContentType .macros :
154
164
macro = Macro .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
155
- self .output_dto .macros .append (macro )
156
- self .addContentToDictMappings (macro )
165
+ self .output_dto .addContentToDictMappings (macro )
157
166
158
167
elif contentType == SecurityContentType .deployments :
159
168
deployment = Deployment .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
160
- self .output_dto .deployments .append (deployment )
161
- self .addContentToDictMappings (deployment )
169
+ self .output_dto .addContentToDictMappings (deployment )
162
170
163
171
elif contentType == SecurityContentType .playbooks :
164
172
playbook = Playbook .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
165
- self .output_dto .playbooks .append (playbook )
166
- self .addContentToDictMappings (playbook )
173
+ self .output_dto .addContentToDictMappings (playbook )
167
174
168
175
elif contentType == SecurityContentType .baselines :
169
176
baseline = Baseline .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
170
- self .output_dto .baselines .append (baseline )
171
- self .addContentToDictMappings (baseline )
177
+ self .output_dto .addContentToDictMappings (baseline )
172
178
173
179
elif contentType == SecurityContentType .investigations :
174
180
investigation = Investigation .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
175
- self .output_dto .investigations .append (investigation )
176
- self .addContentToDictMappings (investigation )
181
+ self .output_dto .addContentToDictMappings (investigation )
177
182
178
183
elif contentType == SecurityContentType .stories :
179
184
story = Story .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
180
- self .output_dto .stories .append (story )
181
- self .addContentToDictMappings (story )
185
+ self .output_dto .addContentToDictMappings (story )
182
186
183
187
elif contentType == SecurityContentType .detections :
184
- detection = Detection .model_validate (modelDict ,context = {"output_dto" :self .output_dto })
185
- self .output_dto .detections .append (detection )
186
- self .addContentToDictMappings (detection )
188
+ detection = Detection .model_validate (modelDict ,context = {"output_dto" :self .output_dto , "app" :self .input_dto .app })
189
+ self .output_dto .addContentToDictMappings (detection )
187
190
188
191
elif contentType == SecurityContentType .ssa_detections :
189
192
self .constructSSADetection (self .ssa_detection_builder , self .output_dto ,str (file ))
190
193
ssa_detection = self .ssa_detection_builder .getObject ()
191
194
if ssa_detection .status in [DetectionStatus .production .value , DetectionStatus .validation .value ]:
192
- self .output_dto .ssa_detections .append (ssa_detection )
193
- self .addContentToDictMappings (ssa_detection )
195
+ self .output_dto .addContentToDictMappings (ssa_detection )
194
196
195
197
else :
196
198
raise Exception (f"Unsupported type: [{ contentType } ]" )
@@ -229,6 +231,3 @@ def constructSSADetection(self, builder: SSADetectionBuilder, directorOutput:Dir
229
231
builder .addMappings ()
230
232
builder .addUnitTest ()
231
233
builder .addRBA ()
232
-
233
-
234
-
0 commit comments