20
20
if TYPE_CHECKING :
21
21
from contentctl .input .director import DirectorOutputDto
22
22
from contentctl .objects .baseline import Baseline
23
-
23
+ from contentctl .objects .config import CustomApp
24
+
24
25
from contentctl .objects .security_content_object import SecurityContentObject
25
26
from contentctl .objects .enums import AnalyticsType
26
27
from contentctl .objects .enums import DataModel
36
37
from contentctl .objects .data_source import DataSource
37
38
from contentctl .objects .base_test_result import TestResultStatus
38
39
39
- # from contentctl.objects.playbook import Playbook
40
40
from contentctl .objects .enums import ProvidingTechnology
41
41
from contentctl .enrichments .cve_enrichment import CveEnrichmentObj
42
42
import datetime
43
+ from contentctl .objects .constants import (
44
+ ES_MAX_STANZA_LENGTH ,
45
+ ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE ,
46
+ CONTENTCTL_MAX_SEARCH_NAME_LENGTH ,
47
+ CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE
48
+ )
49
+
43
50
MISSING_SOURCES : set [str ] = set ()
44
51
45
52
# Those AnalyticsTypes that we do not test via contentctl
51
58
# TODO (#266): disable the use_enum_values configuration
52
59
class Detection_Abstract (SecurityContentObject ):
53
60
model_config = ConfigDict (use_enum_values = True )
54
-
55
- # contentType: SecurityContentType = SecurityContentType.detections
61
+ name : str = Field (..., max_length = CONTENTCTL_MAX_SEARCH_NAME_LENGTH )
62
+ #contentType: SecurityContentType = SecurityContentType.detections
56
63
type : AnalyticsType = Field (...)
57
64
status : DetectionStatus = Field (...)
58
65
data_source : list [str ] = []
@@ -71,10 +78,30 @@ class Detection_Abstract(SecurityContentObject):
71
78
# https://github.com/pydantic/pydantic/issues/9101#issuecomment-2019032541
72
79
tests : List [Annotated [Union [UnitTest , IntegrationTest , ManualTest ], Field (union_mode = 'left_to_right' )]] = []
73
80
# A list of groups of tests, relying on the same data
74
- test_groups : Union [ list [TestGroup ], None ] = Field ( None , validate_default = True )
81
+ test_groups : list [TestGroup ] = []
75
82
76
83
data_source_objects : list [DataSource ] = []
77
84
85
+ def get_conf_stanza_name (self , app :CustomApp )-> str :
86
+ stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE .format (app_label = app .label , detection_name = self .name )
87
+ self .check_conf_stanza_max_length (stanza_name )
88
+ return stanza_name
89
+
90
+
91
+ def get_action_dot_correlationsearch_dot_label (self , app :CustomApp , max_stanza_length :int = ES_MAX_STANZA_LENGTH )-> str :
92
+ stanza_name = self .get_conf_stanza_name (app )
93
+ stanza_name_after_saving_in_es = ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE .format (
94
+ security_domain_value = self .tags .security_domain .value ,
95
+ search_name = stanza_name
96
+ )
97
+
98
+
99
+ if len (stanza_name_after_saving_in_es ) > max_stanza_length :
100
+ raise ValueError (f"label may only be { max_stanza_length } characters to allow updating in-product, "
101
+ f"but stanza was actually { len (stanza_name_after_saving_in_es )} characters: '{ stanza_name_after_saving_in_es } ' " )
102
+
103
+ return stanza_name
104
+
78
105
@field_validator ("search" , mode = "before" )
79
106
@classmethod
80
107
def validate_presence_of_filter_macro (cls , value :str , info :ValidationInfo )-> str :
@@ -519,7 +546,7 @@ def model_post_init(self, __context: Any) -> None:
519
546
self .data_source_objects = matched_data_sources
520
547
521
548
for story in self .tags .analytic_story :
522
- story .detections .append (self )
549
+ story .detections .append (self )
523
550
524
551
self .cve_enrichment_func (__context )
525
552
@@ -654,6 +681,27 @@ def addTags_nist(self):
654
681
else :
655
682
self .tags .nist = [NistCategory .DE_AE ]
656
683
return self
684
+
685
+
686
+ @model_validator (mode = "after" )
687
+ def ensureThrottlingFieldsExist (self ):
688
+ '''
689
+ For throttling to work properly, the fields to throttle on MUST
690
+ exist in the search itself. If not, then we cannot apply the throttling
691
+ '''
692
+ if self .tags .throttling is None :
693
+ # No throttling configured for this detection
694
+ return self
695
+
696
+ missing_fields :list [str ] = [field for field in self .tags .throttling .fields if field not in self .search ]
697
+ if len (missing_fields ) > 0 :
698
+ raise ValueError (f"The following throttle fields were missing from the search: { missing_fields } " )
699
+
700
+ else :
701
+ # All throttling fields present in search
702
+ return self
703
+
704
+
657
705
658
706
@model_validator (mode = "after" )
659
707
def ensureProperObservablesExist (self ):
0 commit comments