22
22
from contentctl .objects .unit_test import UnitTest
23
23
from contentctl .objects .test_group import TestGroup
24
24
from contentctl .objects .integration_test import IntegrationTest
25
- from contentctl .objects .event_source import EventSource
26
25
from contentctl .objects .data_source import DataSource
27
26
28
27
#from contentctl.objects.playbook import Playbook
@@ -39,7 +38,7 @@ class Detection_Abstract(SecurityContentObject):
39
38
status : DetectionStatus = Field (...)
40
39
data_source : list [str ] = []
41
40
tags : DetectionTags = Field (...)
42
- search : Union [ str , dict [ str , Any ]] = Field (...)
41
+ search : str = Field (...)
43
42
how_to_implement : str = Field (..., min_length = 4 )
44
43
known_false_positives : str = Field (..., min_length = 4 )
45
44
@@ -136,10 +135,9 @@ def validate_test_groups(cls, value:Union[None, List[TestGroup]], info:Validatio
136
135
@computed_field
137
136
@property
138
137
def datamodel (self )-> List [DataModel ]:
139
- if isinstance (self .search , str ):
140
- return [dm for dm in DataModel if dm .value in self .search ]
141
- else :
142
- return []
138
+ return [dm for dm in DataModel if dm .value in self .search ]
139
+
140
+
143
141
144
142
145
143
@computed_field
@@ -238,11 +236,8 @@ def nes_fields(self)->Optional[str]:
238
236
@computed_field
239
237
@property
240
238
def providing_technologies (self )-> List [ProvidingTechnology ]:
241
- if isinstance (self .search , str ):
242
- return ProvidingTechnology .getProvidingTechFromSearch (self .search )
243
- else :
244
- #Dict-formatted searches (sigma) will not have providing technologies
245
- return []
239
+ return ProvidingTechnology .getProvidingTechFromSearch (self .search )
240
+
246
241
247
242
@computed_field
248
243
@property
@@ -599,35 +594,32 @@ def ensureProperObservablesExist(self):
599
594
600
595
@model_validator (mode = "after" )
601
596
def search_observables_exist_validate (self ):
597
+ observable_fields = [ob .name .lower () for ob in self .tags .observable ]
598
+
599
+ #All $field$ fields from the message must appear in the search
600
+ field_match_regex = r"\$([^\s.]*)\$"
601
+
602
+
603
+ if self .tags .message :
604
+ message_fields = [match .replace ("$" , "" ).lower () for match in re .findall (field_match_regex , self .tags .message .lower ())]
605
+ missing_fields = set ([field for field in observable_fields if field not in self .search .lower ()])
606
+ else :
607
+ message_fields = []
608
+ missing_fields = set ()
602
609
603
- if isinstance (self .search , str ):
604
-
605
- observable_fields = [ob .name .lower () for ob in self .tags .observable ]
606
-
607
- #All $field$ fields from the message must appear in the search
608
- field_match_regex = r"\$([^\s.]*)\$"
609
-
610
-
611
- if self .tags .message :
612
- message_fields = [match .replace ("$" , "" ).lower () for match in re .findall (field_match_regex , self .tags .message .lower ())]
613
- missing_fields = set ([field for field in observable_fields if field not in self .search .lower ()])
614
- else :
615
- message_fields = []
616
- missing_fields = set ()
617
-
618
610
619
- error_messages = []
620
- if len (missing_fields ) > 0 :
621
- error_messages .append (f"The following fields are declared as observables, but do not exist in the search: { missing_fields } " )
611
+ error_messages = []
612
+ if len (missing_fields ) > 0 :
613
+ error_messages .append (f"The following fields are declared as observables, but do not exist in the search: { missing_fields } " )
622
614
623
-
624
- missing_fields = set ([field for field in message_fields if field not in self .search .lower ()])
625
- if len (missing_fields ) > 0 :
626
- error_messages .append (f"The following fields are used as fields in the message, but do not exist in the search: { missing_fields } " )
627
-
628
- if len (error_messages ) > 0 and self .status == DetectionStatus .production .value :
629
- msg = "Use of fields in observables/messages that do not appear in search:\n \t - " + "\n \t - " .join (error_messages )
630
- raise (ValueError (msg ))
615
+
616
+ missing_fields = set ([field for field in message_fields if field not in self .search .lower ()])
617
+ if len (missing_fields ) > 0 :
618
+ error_messages .append (f"The following fields are used as fields in the message, but do not exist in the search: { missing_fields } " )
619
+
620
+ if len (error_messages ) > 0 and self .status == DetectionStatus .production .value :
621
+ msg = "Use of fields in observables/messages that do not appear in search:\n \t - " + "\n \t - " .join (error_messages )
622
+ raise (ValueError (msg ))
631
623
632
624
# Found everything
633
625
return self
0 commit comments