|
5 | 5 | from contentctl.objects.detection import Detection
|
6 | 6 | from contentctl.objects.enums import AnalyticsType
|
7 | 7 | SEARCH_PLACEHOLDER = "%original_detection_search%"
|
| 8 | +EARLIEST_OFFSET = "$info_min_time$" |
| 9 | +LATEST_OFFSET = "$info_max_time$" |
| 10 | +RISK_SEARCH = "index = risk | stats count values(search_name) values(risk_message) values(analyticstories) values(annotations._all) values(annotations.mitre_attack.mitre_tactic) by risk_object" |
8 | 11 |
|
9 | 12 | class Drilldown(BaseModel):
|
10 | 13 | name: str = Field(..., description="The name of the drilldown search", min_length=5)
|
11 | 14 | search: str = Field(..., description="The text of a drilldown search. This must be valid SPL.", min_length=1)
|
12 | 15 | earliest_offset:str = Field(...,
|
13 | 16 | description="Earliest offset time for the drilldown search. "
|
14 |
| - "The most common value for this field is '$info_min_time$', " |
| 17 | + f"The most common value for this field is '{EARLIEST_OFFSET}', " |
15 | 18 | "but it is NOT the default value and must be supplied explicitly.",
|
16 | 19 | min_length= 1)
|
17 | 20 | latest_offset:str = Field(...,
|
18 | 21 | description="Latest offset time for the driolldown search. "
|
19 |
| - "The most common value for this field is '$info_max_time$', " |
| 22 | + f"The most common value for this field is '{LATEST_OFFSET}', " |
20 | 23 | "but it is NOT the default value and must be supplied explicitly.",
|
21 | 24 | min_length= 1)
|
22 | 25 |
|
23 | 26 | @classmethod
|
24 |
| - def constructDrilldownFromDetection(cls, detection: Detection) -> Drilldown: |
| 27 | + def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldown]: |
25 | 28 | if len([f"${o.name}$" for o in detection.tags.observable if o.role[0] == "Victim"]) == 0 and detection.type != AnalyticsType.Hunting:
|
26 | 29 | print("no victim!")
|
27 | 30 | # print(detection.tags.observable)
|
28 | 31 | # print(detection.file_path)
|
29 |
| - name_field = "View the detection results for " + ' and ' + ''.join([f"${o.name}$" for o in detection.tags.observable if o.type[0] == "Victim"]) |
30 |
| - search_field = f"{detection.search} | search " + ' '.join([f"o.name = ${o.name}$" for o in detection.tags.observable]) |
31 |
| - return cls(name=name_field, search=search_field) |
| 32 | + |
| 33 | + variableNamesString = ' and'.join([f"${o.name}$" for o in detection.tags.observable if o.type[0] == "Victim"]) |
| 34 | + nameField = "View the detection results for }" + variableNamesString |
| 35 | + appendedSearch = " | search " + ' '.join([f"o.name = ${o.name}$" for o in detection.tags.observable]) |
| 36 | + search_field = f"{detection.search}{appendedSearch}" |
| 37 | + detection_results = cls(name=nameField, earliest_offset=EARLIEST_OFFSET, latest_offset=LATEST_OFFSET, search=search_field) |
| 38 | + |
| 39 | + |
| 40 | + nameField = f"View risk events for the last 7 days for {variableNamesString}" |
| 41 | + search_field = f"{RISK_SEARCH}{appendedSearch}" |
| 42 | + risk_events_last_7_days = cls(name=nameField, earliest_offset=EARLIEST_OFFSET, latest_offset=LATEST_OFFSET, search=search_field) |
| 43 | + |
| 44 | + return [detection_results,risk_events_last_7_days] |
32 | 45 |
|
33 | 46 |
|
34 | 47 | def perform_search_substitutions(self, detection:Detection)->None:
|
|
0 commit comments