
Commit a6ab65e

Merge pull request #97 from splunk/fix_manual_test_bug
Fix manual test bug
2 parents: 1065922 + 7fc6e05 · commit a6ab65e

7 files changed: 121 additions & 55 deletions

contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py

Lines changed: 3 additions & 6 deletions
@@ -414,15 +414,12 @@ def test_detection(self, detection: Detection) -> None:
         data)
         :param detection: the Detection to test
         """
-        # TODO: do we want to return a failure here if no test exists for a production detection?
-        # Log and return if no tests exist
-        if detection.tests is None:
-            self.pbar.write(f"No test(s) found for {detection.name}")
-            return
 
         # iterate TestGroups
         for test_group in detection.test_groups:
-            # If all tests in the group have been skipped, report and continue
+            # If all tests in the group have been skipped, report and continue.
+            # Note that the logic for skipping tests for detections tagged manual_test exists in
+            # the detection builder.
             if test_group.all_tests_skipped():
                 self.pbar.write(
                     self.format_pbar_string(
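
For context, the loop above only reports a group-level skip when every test in the group has already been marked skipped. Below is a minimal, hypothetical sketch of that check; the SketchTest/SketchTestGroup classes are illustrative stand-ins, not contentctl's TestGroup implementation.

from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):
    SKIP = "skip"
    PASS = "pass"
    FAIL = "fail"
    NONE = "none"


@dataclass
class SketchTest:
    name: str
    status: Status = Status.NONE


@dataclass
class SketchTestGroup:
    tests: list[SketchTest] = field(default_factory=list)

    def all_tests_skipped(self) -> bool:
        # The group is reported (and skipped) as a whole only when every
        # member test has already been marked as skipped.
        return all(t.status is Status.SKIP for t in self.tests)


group = SketchTestGroup([SketchTest("unit", Status.SKIP), SketchTest("integration", Status.SKIP)])
assert group.all_tests_skipped()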

contentctl/actions/detection_testing/views/DetectionTestingView.py

Lines changed: 20 additions & 6 deletions
@@ -84,6 +84,7 @@ def getSummaryObject(
         tested_detections = []
         total_pass = 0
         total_fail = 0
+        total_skipped = 0
 
         # Iterate the detections tested (anything in the output queue was tested)
         for detection in self.sync_obj.outputQueue:
@@ -93,16 +94,27 @@ def getSummaryObject(
             )
 
             # Aggregate detection pass/fail metrics
-            if summary["success"] is True:
-                total_pass += 1
-            else:
+            if summary["success"] is False:
                 total_fail += 1
+            else:
+                # Test is marked as a success, but we need to determine if there were skipped unit tests.
+                # SKIPPED tests still show a success in this field, but we want to count them differently.
+                pass_increment = 1
+                for test in summary.get("tests"):
+                    if test.get("test_type") == "unit" and test.get("status") == "skip":
+                        total_skipped += 1
+                        # Test should not count as a pass, so do not increment the count
+                        pass_increment = 0
+                        break
+                total_pass += pass_increment
+
 
             # Append to our list
             tested_detections.append(summary)
 
         # Sort s.t. all failures appear first (then by name)
-        tested_detections.sort(key=lambda x: (x["success"], x["name"]))
+        # Second sort condition is a hack to get detections with skipped unit tests to appear above passing tests
+        tested_detections.sort(key=lambda x: (x["success"], 0 if x.get("tests", [{}])[0].get("status", "status_missing") == "skip" else 1, x["name"]))
 
         # Aggregate summaries for the untested detections (anything still in the input queue was untested)
         total_untested = len(self.sync_obj.inputQueue)
@@ -128,14 +140,15 @@ def getSummaryObject(
             overall_success = False
 
         # Compute total detections
-        total_detections = total_fail + total_pass + total_untested
+        total_detections = total_fail + total_pass + total_untested + total_skipped
+
 
         # Compute the percentage of completion for testing, as well as the success rate
         percent_complete = Utils.getPercent(
            len(tested_detections), len(untested_detections), 1
        )
        success_rate = Utils.getPercent(
-            total_pass, total_detections, 1
+            total_pass, total_detections - total_skipped, 1
        )
 
        # TODO (cmcginley): add stats around total test cases and unit/integration test
@@ -149,6 +162,7 @@ def getSummaryObject(
                "total_detections": total_detections,
                "total_pass": total_pass,
                "total_fail": total_fail,
+                "total_skipped": total_skipped,
                "total_untested": total_untested,
                "total_experimental_or_deprecated": len(deprecated_detections + experimental_detections),
                "success_rate": success_rate,

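The skip-aware counting above is easier to follow in isolation. Below is a minimal, standalone sketch of the same tallying; the tally helper and the exact summary shape are assumptions based on the dicts used in getSummaryObject, and untested detections are omitted for brevity.

# A minimal, standalone sketch of the skip-aware tallying; `summaries` is a
# hypothetical list shaped like the per-detection summary dicts in getSummaryObject.
def tally(summaries: list[dict]) -> dict:
    total_pass = total_fail = total_skipped = 0
    for summary in summaries:
        if summary["success"] is False:
            total_fail += 1
        elif any(t.get("test_type") == "unit" and t.get("status") == "skip"
                 for t in summary.get("tests", [])):
            # A skipped unit test still reports success=True, but is counted separately
            total_skipped += 1
        else:
            total_pass += 1
    total = total_pass + total_fail + total_skipped
    # Skipped detections are excluded from the success-rate denominator
    success_rate = round(100 * total_pass / (total - total_skipped), 1) if total > total_skipped else 0.0
    return {"total_pass": total_pass, "total_fail": total_fail,
            "total_skipped": total_skipped, "success_rate": success_rate}


print(tally([
    {"success": True, "tests": [{"test_type": "unit", "status": "pass"}]},
    {"success": True, "tests": [{"test_type": "unit", "status": "skip"}]},
    {"success": False, "tests": [{"test_type": "unit", "status": "fail"}]},
]))
# {'total_pass': 1, 'total_fail': 1, 'total_skipped': 1, 'success_rate': 50.0}

Note how a summary whose only unit test is skipped still reports success=True but is excluded from both total_pass and the success-rate denominator, matching the total_detections - total_skipped division in the diff above.
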
contentctl/input/detection_builder.py

Lines changed: 17 additions & 0 deletions
@@ -344,6 +344,23 @@ def skipIntegrationTests(self) -> None:
                "security_content_obj must be an instance of Detection to skip integration tests, "
                f"not {type(self.security_content_obj)}"
            )
+
+    def skipAllTests(self, manual_test_explanation: str) -> None:
+        """
+        Skip all unit and integration tests if the manual_test flag is defined in the yml
+        """
+        # Sanity check for typing and in case setObject wasn't called yet
+        if self.security_content_obj is not None and isinstance(self.security_content_obj, Detection):
+            for test in self.security_content_obj.tests:
+                # This should skip both unit and integration tests as appropriate
+                test.skip(f"TEST SKIPPED: Detection marked as 'manual_test' with explanation: {manual_test_explanation}")
+
+        else:
+            raise ValueError(
+                "security_content_obj must be an instance of Detection to skip unit and integration tests due "
+                f"to the presence of the manual_test field, not {type(self.security_content_obj)}"
+            )
+
 
    def reset(self) -> None:
        self.security_content_obj = None
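
skipAllTests relies on each test object exposing a skip() method that records a skipped result instead of running the test. Below is a minimal, hypothetical sketch of that contract; these dataclasses are illustrative, not contentctl's UnitTest/IntegrationTest types.

from dataclasses import dataclass, field


@dataclass
class SketchTestResult:
    status: str = "none"
    message: str = ""


@dataclass
class SketchTest:
    name: str
    result: SketchTestResult = field(default_factory=SketchTestResult)

    def skip(self, message: str) -> None:
        # Record a SKIP result so the test is never actually executed
        self.result = SketchTestResult(status="skip", message=message)


tests = [SketchTest("unit"), SketchTest("integration")]
explanation = "Detection requires a manual data source refresh"
for test in tests:
    test.skip(f"TEST SKIPPED: Detection marked as 'manual_test' with explanation: {explanation}")

assert all(t.result.status == "skip" for t in tests)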

contentctl/input/director.py

Lines changed: 7 additions & 0 deletions
@@ -221,6 +221,13 @@ def constructDetection(self, builder: DetectionBuilder, file_path: str) -> None:
        # TODO: is there a better way to handle this? The `test` portion of the config is not defined for validate
        if (self.input_dto.config.test is not None) and (not self.input_dto.config.test.enable_integration_testing):
            builder.skipIntegrationTests()
+
+        if builder.security_content_obj is not None and \
+            builder.security_content_obj.tags is not None and \
+            isinstance(builder.security_content_obj.tags.manual_test, str):
+            # Skip all tests, both Unit AND Integration, due to manual_test. Note that the integration test
+            # messages will intentionally overwrite the justification set by the skipIntegrationTests call above.
+            builder.skipAllTests(builder.security_content_obj.tags.manual_test)
 
 
    def constructSSADetection(self, builder: DetectionBuilder, file_path: str) -> None:
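
The new guard fires only when tags.manual_test is a string explanation in the detection YAML. Below is a made-up fragment showing just the fields the guard inspects, parsed with PyYAML (assumed to be available); real detection files carry many more fields.

import yaml  # assumes PyYAML is installed

# A made-up detection YAML fragment; only the fields the guard above inspects are shown.
fragment = """
name: Example Detection
tags:
  manual_test: This detection requires infrastructure that cannot be replicated in CI.
"""

detection = yaml.safe_load(fragment)
manual_test = detection["tags"].get("manual_test")

# Mirrors the director's condition: only a string explanation triggers skipAllTests
if isinstance(manual_test, str):
    print(f"Would call builder.skipAllTests({manual_test!r})")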

contentctl/objects/abstract_security_content_objects/detection_abstract.py

Lines changed: 61 additions & 10 deletions
@@ -19,7 +19,7 @@
 from contentctl.objects.playbook import Playbook
 from contentctl.helper.link_validator import LinkValidator
 from contentctl.objects.enums import SecurityContentType
-
+from contentctl.objects.test_group import TestGroup
 
 class Detection_Abstract(SecurityContentObject):
     # contentType: SecurityContentType = SecurityContentType.detections
@@ -60,6 +60,34 @@ class Detection_Abstract(SecurityContentObject):
     class Config:
         use_enum_values = True
 
+
+    # A list of groups of tests, relying on the same data
+    test_groups: Union[list[TestGroup], None] = None
+
+    @validator("test_groups", always=True)
+    def validate_test_groups(cls, value, values) -> Union[list[TestGroup], None]:
+        """
+        Validates the `test_groups` field and constructs the model from the list of unit tests
+        if no explicit construct was provided
+        :param value: the value of the field `test_groups`
+        :param values: a dict of the other fields in the Detection model
+        """
+        # if the value was not the None default, do nothing
+        if value is not None:
+            return value
+
+        # iterate over the unit tests and create a TestGroup (and as a result, an IntegrationTest) for each
+        test_groups: list[TestGroup] = []
+        for unit_test in values["tests"]:
+            test_group = TestGroup.derive_from_unit_test(unit_test, values["name"])
+            test_groups.append(test_group)
+
+        # now add each integration test to the list of tests
+        for test_group in test_groups:
+            values["tests"].append(test_group.integration_test)
+        return test_groups
+
+
     def get_content_dependencies(self) -> list[SecurityContentObject]:
         return self.playbooks + self.baselines + self.macros + self.lookups
 
@@ -194,16 +222,39 @@ def search_obsersables_exist_validate(cls, v, values):
         # Found everything
         return v
 
-    # TODO (cmcginley): Fix detection_abstract.tests_validate so that it surfaces validation errors
-    # (e.g. a lack of tests) to the final results, instead of just showing a failed detection w/
-    # no tests (maybe have a message propagated at the detection level? do a separate coverage
-    # check as part of validation?):
-    @validator("tests")
+    @validator("tests", always=True)
     def tests_validate(cls, v, values):
-        if values.get("status", "") == DetectionStatus.production.value and not v:
-            raise ValueError(
-                "At least one test is REQUIRED for production detection: " + values["name"]
-            )
+        # TODO (cmcginley): Fix detection_abstract.tests_validate so that it surfaces validation errors
+        # (e.g. a lack of tests) to the final results, instead of just showing a failed detection w/
+        # no tests (maybe have a message propagated at the detection level? do a separate coverage
+        # check as part of validation?):
+
+
+        # Only production analytics require tests
+        if values.get("status", "") != DetectionStatus.production.value:
+            return v
+
+        # All types EXCEPT Correlation MUST have test(s). Any other type, including newly defined types, requires them.
+        # Accordingly, we do not need to do additional checks if the type is Correlation
+        if values.get("type", "") in set([AnalyticsType.Correlation.value]):
+            return v
+
+
+        # Ensure that there is at least 1 test
+        if len(v) == 0:
+            if values.get("tags", None) and values.get("tags").manual_test is not None:
+                # Detections that are manual_test MAY have tests, but they are not required. If a
+                # detection does not have one, then create one which will be a placeholder.
+                # Note that this placeholder UnitTest (and, by extension, IntegrationTest) will NOT be generated
+                # if there ARE test(s) defined for a Detection.
+                placeholder_test = UnitTest(name="PLACEHOLDER FOR DETECTION TAGGED MANUAL_TEST WITH NO TESTS SPECIFIED IN YML FILE", attack_data=[])
+                return [placeholder_test]
+
+            else:
+                raise ValueError("At least one test is REQUIRED for production detection: " + values.get("name", "NO NAME FOUND"))
+
+
+        # No issues - at least one test provided for a production type that requires testing
         return v
 
     @validator("datamodel")
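
The reworked validator can be exercised outside contentctl. Below is a stripped-down, hypothetical pydantic v1 model (not contentctl's Detection class) showing how always=True plus a placeholder keeps a production detection tagged manual_test valid even when its YAML defines no tests.

from typing import List, Optional
from pydantic import BaseModel, validator  # pydantic v1 API, as used by the code above


class ToyDetection(BaseModel):
    name: str
    status: str = "production"
    manual_test: Optional[str] = None
    tests: List[str] = []

    @validator("tests", always=True)
    def tests_validate(cls, v, values):
        # Non-production content does not require tests
        if values.get("status") != "production":
            return v
        if len(v) == 0:
            if values.get("manual_test") is not None:
                # Inject a placeholder so downstream tooling still has something to skip
                return ["PLACEHOLDER FOR DETECTION TAGGED MANUAL_TEST WITH NO TESTS SPECIFIED IN YML FILE"]
            raise ValueError("At least one test is REQUIRED for production detection: " + values.get("name", "NO NAME FOUND"))
        return v


print(ToyDetection(name="Example", manual_test="needs live infra").tests)
# ['PLACEHOLDER FOR DETECTION TAGGED MANUAL_TEST WITH NO TESTS SPECIFIED IN YML FILE']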

contentctl/objects/detection.py

Lines changed: 2 additions & 27 deletions
@@ -2,7 +2,7 @@
 from pydantic import validator
 
 from contentctl.objects.abstract_security_content_objects.detection_abstract import Detection_Abstract
-from contentctl.objects.test_group import TestGroup
+
 
 
 class Detection(Detection_Abstract):
@@ -16,29 +16,4 @@ class Detection(Detection_Abstract):
     # them or modifying their behavior may cause
     # undefined issues with the contentctl tooling
     # or output of the tooling.
-
-    # A list of groups of tests, relying on the same data
-    test_groups: Union[list[TestGroup], None] = None
-
-    @validator("test_groups", always=True)
-    def validate_test_groups(cls, value, values) -> Union[list[TestGroup], None]:
-        """
-        Validates the `test_groups` field and constructs the model from the list of unit tests
-        if no explicit construct was provided
-        :param value: the value of the field `test_groups`
-        :param values: a dict of the other fields in the Detection model
-        """
-        # if the value was not the None default, do nothing
-        if value is not None:
-            return value
-
-        # iterate over the unit tests and create a TestGroup (and as a result, an IntegrationTest) for each
-        test_groups: list[TestGroup] = []
-        for unit_test in values["tests"]:
-            test_group = TestGroup.derive_from_unit_test(unit_test, values["name"])
-            test_groups.append(test_group)
-
-        # now add each integration test to the list of tests
-        for test_group in test_groups:
-            values["tests"].append(test_group.integration_test)
-        return test_groups
+    pass

contentctl/objects/unit_test_result.py

Lines changed: 11 additions & 6 deletions
@@ -13,7 +13,7 @@
 
 class UnitTestResult(BaseTestResult):
     missing_observables: list[str] = []
-
+
     def set_job_content(
         self,
         content: Union[Record, None],
@@ -37,18 +37,18 @@ def set_job_content(
         self.duration = round(duration, 2)
         self.exception = exception
         self.status = status
-
+        self.job_content = content
+
         # Set the job content, if given
         if content is not None:
-            self.job_content = content
-
             if self.status == TestResultStatus.PASS:
                 self.message = "TEST PASSED"
             elif self.status == TestResultStatus.FAIL:
                 self.message = "TEST FAILED"
             elif self.status == TestResultStatus.ERROR:
-                self.message == "TEST FAILED (ERROR)"
+                self.message = "TEST ERROR"
             elif self.status == TestResultStatus.SKIP:
+                # A test that was SKIPPED should not have job content since it should not have been run.
                 self.message = "TEST SKIPPED"
 
             if not config.instance_address.startswith("http://"):
@@ -61,11 +61,16 @@ def set_job_content(
                 sid=content.get("sid", None),
             )
 
+        elif self.status == TestResultStatus.SKIP:
+            self.message = "TEST SKIPPED"
+            pass
+
         elif content is None:
-            self.job_content = None
             self.status = TestResultStatus.ERROR
             if self.exception is not None:
                 self.message = f"EXCEPTION: {str(self.exception)}"
+            else:
+                self.message = f"ERROR with no more specific message available."
             self.sid_link = NO_SID
 
         return self.success
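
The branch that previously compared the message with == (a no-op) now assigns it, and skipped results deliberately carry no job content because the search is never dispatched. Below is a simplified, hypothetical restatement of the resulting message selection, not contentctl's API.

from enum import Enum
from typing import Optional


class TestResultStatus(Enum):
    PASS = "pass"
    FAIL = "fail"
    ERROR = "error"
    SKIP = "skip"


# A simplified mapping mirroring the branches above; skipped results intentionally
# carry no job content because the search was never dispatched.
def build_message(status: TestResultStatus, content: Optional[dict], exception: Optional[Exception]) -> str:
    if content is not None:
        return {
            TestResultStatus.PASS: "TEST PASSED",
            TestResultStatus.FAIL: "TEST FAILED",
            TestResultStatus.ERROR: "TEST ERROR",
            TestResultStatus.SKIP: "TEST SKIPPED",
        }[status]
    if status is TestResultStatus.SKIP:
        return "TEST SKIPPED"
    if exception is not None:
        return f"EXCEPTION: {exception}"
    return "ERROR with no more specific message available."


assert build_message(TestResultStatus.SKIP, None, None) == "TEST SKIPPED"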
