Skip to content

Commit db7de0b

Browse files
committed
fixes to ensure that every search that
needs one should have the appropriate default drilldown.
1 parent 3280fbf commit db7de0b

File tree

2 files changed

+33
-14
lines changed

2 files changed

+33
-14
lines changed

contentctl/objects/abstract_security_content_objects/detection_abstract.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from contentctl.objects.integration_test import IntegrationTest
3737
from contentctl.objects.data_source import DataSource
3838
from contentctl.objects.base_test_result import TestResultStatus
39-
from contentctl.objects.drilldown import Drilldown
39+
from contentctl.objects.drilldown import Drilldown, DRILLDOWN_SEARCH_PLACEHOLDER
4040
from contentctl.objects.enums import ProvidingTechnology
4141
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
4242
import datetime
@@ -564,15 +564,32 @@ def model_post_init(self, __context: Any) -> None:
564564
# Derive TestGroups and IntegrationTests, adjust for ManualTests, skip as needed
565565
self.adjust_tests_and_groups()
566566

567-
#Add the default drilldown
568-
#self.drilldown_searches.append(Drilldown.constructDrilldownFromDetection(self))
569-
567+
# Ensure that if there is at least 1 drilldown, at least
568+
# 1 of the drilldowns contains the string Drilldown.SEARCH_PLACEHOLDER.
569+
# This is presently a requirement when 1 or more drilldowns are added to a detection.
570+
# Note that this is only required for production searches that are not hunting
571+
572+
if self.type == AnalyticsType.Hunting.value or self.status != DetectionStatus.production.value:
573+
#No additional check need to happen on the potential drilldowns.
574+
pass
575+
else:
576+
found_placeholder = False
577+
if len(self.drilldown_searches) < 2:
578+
raise ValueError(f"This detection is required to have 2 drilldown_searches, but only has [{len(self.drilldown_searches)}]")
579+
for drilldown in self.drilldown_searches:
580+
if DRILLDOWN_SEARCH_PLACEHOLDER in drilldown.search:
581+
found_placeholder = True
582+
if not found_placeholder:
583+
raise ValueError("Detection has one or more drilldown_searches, but none of them "
584+
f"contained '{DRILLDOWN_SEARCH_PLACEHOLDER}. This is a requirement "
585+
"if drilldown_searches are defined.'")
586+
570587
# Update the search fields with the original search, if required
571588
for drilldown in self.drilldown_searches:
572589
drilldown.perform_search_substitutions(self)
573-
590+
574591
#For experimental purposes, add the default drilldowns
575-
self.drilldown_searches.extend(Drilldown.constructDrilldownsFromDetection(self))
592+
#self.drilldown_searches.extend(Drilldown.constructDrilldownsFromDetection(self))
576593

577594
@property
578595
def drilldowns_in_JSON(self) -> list[dict[str,str]]:

contentctl/objects/drilldown.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
if TYPE_CHECKING:
55
from contentctl.objects.detection import Detection
66
from contentctl.objects.enums import AnalyticsType
7-
SEARCH_PLACEHOLDER = "%original_detection_search%"
7+
DRILLDOWN_SEARCH_PLACEHOLDER = "%original_detection_search%"
88
EARLIEST_OFFSET = "$info_min_time$"
99
LATEST_OFFSET = "$info_max_time$"
1010
RISK_SEARCH = "index = risk starthoursago = 168 endhoursago = 0 | stats count values(search_name) values(risk_message) values(analyticstories) values(annotations._all) values(annotations.mitre_attack.mitre_tactic) "
@@ -29,7 +29,7 @@ def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldow
2929
if len(victim_observables) == 0 or detection.type == AnalyticsType.Hunting:
3030
# No victims, so no drilldowns
3131
return []
32-
print("Adding default drilldowns. REMOVE THIS BEFORE MERGING")
32+
print(f"Adding default drilldowns for [{detection.name}]")
3333
variableNamesString = ' and '.join([f"${o.name}$" for o in victim_observables])
3434
nameField = f"View the detection results for {variableNamesString}"
3535
appendedSearch = " | search " + ' '.join([f"{o.name} = ${o.name}$" for o in victim_observables])
@@ -40,18 +40,20 @@ def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldow
4040
nameField = f"View risk events for the last 7 days for {variableNamesString}"
4141
fieldNamesListString = ', '.join([o.name for o in victim_observables])
4242
search_field = f"{RISK_SEARCH}by {fieldNamesListString} {appendedSearch}"
43-
#risk_events_last_7_days = cls(name=nameField, earliest_offset=EARLIEST_OFFSET, latest_offset=LATEST_OFFSET, search=search_field)
4443
risk_events_last_7_days = cls(name=nameField, earliest_offset=None, latest_offset=None, search=search_field)
4544

4645
return [detection_results,risk_events_last_7_days]
4746

4847

4948
def perform_search_substitutions(self, detection:Detection)->None:
50-
if (self.search.count("%") % 2) or (self.search.count("$") % 2):
51-
print("\n\nWarning - a non-even number of '%' or '$' characters were found in the\n"
52-
f"drilldown search '{self.search}' for Detection {detection.file_path}.\n"
53-
"If this was intentional, then please ignore this warning.\n")
54-
self.search = self.search.replace(SEARCH_PLACEHOLDER, detection.search)
49+
"""Replaces the field DRILLDOWN_SEARCH_PLACEHOLDER (%original_detection_search%)
50+
with the search contained in the detection. We do this so that the YML does not
51+
need the search copy/pasted from the search field into the drilldown object.
52+
53+
Args:
54+
detection (Detection): Detection to be used to update the search field of the drilldown
55+
"""
56+
self.search = self.search.replace(DRILLDOWN_SEARCH_PLACEHOLDER, detection.search)
5557

5658

5759
@model_serializer

0 commit comments

Comments
 (0)