@@ -68,14 +68,23 @@ class CannotRunBaselineException(Exception):
68
68
# exception
69
69
pass
70
70
71
class ReplayIndexDoesNotExistOnServer(Exception):
    """
    In order to replay data files into the Splunk Server
    for testing, they must be replayed into an index that
    exists. If that index does not exist, this error will
    be generated and raised before we try to do anything else
    with that Data File.
    """
71
80
72
81
@dataclasses .dataclass (frozen = False )
73
82
class DetectionTestingManagerOutputDto ():
74
83
inputQueue : list [Detection ] = Field (default_factory = list )
75
84
outputQueue : list [Detection ] = Field (default_factory = list )
76
85
currentTestingQueue : dict [str , Union [Detection , None ]] = Field (default_factory = dict )
77
86
start_time : Union [datetime .datetime , None ] = None
78
- replay_index : str = "CONTENTCTL_TESTING_INDEX "
87
+ replay_index : str = "contentctl_testing_index "
79
88
replay_host : str = "CONTENTCTL_HOST"
80
89
timeout_seconds : int = 60
81
90
terminate : bool = False
@@ -88,6 +97,7 @@ class DetectionTestingInfrastructure(BaseModel, abc.ABC):
88
97
sync_obj : DetectionTestingManagerOutputDto
89
98
hec_token : str = ""
90
99
hec_channel : str = ""
100
+ all_indexes_on_server : list [str ] = []
91
101
_conn : client .Service = PrivateAttr ()
92
102
pbar : tqdm .tqdm = None
93
103
start_time : Optional [float ] = None
@@ -131,6 +141,7 @@ def setup(self):
131
141
(self .get_conn , "Waiting for App Installation" ),
132
142
(self .configure_conf_file_datamodels , "Configuring Datamodels" ),
133
143
(self .create_replay_index , f"Create index '{ self .sync_obj .replay_index } '" ),
144
+ (self .get_all_indexes , "Getting all indexes from server" ),
134
145
(self .configure_imported_roles , "Configuring Roles" ),
135
146
(self .configure_delete_indexes , "Configuring Indexes" ),
136
147
(self .configure_hec , "Configuring HEC" ),
@@ -169,14 +180,11 @@ def configure_hec(self):
169
180
pass
170
181
171
182
try :
172
- # Retrieve all available indexes on the splunk instance
173
- all_indexes = self .get_all_indexes ()
174
-
175
183
res = self .get_conn ().inputs .create (
176
184
name = "DETECTION_TESTING_HEC" ,
177
185
kind = "http" ,
178
186
index = self .sync_obj .replay_index ,
179
- indexes = "," .join (all_indexes ), # This allows the HEC to write to all indexes
187
+ indexes = "," .join (self . all_indexes_on_server ), # This allows the HEC to write to all indexes
180
188
useACK = True ,
181
189
)
182
190
self .hec_token = str (res .token )
@@ -185,17 +193,20 @@ def configure_hec(self):
185
193
except Exception as e :
186
194
raise (Exception (f"Failure creating HEC Endpoint: { str (e )} " ))
187
195
188
def get_all_indexes(self) -> None:
    """
    Retrieve the names of all indexes on the Splunk instance and
    cache them on ``self.all_indexes_on_server``.

    The replay index is not added explicitly here: by the time this
    function runs it has already been created on the server, so it
    appears in the server's index listing like any other index.

    :raises Exception: if the index listing cannot be fetched from
        the server.
    """
    try:
        # Retrieve all available indexes on the splunk instance
        self.all_indexes_on_server = [
            index.name for index in self.get_conn().indexes.list()
        ]
    except Exception as e:
        raise Exception(f"Failure getting indexes: {str(e)}")
201
212
@@ -281,11 +292,7 @@ def configure_imported_roles(
281
292
self ,
282
293
imported_roles : list [str ] = ["user" , "power" , "can_delete" ],
283
294
enterprise_security_roles : list [str ] = ["ess_admin" , "ess_analyst" , "ess_user" ],
284
- indexes : list [str ] = ["_*" , "*" ],
285
- ):
286
- indexes .append (self .sync_obj .replay_index )
287
- indexes_encoded = ";" .join (indexes )
288
-
295
+ ):
289
296
try :
290
297
# Set which roles should be configured. For Enterprise Security/Integration Testing,
291
298
# we must add some extra foles.
@@ -297,7 +304,7 @@ def configure_imported_roles(
297
304
self .get_conn ().roles .post (
298
305
self .infrastructure .splunk_app_username ,
299
306
imported_roles = roles ,
300
- srchIndexesAllowed = indexes_encoded ,
307
+ srchIndexesAllowed = ";" . join ( self . all_indexes_on_server ) ,
301
308
srchIndexesDefault = self .sync_obj .replay_index ,
302
309
)
303
310
return
@@ -309,19 +316,17 @@ def configure_imported_roles(
309
316
self .get_conn ().roles .post (
310
317
self .infrastructure .splunk_app_username ,
311
318
imported_roles = imported_roles ,
312
- srchIndexesAllowed = indexes_encoded ,
319
+ srchIndexesAllowed = ";" . join ( self . all_indexes_on_server ) ,
313
320
srchIndexesDefault = self .sync_obj .replay_index ,
314
321
)
315
322
316
- def configure_delete_indexes (self , indexes : list [str ] = ["_*" , "*" ]):
317
- indexes .append (self .sync_obj .replay_index )
323
def configure_delete_indexes(self):
    """
    Grant delete permission on every index known to the server by
    posting the full index list to the ``deleteIndexesAllowed``
    authorization property.

    Failures are treated as best-effort: an error is reported on the
    progress bar rather than raised, so setup can continue.
    """
    endpoint = "/services/properties/authorize/default/deleteIndexesAllowed"
    allowed_indexes = ";".join(self.all_indexes_on_server)
    try:
        self.get_conn().post(endpoint, value=allowed_indexes)
    except Exception as e:
        self.pbar.write(
            f"Error configuring deleteIndexesAllowed with '{self.all_indexes_on_server}': [{str(e)}]"
        )
326
331
327
332
def wait_for_conf_file (self , app_name : str , conf_file_name : str ):
@@ -670,8 +675,6 @@ def execute_unit_test(
670
675
# Set the mode and timeframe, if required
671
676
kwargs = {"exec_mode" : "blocking" }
672
677
673
-
674
-
675
678
# Set earliest_time and latest_time appropriately if FORCE_ALL_TIME is False
676
679
if not FORCE_ALL_TIME :
677
680
if test .earliest_time is not None :
@@ -1051,8 +1054,8 @@ def retry_search_until_timeout(
1051
1054
# Get the start time and compute the timeout
1052
1055
search_start_time = time .time ()
1053
1056
search_stop_time = time .time () + self .sync_obj .timeout_seconds
1054
-
1055
- # Make a copy of the search string since we may
1057
+
1058
+ # Make a copy of the search string since we may
1056
1059
# need to make some small changes to it below
1057
1060
search = detection .search
1058
1061
@@ -1104,8 +1107,6 @@ def retry_search_until_timeout(
1104
1107
# Initialize the collection of fields that are empty that shouldn't be
1105
1108
present_threat_objects : set [str ] = set ()
1106
1109
empty_fields : set [str ] = set ()
1107
-
1108
-
1109
1110
1110
1111
# Filter out any messages in the results
1111
1112
for result in results :
@@ -1135,7 +1136,7 @@ def retry_search_until_timeout(
1135
1136
# not populated and we should throw an error. This can happen if there is a typo
1136
1137
# on a field. In this case, the field will appear but will not contain any values
1137
1138
current_empty_fields : set [str ] = set ()
1138
-
1139
+
1139
1140
for field in observable_fields_set :
1140
1141
if result .get (field , 'null' ) == 'null' :
1141
1142
if field in risk_object_fields_set :
@@ -1155,9 +1156,7 @@ def retry_search_until_timeout(
1155
1156
if field in threat_object_fields_set :
1156
1157
present_threat_objects .add (field )
1157
1158
continue
1158
-
1159
1159
1160
-
1161
1160
# If everything succeeded up until now, and no empty fields are found in the
1162
1161
# current result, then the search was a success
1163
1162
if len (current_empty_fields ) == 0 :
@@ -1171,8 +1170,7 @@ def retry_search_until_timeout(
1171
1170
1172
1171
else :
1173
1172
empty_fields = empty_fields .union (current_empty_fields )
1174
-
1175
-
1173
+
1176
1174
missing_threat_objects = threat_object_fields_set - present_threat_objects
1177
1175
# Report a failure if there were empty fields in a threat object in all results
1178
1176
if len (missing_threat_objects ) > 0 :
@@ -1188,7 +1186,6 @@ def retry_search_until_timeout(
1188
1186
duration = time .time () - search_start_time ,
1189
1187
)
1190
1188
return
1191
-
1192
1189
1193
1190
test .result .set_job_content (
1194
1191
job .content ,
@@ -1249,9 +1246,19 @@ def replay_attack_data_file(
1249
1246
test_group : TestGroup ,
1250
1247
test_group_start_time : float ,
1251
1248
):
1252
- tempfile = mktemp (dir = tmp_dir )
1253
-
1249
+ # Before attempting to replay the file, ensure that the index we want
1250
+ # to replay into actuall exists. If not, we should throw a detailed
1251
+ # exception that can easily be interpreted by the user.
1252
+ if attack_data_file .custom_index is not None and \
1253
+ attack_data_file .custom_index not in self .all_indexes_on_server :
1254
+ raise ReplayIndexDoesNotExistOnServer (
1255
+ f"Unable to replay data file { attack_data_file .data } "
1256
+ f"into index '{ attack_data_file .custom_index } '. "
1257
+ "The index does not exist on the Splunk Server. "
1258
+ f"The only valid indexes on the server are { self .all_indexes_on_server } "
1259
+ )
1254
1260
1261
+ tempfile = mktemp (dir = tmp_dir )
1255
1262
if not (str (attack_data_file .data ).startswith ("http://" ) or
1256
1263
str (attack_data_file .data ).startswith ("https://" )) :
1257
1264
if pathlib .Path (str (attack_data_file .data )).is_file ():
@@ -1296,7 +1303,6 @@ def replay_attack_data_file(
1296
1303
)
1297
1304
)
1298
1305
1299
-
1300
1306
# Upload the data
1301
1307
self .format_pbar_string (
1302
1308
TestReportingType .GROUP ,
0 commit comments