|
36 | 36 | from contentctl.objects.integration_test import IntegrationTest
|
37 | 37 | from contentctl.objects.data_source import DataSource
|
38 | 38 | from contentctl.objects.base_test_result import TestResultStatus
|
39 |
| - |
| 39 | +from contentctl.objects.drilldown import Drilldown, DRILLDOWN_SEARCH_PLACEHOLDER |
40 | 40 | from contentctl.objects.enums import ProvidingTechnology
|
41 | 41 | from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
|
42 | 42 | import datetime
|
@@ -90,6 +90,7 @@ class Detection_Abstract(SecurityContentObject):
|
90 | 90 | test_groups: list[TestGroup] = []
|
91 | 91 |
|
92 | 92 | data_source_objects: list[DataSource] = []
|
| 93 | + drilldown_searches: list[Drilldown] = Field(default=[], description="A list of Drilldowns that should be included with this search") |
93 | 94 |
|
94 | 95 | def get_conf_stanza_name(self, app:CustomApp)->str:
|
95 | 96 | stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name)
|
@@ -167,6 +168,7 @@ def adjust_tests_and_groups(self) -> None:
|
167 | 168 | the model from the list of unit tests. Also, preemptively skips all manual tests, as well as
|
168 | 169 | tests for experimental/deprecated detections and Correlation type detections.
|
169 | 170 | """
|
| 171 | + |
170 | 172 | # Since ManualTest and UnitTest are not differentiable without looking at the manual_test
|
171 | 173 | # tag, Pydantic builds all tests as UnitTest objects. If we see the manual_test flag, we
|
172 | 174 | # convert these to ManualTest
|
@@ -563,6 +565,46 @@ def model_post_init(self, __context: Any) -> None:
|
563 | 565 | # Derive TestGroups and IntegrationTests, adjust for ManualTests, skip as needed
|
564 | 566 | self.adjust_tests_and_groups()
|
565 | 567 |
|
| 568 | + # Ensure that if there is at least 1 drilldown, at least |
| 569 | + # 1 of the drilldowns contains the string Drilldown.SEARCH_PLACEHOLDER. |
| 570 | + # This is presently a requirement when 1 or more drilldowns are added to a detection. |
| 571 | + # Note that this is only required for production searches that are not hunting |
| 572 | + |
| 573 | + if self.type == AnalyticsType.Hunting.value or self.status != DetectionStatus.production.value: |
| 574 | + #No additional check need to happen on the potential drilldowns. |
| 575 | + pass |
| 576 | + else: |
| 577 | + found_placeholder = False |
| 578 | + if len(self.drilldown_searches) < 2: |
| 579 | + raise ValueError(f"This detection is required to have 2 drilldown_searches, but only has [{len(self.drilldown_searches)}]") |
| 580 | + for drilldown in self.drilldown_searches: |
| 581 | + if DRILLDOWN_SEARCH_PLACEHOLDER in drilldown.search: |
| 582 | + found_placeholder = True |
| 583 | + if not found_placeholder: |
| 584 | + raise ValueError("Detection has one or more drilldown_searches, but none of them " |
| 585 | + f"contained '{DRILLDOWN_SEARCH_PLACEHOLDER}. This is a requirement " |
| 586 | + "if drilldown_searches are defined.'") |
| 587 | + |
| 588 | + # Update the search fields with the original search, if required |
| 589 | + for drilldown in self.drilldown_searches: |
| 590 | + drilldown.perform_search_substitutions(self) |
| 591 | + |
| 592 | + #For experimental purposes, add the default drilldowns |
| 593 | + #self.drilldown_searches.extend(Drilldown.constructDrilldownsFromDetection(self)) |
| 594 | + |
| 595 | + @property |
| 596 | + def drilldowns_in_JSON(self) -> list[dict[str,str]]: |
| 597 | + """This function is required for proper JSON |
| 598 | + serializiation of drilldowns to occur in savedsearches.conf. |
| 599 | + It returns the list[Drilldown] as a list[dict]. |
| 600 | + Without this function, the jinja template is unable |
| 601 | + to convert list[Drilldown] to JSON |
| 602 | +
|
| 603 | + Returns: |
| 604 | + list[dict[str,str]]: List of Drilldowns dumped to dict format |
| 605 | + """ |
| 606 | + return [drilldown.model_dump() for drilldown in self.drilldown_searches] |
| 607 | + |
566 | 608 | @field_validator('lookups', mode="before")
|
567 | 609 | @classmethod
|
568 | 610 | def getDetectionLookups(cls, v:list[str], info:ValidationInfo) -> list[Lookup]:
|
@@ -789,6 +831,45 @@ def search_observables_exist_validate(self):
|
789 | 831 | # Found everything
|
790 | 832 | return self
|
791 | 833 |
|
| 834 | + @field_validator("tests", mode="before") |
| 835 | + def ensure_yml_test_is_unittest(cls, v:list[dict]): |
| 836 | + """The typing for the tests field allows it to be one of |
| 837 | + a number of different types of tests. However, ONLY |
| 838 | + UnitTest should be allowed to be defined in the YML |
| 839 | + file. If part of the UnitTest defined in the YML |
| 840 | + is incorrect, such as the attack_data file, then |
| 841 | + it will FAIL to be instantiated as a UnitTest and |
| 842 | + may instead be instantiated as a different type of |
| 843 | + test, such as IntegrationTest (since that requires |
| 844 | + less fields) which is incorrect. Ensure that any |
| 845 | + raw data read from the YML can actually construct |
| 846 | + a valid UnitTest and, if not, return errors right |
| 847 | + away instead of letting Pydantic try to construct |
| 848 | + it into a different type of test |
| 849 | +
|
| 850 | + Args: |
| 851 | + v (list[dict]): list of dicts read from the yml. |
| 852 | + Each one SHOULD be a valid UnitTest. If we cannot |
| 853 | + construct a valid unitTest from it, a ValueError should be raised |
| 854 | +
|
| 855 | + Returns: |
| 856 | + _type_: The input of the function, assuming no |
| 857 | + ValueError is raised. |
| 858 | + """ |
| 859 | + valueErrors:list[ValueError] = [] |
| 860 | + for unitTest in v: |
| 861 | + #This raises a ValueError on a failed UnitTest. |
| 862 | + try: |
| 863 | + UnitTest.model_validate(unitTest) |
| 864 | + except ValueError as e: |
| 865 | + valueErrors.append(e) |
| 866 | + if len(valueErrors): |
| 867 | + raise ValueError(valueErrors) |
| 868 | + # All of these can be constructred as UnitTests with no |
| 869 | + # Exceptions, so let the normal flow continue |
| 870 | + return v |
| 871 | + |
| 872 | + |
792 | 873 | @field_validator("tests")
|
793 | 874 | def tests_validate(
|
794 | 875 | cls,
|
|
0 commit comments