1
-
2
-
3
1
from dataclasses import dataclass
4
2
import questionary
5
3
from typing import Any
@@ -28,64 +26,87 @@ class NewContent:
28
26
}
29
27
]
30
28
31
- def buildDetection (self )-> dict [str ,Any ]:
29
+ def buildDetection (self ) -> tuple [ dict [str , Any ], str ]:
32
30
questions = NewContentQuestions .get_questions_detection ()
33
- answers : dict [str ,str ] = questionary .prompt (
34
- questions ,
35
- kbi_msg = "User did not answer all of the prompt questions. Exiting..." )
31
+ answers : dict [str , str ] = questionary .prompt (
32
+ questions ,
33
+ kbi_msg = "User did not answer all of the prompt questions. Exiting..." ,
34
+ )
36
35
if not answers :
37
36
raise ValueError ("User didn't answer one or more questions!" )
38
- answers .update (answers )
39
- answers ['name' ] = answers ['detection_name' ]
40
- del answers ['detection_name' ]
41
- answers ['id' ] = str (uuid .uuid4 ())
42
- answers ['version' ] = 1
43
- answers ['date' ] = datetime .today ().strftime ('%Y-%m-%d' )
44
- answers ['author' ] = answers ['detection_author' ]
45
- del answers ['detection_author' ]
46
- answers ['data_source' ] = answers ['data_source' ]
47
- answers ['type' ] = answers ['detection_type' ]
48
- del answers ['detection_type' ]
49
- answers ['status' ] = "production" #start everything as production since that's what we INTEND the content to become
50
- answers ['description' ] = 'UPDATE_DESCRIPTION'
51
- file_name = answers ['name' ].replace (' ' , '_' ).replace ('-' ,'_' ).replace ('.' ,'_' ).replace ('/' ,'_' ).lower ()
52
- answers ['search' ] = answers ['detection_search' ] + ' | `' + file_name + '_filter`'
53
- del answers ['detection_search' ]
54
- answers ['how_to_implement' ] = 'UPDATE_HOW_TO_IMPLEMENT'
55
- answers ['known_false_positives' ] = 'UPDATE_KNOWN_FALSE_POSITIVES'
56
- answers ['references' ] = ['REFERENCE' ]
57
- if answers ['type' ] in ["TTP" , "Correlation" , "Anomaly" , "TTP" ]:
58
- answers ['drilldown_searches' ] = NewContent .DEFAULT_DRILLDOWN_DEF
59
- answers ['tags' ] = dict ()
60
- answers ['tags' ]['analytic_story' ] = ['UPDATE_STORY_NAME' ]
61
- answers ['tags' ]['asset_type' ] = 'UPDATE asset_type'
62
- answers ['tags' ]['confidence' ] = 'UPDATE value between 1-100'
63
- answers ['tags' ]['impact' ] = 'UPDATE value between 1-100'
64
- answers ['tags' ]['message' ] = 'UPDATE message'
65
- answers ['tags' ]['mitre_attack_id' ] = [x .strip () for x in answers ['mitre_attack_ids' ].split (',' )]
66
- answers ['tags' ]['observable' ] = [{'name' : 'UPDATE' , 'type' : 'UPDATE' , 'role' : ['UPDATE' ]}]
67
- answers ['tags' ]['product' ] = ['Splunk Enterprise' ,'Splunk Enterprise Security' ,'Splunk Cloud' ]
68
- answers ['tags' ]['security_domain' ] = answers ['security_domain' ]
69
- del answers ["security_domain" ]
70
- answers ['tags' ]['cve' ] = ['UPDATE WITH CVE(S) IF APPLICABLE' ]
71
-
72
- #generate the tests section
73
- answers ['tests' ] = [
74
- {
75
- 'name' : "True Positive Test" ,
76
- 'attack_data' : [
77
- {
78
- 'data' : "Go to https://github.com/splunk/contentctl/wiki for information about the format of this field" ,
79
- "sourcetype" : "UPDATE SOURCETYPE" ,
80
- "source" : "UPDATE SOURCE"
81
- }
82
- ]
83
- }
84
- ]
85
- del answers ["mitre_attack_ids" ]
86
- return answers
87
37
88
- def buildStory (self )-> dict [str ,Any ]:
38
+ data_source_field = (
39
+ answers ["data_source" ] if len (answers ["data_source" ]) > 0 else ["UPDATE" ]
40
+ )
41
+ file_name = (
42
+ answers ["detection_name" ]
43
+ .replace (" " , "_" )
44
+ .replace ("-" , "_" )
45
+ .replace ("." , "_" )
46
+ .replace ("/" , "_" )
47
+ .lower ()
48
+ )
49
+
50
+ #Minimum lenght for a mitre tactic is 5 characters: T1000
51
+ if len (answers ["mitre_attack_ids" ]) >= 5 :
52
+ mitre_attack_ids = [x .strip () for x in answers ["mitre_attack_ids" ].split ("," )]
53
+ else :
54
+ #string was too short, so just put a placeholder
55
+ mitre_attack_ids = ["UPDATE" ]
56
+
57
+ output_file_answers : dict [str , Any ] = {
58
+ "name" : answers ["detection_name" ],
59
+ "id" : str (uuid .uuid4 ()),
60
+ "version" : 1 ,
61
+ "date" : datetime .today ().strftime ("%Y-%m-%d" ),
62
+ "author" : answers ["detection_author" ],
63
+ "status" : "production" , # start everything as production since that's what we INTEND the content to become
64
+ "type" : answers ["detection_type" ],
65
+ "description" : "UPDATE_DESCRIPTION" ,
66
+ "data_source" : data_source_field ,
67
+ "search" : f"{ answers ['detection_search' ]} | `{ file_name } _filter`'" ,
68
+ "how_to_implement" : "UPDATE_HOW_TO_IMPLEMENT" ,
69
+ "known_false_positives" : "UPDATE_KNOWN_FALSE_POSITIVES" ,
70
+ "references" : ["REFERENCE" ],
71
+ "drilldown_searches" : NewContent .DEFAULT_DRILLDOWN_DEF ,
72
+ "tags" : {
73
+ "analytic_story" : ["UPDATE_STORY_NAME" ],
74
+ "asset_type" : "UPDATE asset_type" ,
75
+ "confidence" : "UPDATE value between 1-100" ,
76
+ "impact" : "UPDATE value between 1-100" ,
77
+ "message" : "UPDATE message" ,
78
+ "mitre_attack_id" : mitre_attack_ids ,
79
+ "observable" : [
80
+ {"name" : "UPDATE" , "type" : "UPDATE" , "role" : ["UPDATE" ]}
81
+ ],
82
+ "product" : [
83
+ "Splunk Enterprise" ,
84
+ "Splunk Enterprise Security" ,
85
+ "Splunk Cloud" ,
86
+ ],
87
+ "security_domain" : answers ["security_domain" ],
88
+ "cve" : ["UPDATE WITH CVE(S) IF APPLICABLE" ],
89
+ },
90
+ "tests" : [
91
+ {
92
+ "name" : "True Positive Test" ,
93
+ "attack_data" : [
94
+ {
95
+ "data" : "Go to https://github.com/splunk/contentctl/wiki for information about the format of this field" ,
96
+ "sourcetype" : "UPDATE SOURCETYPE" ,
97
+ "source" : "UPDATE SOURCE" ,
98
+ }
99
+ ],
100
+ }
101
+ ],
102
+ }
103
+
104
+ if answers ["detection_type" ] not in ["TTP" , "Correlation" , "Anomaly" , "TTP" ]:
105
+ del output_file_answers ["drilldown_searches" ]
106
+
107
+ return output_file_answers , answers ['detection_kind' ]
108
+
109
+ def buildStory (self ) -> dict [str , Any ]:
89
110
questions = NewContentQuestions .get_questions_story ()
90
111
answers = questionary .prompt (
91
112
questions ,
@@ -110,12 +131,11 @@ def buildStory(self)->dict[str,Any]:
110
131
del answers ['usecase' ]
111
132
answers ['tags' ]['cve' ] = ['UPDATE WITH CVE(S) IF APPLICABLE' ]
112
133
return answers
113
-
114
134
115
135
def execute (self , input_dto : new ) -> None :
116
136
if input_dto .type == NewContentType .detection :
117
- content_dict = self .buildDetection ()
118
- subdirectory = pathlib .Path ('detections' ) / content_dict . pop ( ' detection_kind' )
137
+ content_dict , detection_kind = self .buildDetection ()
138
+ subdirectory = pathlib .Path ('detections' ) / detection_kind
119
139
elif input_dto .type == NewContentType .story :
120
140
content_dict = self .buildStory ()
121
141
subdirectory = pathlib .Path ('stories' )
@@ -125,23 +145,20 @@ def execute(self, input_dto: new) -> None:
125
145
full_output_path = input_dto .path / subdirectory / SecurityContentObject_Abstract .contentNameToFileName (content_dict .get ('name' ))
126
146
YmlWriter .writeYmlFile (str (full_output_path ), content_dict )
127
147
128
-
129
-
130
148
def writeObjectNewContent (self , object : dict , subdirectory_name : str , type : NewContentType ) -> None :
131
149
if type == NewContentType .detection :
132
150
file_path = os .path .join (self .output_path , 'detections' , subdirectory_name , self .convertNameToFileName (object ['name' ], object ['tags' ]['product' ]))
133
151
output_folder = pathlib .Path (self .output_path )/ 'detections' / subdirectory_name
134
- #make sure the output folder exists for this detection
152
+ # make sure the output folder exists for this detection
135
153
output_folder .mkdir (exist_ok = True )
136
154
137
155
YmlWriter .writeDetection (file_path , object )
138
156
print ("Successfully created detection " + file_path )
139
-
157
+
140
158
elif type == NewContentType .story :
141
159
file_path = os .path .join (self .output_path , 'stories' , self .convertNameToFileName (object ['name' ], object ['tags' ]['product' ]))
142
160
YmlWriter .writeStory (file_path , object )
143
161
print ("Successfully created story " + file_path )
144
-
162
+
145
163
else :
146
164
raise (Exception (f"Object Must be Story or Detection, but is not: { object } " ))
147
-
0 commit comments