1
1
from __future__ import annotations
2
- from pydantic import BaseModel , Field
2
+ from pydantic import BaseModel , Field , model_serializer
3
3
from typing import TYPE_CHECKING
4
4
if TYPE_CHECKING :
5
5
from contentctl .objects .detection import Detection
6
6
from contentctl .objects .enums import AnalyticsType
7
7
SEARCH_PLACEHOLDER = "%original_detection_search%"
8
8
EARLIEST_OFFSET = "$info_min_time$"
9
9
LATEST_OFFSET = "$info_max_time$"
10
- RISK_SEARCH = "index = risk | stats count values(search_name) values(risk_message) values(analyticstories) values(annotations._all) values(annotations.mitre_attack.mitre_tactic) by risk_object "
10
+ RISK_SEARCH = "index = risk starthoursago = 168 endhoursago = 0 | stats count values(search_name) values(risk_message) values(analyticstories) values(annotations._all) values(annotations.mitre_attack.mitre_tactic) "
11
11
12
12
class Drilldown (BaseModel ):
13
13
name : str = Field (..., description = "The name of the drilldown search" , min_length = 5 )
14
14
search : str = Field (..., description = "The text of a drilldown search. This must be valid SPL." , min_length = 1 )
15
- earliest_offset :str = Field (...,
15
+ earliest_offset :None | str = Field (...,
16
16
description = "Earliest offset time for the drilldown search. "
17
17
f"The most common value for this field is '{ EARLIEST_OFFSET } ', "
18
18
"but it is NOT the default value and must be supplied explicitly." ,
19
19
min_length = 1 )
20
- latest_offset :str = Field (...,
20
+ latest_offset :None | str = Field (...,
21
21
description = "Latest offset time for the driolldown search. "
22
22
f"The most common value for this field is '{ LATEST_OFFSET } ', "
23
23
"but it is NOT the default value and must be supplied explicitly." ,
@@ -29,7 +29,7 @@ def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldow
29
29
if len (victim_observables ) == 0 or detection .type == AnalyticsType .Hunting :
30
30
# No victims, so no drilldowns
31
31
return []
32
-
32
+ print ( "Adding default drilldowns. REMOVE THIS BEFORE MERGING" )
33
33
variableNamesString = ' and ' .join ([f"${ o .name } $" for o in victim_observables ])
34
34
nameField = f"View the detection results for { variableNamesString } "
35
35
appendedSearch = " | search " + ' ' .join ([f"{ o .name } = ${ o .name } $" for o in victim_observables ])
@@ -38,8 +38,10 @@ def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldow
38
38
39
39
40
40
nameField = f"View risk events for the last 7 days for { variableNamesString } "
41
- search_field = f"{ RISK_SEARCH } { appendedSearch } "
42
- risk_events_last_7_days = cls (name = nameField , earliest_offset = EARLIEST_OFFSET , latest_offset = LATEST_OFFSET , search = search_field )
41
+ fieldNamesListString = ', ' .join ([o .name for o in victim_observables ])
42
+ search_field = f"{ RISK_SEARCH } by { fieldNamesListString } { appendedSearch } "
43
+ #risk_events_last_7_days = cls(name=nameField, earliest_offset=EARLIEST_OFFSET, latest_offset=LATEST_OFFSET, search=search_field)
44
+ risk_events_last_7_days = cls (name = nameField , earliest_offset = None , latest_offset = None , search = search_field )
43
45
44
46
return [detection_results ,risk_events_last_7_days ]
45
47
@@ -50,4 +52,17 @@ def perform_search_substitutions(self, detection:Detection)->None:
50
52
f"drilldown search '{ self .search } ' for Detection { detection .file_path } .\n "
51
53
"If this was intentional, then please ignore this warning.\n " )
52
54
self .search = self .search .replace (SEARCH_PLACEHOLDER , detection .search )
53
-
55
+
56
+
57
+ @model_serializer
58
+ def serialize_model (self ) -> dict [str ,str ]:
59
+ #Call serializer for parent
60
+ model :dict [str ,str ] = {}
61
+
62
+ model ['name' ] = self .name
63
+ model ['search' ] = self .search
64
+ if self .earliest_offset is not None :
65
+ model ['earliest_offset' ] = self .earliest_offset
66
+ if self .latest_offset is not None :
67
+ model ['latest_offset' ] = self .latest_offset
68
+ return model
0 commit comments