20
20
if TYPE_CHECKING :
21
21
from contentctl .input .director import DirectorOutputDto
22
22
from contentctl .objects .baseline import Baseline
23
-
23
+ from contentctl .objects .config import CustomApp
24
+
24
25
from contentctl .objects .security_content_object import SecurityContentObject
25
26
from contentctl .objects .enums import AnalyticsType
26
27
from contentctl .objects .enums import DataModel
36
37
from contentctl .objects .data_source import DataSource
37
38
from contentctl .objects .base_test_result import TestResultStatus
38
39
39
- # from contentctl.objects.playbook import Playbook
40
40
from contentctl .objects .enums import ProvidingTechnology
41
41
from contentctl .enrichments .cve_enrichment import CveEnrichmentObj
42
42
import datetime
43
+ from contentctl .objects .constants import (
44
+ ES_MAX_STANZA_LENGTH ,
45
+ ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE ,
46
+ CONTENTCTL_MAX_SEARCH_NAME_LENGTH ,
47
+ CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE
48
+ )
49
+
43
50
MISSING_SOURCES : set [str ] = set ()
44
51
45
52
# Those AnalyticsTypes that we do not test via contentctl
51
58
# TODO (#266): disable the use_enum_values configuration
52
59
class Detection_Abstract (SecurityContentObject ):
53
60
model_config = ConfigDict (use_enum_values = True )
54
-
55
- # contentType: SecurityContentType = SecurityContentType.detections
61
+ name : str = Field (..., max_length = CONTENTCTL_MAX_SEARCH_NAME_LENGTH )
62
+ #contentType: SecurityContentType = SecurityContentType.detections
56
63
type : AnalyticsType = Field (...)
57
64
status : DetectionStatus = Field (...)
58
65
data_source : list [str ] = []
59
66
tags : DetectionTags = Field (...)
60
67
search : str = Field (...)
61
68
how_to_implement : str = Field (..., min_length = 4 )
62
69
known_false_positives : str = Field (..., min_length = 4 )
70
+ explanation : None | str = Field (
71
+ default = None ,
72
+ exclude = True , #Don't serialize this value when dumping the object
73
+ description = "Provide an explanation to be included "
74
+ "in the 'Explanation' field of the Detection in "
75
+ "the Use Case Library. If this field is not "
76
+ "defined in the YML, it will default to the "
77
+ "value of the 'description' field when "
78
+ "serialized in analyticstories_detections.j2" ,
79
+ )
63
80
64
81
enabled_by_default : bool = False
65
82
file_path : FilePath = Field (...)
@@ -70,10 +87,30 @@ class Detection_Abstract(SecurityContentObject):
70
87
# https://github.com/pydantic/pydantic/issues/9101#issuecomment-2019032541
71
88
tests : List [Annotated [Union [UnitTest , IntegrationTest , ManualTest ], Field (union_mode = 'left_to_right' )]] = []
72
89
# A list of groups of tests, relying on the same data
73
- test_groups : Union [ list [TestGroup ], None ] = Field ( None , validate_default = True )
90
+ test_groups : list [TestGroup ] = []
74
91
75
92
data_source_objects : list [DataSource ] = []
76
93
94
+ def get_conf_stanza_name (self , app :CustomApp )-> str :
95
+ stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE .format (app_label = app .label , detection_name = self .name )
96
+ self .check_conf_stanza_max_length (stanza_name )
97
+ return stanza_name
98
+
99
+
100
+ def get_action_dot_correlationsearch_dot_label (self , app :CustomApp , max_stanza_length :int = ES_MAX_STANZA_LENGTH )-> str :
101
+ stanza_name = self .get_conf_stanza_name (app )
102
+ stanza_name_after_saving_in_es = ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE .format (
103
+ security_domain_value = self .tags .security_domain .value ,
104
+ search_name = stanza_name
105
+ )
106
+
107
+
108
+ if len (stanza_name_after_saving_in_es ) > max_stanza_length :
109
+ raise ValueError (f"label may only be { max_stanza_length } characters to allow updating in-product, "
110
+ f"but stanza was actually { len (stanza_name_after_saving_in_es )} characters: '{ stanza_name_after_saving_in_es } ' " )
111
+
112
+ return stanza_name
113
+
77
114
@field_validator ("search" , mode = "before" )
78
115
@classmethod
79
116
def validate_presence_of_filter_macro (cls , value :str , info :ValidationInfo )-> str :
@@ -519,7 +556,7 @@ def model_post_init(self, __context: Any) -> None:
519
556
self .data_source_objects = matched_data_sources
520
557
521
558
for story in self .tags .analytic_story :
522
- story .detections .append (self )
559
+ story .detections .append (self )
523
560
524
561
self .cve_enrichment_func (__context )
525
562
@@ -654,6 +691,27 @@ def addTags_nist(self):
654
691
else :
655
692
self .tags .nist = [NistCategory .DE_AE ]
656
693
return self
694
+
695
+
696
+ @model_validator (mode = "after" )
697
+ def ensureThrottlingFieldsExist (self ):
698
+ '''
699
+ For throttling to work properly, the fields to throttle on MUST
700
+ exist in the search itself. If not, then we cannot apply the throttling
701
+ '''
702
+ if self .tags .throttling is None :
703
+ # No throttling configured for this detection
704
+ return self
705
+
706
+ missing_fields :list [str ] = [field for field in self .tags .throttling .fields if field not in self .search ]
707
+ if len (missing_fields ) > 0 :
708
+ raise ValueError (f"The following throttle fields were missing from the search: { missing_fields } " )
709
+
710
+ else :
711
+ # All throttling fields present in search
712
+ return self
713
+
714
+
657
715
658
716
@model_validator (mode = "after" )
659
717
def ensureProperObservablesExist (self ):
0 commit comments