20
20
if TYPE_CHECKING :
21
21
from contentctl .input .director import DirectorOutputDto
22
22
from contentctl .objects .baseline import Baseline
23
-
23
+ from contentctl .objects .config import CustomApp
24
+
24
25
from contentctl .objects .security_content_object import SecurityContentObject
25
26
from contentctl .objects .enums import AnalyticsType
26
27
from contentctl .objects .enums import DataModel
36
37
from contentctl .objects .data_source import DataSource
37
38
from contentctl .objects .base_test_result import TestResultStatus
38
39
39
- # from contentctl.objects.playbook import Playbook
40
40
from contentctl .objects .enums import ProvidingTechnology
41
41
from contentctl .enrichments .cve_enrichment import CveEnrichmentObj
42
42
import datetime
43
+ from contentctl .objects .constants import (
44
+ ES_MAX_STANZA_LENGTH ,
45
+ ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE ,
46
+ CONTENTCTL_MAX_SEARCH_NAME_LENGTH ,
47
+ CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE
48
+ )
49
+
43
50
MISSING_SOURCES : set [str ] = set ()
44
51
45
52
# Those AnalyticsTypes that we do not test via contentctl
51
58
# TODO (#266): disable the use_enum_values configuration
52
59
class Detection_Abstract (SecurityContentObject ):
53
60
model_config = ConfigDict (use_enum_values = True )
54
-
55
- # contentType: SecurityContentType = SecurityContentType.detections
61
+ name : str = Field (..., max_length = CONTENTCTL_MAX_SEARCH_NAME_LENGTH )
62
+ #contentType: SecurityContentType = SecurityContentType.detections
56
63
type : AnalyticsType = Field (...)
57
64
status : DetectionStatus = Field (...)
58
65
data_source : list [str ] = []
@@ -70,10 +77,30 @@ class Detection_Abstract(SecurityContentObject):
70
77
# https://github.com/pydantic/pydantic/issues/9101#issuecomment-2019032541
71
78
tests : List [Annotated [Union [UnitTest , IntegrationTest , ManualTest ], Field (union_mode = 'left_to_right' )]] = []
72
79
# A list of groups of tests, relying on the same data
73
- test_groups : Union [ list [TestGroup ], None ] = Field ( None , validate_default = True )
80
+ test_groups : list [TestGroup ] = []
74
81
75
82
data_source_objects : list [DataSource ] = []
76
83
84
+ def get_conf_stanza_name (self , app :CustomApp )-> str :
85
+ stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE .format (app_label = app .label , detection_name = self .name )
86
+ self .check_conf_stanza_max_length (stanza_name )
87
+ return stanza_name
88
+
89
+
90
+ def get_action_dot_correlationsearch_dot_label (self , app :CustomApp , max_stanza_length :int = ES_MAX_STANZA_LENGTH )-> str :
91
+ stanza_name = self .get_conf_stanza_name (app )
92
+ stanza_name_after_saving_in_es = ES_SEARCH_STANZA_NAME_FORMAT_AFTER_CLONING_IN_PRODUCT_TEMPLATE .format (
93
+ security_domain_value = self .tags .security_domain .value ,
94
+ search_name = stanza_name
95
+ )
96
+
97
+
98
+ if len (stanza_name_after_saving_in_es ) > max_stanza_length :
99
+ raise ValueError (f"label may only be { max_stanza_length } characters to allow updating in-product, "
100
+ f"but stanza was actually { len (stanza_name_after_saving_in_es )} characters: '{ stanza_name_after_saving_in_es } ' " )
101
+
102
+ return stanza_name
103
+
77
104
@field_validator ("search" , mode = "before" )
78
105
@classmethod
79
106
def validate_presence_of_filter_macro (cls , value :str , info :ValidationInfo )-> str :
@@ -518,7 +545,7 @@ def model_post_init(self, __context: Any) -> None:
518
545
self .data_source_objects = matched_data_sources
519
546
520
547
for story in self .tags .analytic_story :
521
- story .detections .append (self )
548
+ story .detections .append (self )
522
549
523
550
self .cve_enrichment_func (__context )
524
551
@@ -653,6 +680,27 @@ def addTags_nist(self):
653
680
else :
654
681
self .tags .nist = [NistCategory .DE_AE ]
655
682
return self
683
+
684
+
685
+ @model_validator (mode = "after" )
686
+ def ensureThrottlingFieldsExist (self ):
687
+ '''
688
+ For throttling to work properly, the fields to throttle on MUST
689
+ exist in the search itself. If not, then we cannot apply the throttling
690
+ '''
691
+ if self .tags .throttling is None :
692
+ # No throttling configured for this detection
693
+ return self
694
+
695
+ missing_fields :list [str ] = [field for field in self .tags .throttling .fields if field not in self .search ]
696
+ if len (missing_fields ) > 0 :
697
+ raise ValueError (f"The following throttle fields were missing from the search: { missing_fields } " )
698
+
699
+ else :
700
+ # All throttling fields present in search
701
+ return self
702
+
703
+
656
704
657
705
@model_validator (mode = "after" )
658
706
def ensureProperObservablesExist (self ):
0 commit comments