1
1
import logging
2
2
import time
3
3
import json
4
- from typing import Union , Optional , Any
4
+ from typing import Any
5
5
from enum import Enum
6
+ from functools import cached_property
6
7
7
8
from pydantic import ConfigDict , BaseModel , computed_field , Field , PrivateAttr
8
9
from splunklib .results import JSONResultsReader , Message # type: ignore
15
16
from contentctl .objects .base_test_result import TestResultStatus
16
17
from contentctl .objects .integration_test_result import IntegrationTestResult
17
18
from contentctl .actions .detection_testing .progress_bar import (
18
- format_pbar_string ,
19
+ format_pbar_string , # type: ignore
19
20
TestReportingType ,
20
21
TestingStates
21
22
)
@@ -178,12 +179,14 @@ class PbarData(BaseModel):
178
179
:param fq_test_name: the fully qualifed (fq) test name ("<detection_name>:<test_name>") used for logging
179
180
:param start_time: the start time used for logging
180
181
"""
181
- pbar : tqdm
182
+ pbar : tqdm # type: ignore
182
183
fq_test_name : str
183
184
start_time : float
184
-
185
+
185
186
# needed to support the tqdm type
186
- model_config = ConfigDict (arbitrary_types_allowed = True )
187
+ model_config = ConfigDict (
188
+ arbitrary_types_allowed = True
189
+ )
187
190
188
191
189
192
class CorrelationSearch (BaseModel ):
@@ -196,78 +199,110 @@ class CorrelationSearch(BaseModel):
196
199
:param pbar_data: the encapsulated info needed for logging w/ pbar
197
200
:param test_index: the index attack data is forwarded to for testing (optionally used in cleanup)
198
201
"""
199
- ## The following three fields are explicitly needed at instantiation # noqa: E266
200
-
201
202
# the detection associated with the correlation search (e.g. "Windows Modify Registry EnableLinkedConnections")
202
- detection : Detection
203
+ detection : Detection = Field (...)
203
204
204
205
# a Service instance representing a connection to a Splunk instance
205
- service : splunklib .Service
206
+ service : splunklib .Service = Field (...)
206
207
207
208
# the encapsulated info needed for logging w/ pbar
208
- pbar_data : PbarData
209
-
210
- ## The following field is optional for instantiation # noqa: E266
209
+ pbar_data : PbarData = Field (...)
211
210
212
211
# The index attack data is sent to; can be None if we are relying on the caller to do our
213
212
# cleanup of this index
214
- test_index : Optional [str ] = Field (default = None , min_length = 1 )
215
-
216
- ## All remaining fields can be derived from other fields or have intentional defaults that # noqa: E266
217
- ## should not be changed (validators should prevent instantiating some of these fields directly # noqa: E266
218
- ## to prevent undefined behavior) # noqa: E266
213
+ test_index : str | None = Field (default = None , min_length = 1 )
219
214
220
215
# The logger to use (logs all go to a null pipe unless ENABLE_LOGGING is set to True, so as not
221
216
# to conflict w/ tqdm)
222
- logger : logging .Logger = Field (default_factory = get_logger )
217
+ logger : logging .Logger = Field (default_factory = get_logger , init = False )
218
+
219
+ # The set of indexes to clear on cleanup
220
+ indexes_to_purge : set [str ] = Field (default = set (), init = False )
221
+
222
+ # The risk analysis adaptive response action (if defined)
223
+ _risk_analysis_action : RiskAnalysisAction | None = PrivateAttr (default = None )
224
+
225
+ # The notable adaptive response action (if defined)
226
+ _notable_action : NotableAction | None = PrivateAttr (default = None )
227
+
228
+ # The list of risk events found
229
+ _risk_events : list [RiskEvent ] | None = PrivateAttr (default = None )
230
+
231
+ # The list of notable events found
232
+ _notable_events : list [NotableEvent ] | None = PrivateAttr (default = None )
233
+
234
+ # Need arbitrary types to allow fields w/ types like SavedSearch; we also want to forbid
235
+ # unexpected fields
236
+ model_config = ConfigDict (
237
+ arbitrary_types_allowed = True ,
238
+ extra = 'forbid'
239
+ )
240
+
241
+ def model_post_init (self , __context : Any ) -> None :
242
+ super ().model_post_init (__context )
243
+
244
+ # Parse the initial values for the risk/notable actions
245
+ self ._parse_risk_and_notable_actions ()
223
246
224
- # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule")
225
247
@computed_field
226
- @property
248
+ @cached_property
227
249
def name (self ) -> str :
250
+ """
251
+ The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule")
252
+
253
+ :returns: the search name
254
+ :rtype: str
255
+ """
228
256
return f"ESCU - { self .detection .name } - Rule"
229
257
230
- # The path to the saved search on the Splunk instance
231
258
@computed_field
232
- @property
259
+ @cached_property
233
260
def splunk_path (self ) -> str :
261
+ """
262
+ The path to the saved search on the Splunk instance
263
+
264
+ :returns: the search path
265
+ :rtype: str
266
+ """
234
267
return f"/saved/searches/{ self .name } "
235
268
236
- # A model of the saved search as provided by splunklib
237
269
@computed_field
238
- @property
239
- def saved_search (self ) -> splunklib .SavedSearch | None :
270
+ @cached_property
271
+ def saved_search (self ) -> splunklib .SavedSearch :
272
+ """
273
+ A model of the saved search as provided by splunklib
274
+
275
+ :returns: the SavedSearch object
276
+ :rtype: :class:`splunklib.client.SavedSearch`
277
+ """
240
278
return splunklib .SavedSearch (
241
279
self .service ,
242
280
self .splunk_path ,
243
281
)
244
282
245
- # The set of indexes to clear on cleanup
246
- indexes_to_purge : set [str ] = set ()
247
-
248
- # The risk analysis adaptive response action (if defined)
283
+ # TODO (cmcginley): need to make this refreshable
249
284
@computed_field
250
285
@property
251
286
def risk_analysis_action (self ) -> RiskAnalysisAction | None :
252
- if not self .saved_search .content :
253
- return None
254
- return CorrelationSearch ._get_risk_analysis_action (self .saved_search .content )
287
+ """
288
+ The risk analysis adaptive response action (if defined)
255
289
256
- # The notable adaptive response action (if defined)
290
+ :returns: the RiskAnalysisAction object, if it exists
291
+ :rtype: :class:`contentctl.objects.risk_analysis_action.RiskAnalysisAction` | None
292
+ """
293
+ return self ._risk_analysis_action
294
+
295
+ # TODO (cmcginley): need to make this refreshable
257
296
@computed_field
258
297
@property
259
298
def notable_action (self ) -> NotableAction | None :
260
- if not self .saved_search .content :
261
- return None
262
- return CorrelationSearch ._get_notable_action (self .saved_search .content )
263
-
264
- # The list of risk events found
265
- _risk_events : Optional [list [RiskEvent ]] = PrivateAttr (default = None )
266
-
267
- # The list of notable events found
268
- _notable_events : Optional [list [NotableEvent ]] = PrivateAttr (default = None )
269
- model_config = ConfigDict (arbitrary_types_allowed = True , extra = 'forbid' )
299
+ """
300
+ The notable adaptive response action (if defined)
270
301
302
+ :returns: the NotableAction object, if it exists
303
+ :rtype: :class:`contentctl.objects.notable_action.NotableAction` | None
304
+ """
305
+ return self ._notable_action
271
306
272
307
@property
273
308
def earliest_time (self ) -> str :
@@ -327,7 +362,7 @@ def has_notable_action(self) -> bool:
327
362
return self .notable_action is not None
328
363
329
364
@staticmethod
330
- def _get_risk_analysis_action (content : dict [str , Any ]) -> Optional [ RiskAnalysisAction ] :
365
+ def _get_risk_analysis_action (content : dict [str , Any ]) -> RiskAnalysisAction | None :
331
366
"""
332
367
Given the saved search content, parse the risk analysis action
333
368
:param content: a dict of strings to values
@@ -341,7 +376,7 @@ def _get_risk_analysis_action(content: dict[str, Any]) -> Optional[RiskAnalysisA
341
376
return None
342
377
343
378
@staticmethod
344
- def _get_notable_action (content : dict [str , Any ]) -> Optional [ NotableAction ] :
379
+ def _get_notable_action (content : dict [str , Any ]) -> NotableAction | None :
345
380
"""
346
381
Given the saved search content, parse the notable action
347
382
:param content: a dict of strings to values
@@ -365,10 +400,6 @@ def _get_relevant_observables(observables: list[Observable]) -> list[Observable]
365
400
relevant .append (observable )
366
401
return relevant
367
402
368
- # TODO (PEX-484): ideally, we could handle this and the following init w/ a call to
369
- # model_post_init, so that all the logic is encapsulated w/in _parse_risk_and_notable_actions
370
- # but that is a pydantic v2 feature (see the init validators for risk/notable actions):
371
- # https://docs.pydantic.dev/latest/api/base_model/#pydantic.main.BaseModel.model_post_init
372
403
def _parse_risk_and_notable_actions (self ) -> None :
373
404
"""Parses the risk/notable metadata we care about from self.saved_search.content
374
405
@@ -379,12 +410,12 @@ def _parse_risk_and_notable_actions(self) -> None:
379
410
unpacked to be anything other than a singleton
380
411
"""
381
412
# grab risk details if present
382
- self .risk_analysis_action = CorrelationSearch ._get_risk_analysis_action (
413
+ self ._risk_analysis_action = CorrelationSearch ._get_risk_analysis_action (
383
414
self .saved_search .content # type: ignore
384
415
)
385
416
386
417
# grab notable details if present
387
- self .notable_action = CorrelationSearch ._get_notable_action (self .saved_search .content ) # type: ignore
418
+ self ._notable_action = CorrelationSearch ._get_notable_action (self .saved_search .content ) # type: ignore
388
419
389
420
def refresh (self ) -> None :
390
421
"""Refreshes the metadata in the SavedSearch entity, and re-parses the fields we care about
@@ -672,7 +703,7 @@ def validate_risk_events(self) -> None:
672
703
# TODO (#250): Re-enable and refactor code that validates the specific risk counts
673
704
# Validate risk events in aggregate; we should have an equal amount of risk events for each
674
705
# relevant observable, and the total count should match the total number of events
675
- # individual_count: Optional[ int] = None
706
+ # individual_count: int | None = None
676
707
# total_count = 0
677
708
# for observable_str in observable_counts:
678
709
# self.logger.debug(
@@ -736,7 +767,7 @@ def test(self, max_sleep: int = TimeoutConfig.MAX_SLEEP.value, raise_on_exc: boo
736
767
)
737
768
738
769
# initialize result as None
739
- result : Optional [ IntegrationTestResult ] = None
770
+ result : IntegrationTestResult | None = None
740
771
741
772
# keep track of time slept and number of attempts for exponential backoff (base 2)
742
773
elapsed_sleep_time = 0
0 commit comments