Skip to content

Commit f97597b

Browse files
committed
Update new_content generation to give a
repeatable value when a field has not been updated. Provide more context for enum fields as to what can be set. Finally, throw an error during YML read if an un-UPDATED field still exists in any of the YMLs.
1 parent 4bb9d41 commit f97597b

File tree

3 files changed

+31
-24
lines changed

3 files changed

+31
-24
lines changed

contentctl/actions/new_content.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import pathlib
1010
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import SecurityContentObject_Abstract
1111
from contentctl.output.yml_writer import YmlWriter
12-
12+
from contentctl.objects.enums import AssetType
13+
from contentctl.objects.constants import SES_OBSERVABLE_TYPE_MAPPING, SES_OBSERVABLE_ROLE_MAPPING
1314
class NewContent:
1415
DEFAULT_DRILLDOWN_DEF = [
1516
{
@@ -25,6 +26,7 @@ class NewContent:
2526
"latest_offset": '$info_max_time$'
2627
}
2728
]
29+
UPDATE_PREFIX = "_UPDATE_"
2830

2931
def buildDetection(self) -> tuple[dict[str, Any], str]:
3032
questions = NewContentQuestions.get_questions_detection()
@@ -36,7 +38,7 @@ def buildDetection(self) -> tuple[dict[str, Any], str]:
3638
raise ValueError("User didn't answer one or more questions!")
3739

3840
data_source_field = (
39-
answers["data_source"] if len(answers["data_source"]) > 0 else ["UPDATE"]
41+
answers["data_source"] if len(answers["data_source"]) > 0 else [f"{NewContent.UPDATE_PREFIX} zero or more data_sources"]
4042
)
4143
file_name = (
4244
answers["detection_name"]
@@ -52,7 +54,7 @@ def buildDetection(self) -> tuple[dict[str, Any], str]:
5254
mitre_attack_ids = [x.strip() for x in answers["mitre_attack_ids"].split(",")]
5355
else:
5456
#string was too short, so just put a placeholder
55-
mitre_attack_ids = ["UPDATE"]
57+
mitre_attack_ids = [f"{NewContent.UPDATE_PREFIX} zero or more mitre_attack_ids"]
5658

5759
output_file_answers: dict[str, Any] = {
5860
"name": answers["detection_name"],
@@ -62,39 +64,39 @@ def buildDetection(self) -> tuple[dict[str, Any], str]:
6264
"author": answers["detection_author"],
6365
"status": "production", # start everything as production since that's what we INTEND the content to become
6466
"type": answers["detection_type"],
65-
"description": "UPDATE_DESCRIPTION",
67+
"description": f"{NewContent.UPDATE_PREFIX} by providing a description of your search",
6668
"data_source": data_source_field,
6769
"search": f"{answers['detection_search']} | `{file_name}_filter`'",
68-
"how_to_implement": "UPDATE_HOW_TO_IMPLEMENT",
69-
"known_false_positives": "UPDATE_KNOWN_FALSE_POSITIVES",
70-
"references": ["REFERENCE"],
70+
"how_to_implement": f"{NewContent.UPDATE_PREFIX} how to implement your search",
71+
"known_false_positives": f"{NewContent.UPDATE_PREFIX} known false positives for your search",
72+
"references": [f"{NewContent.UPDATE_PREFIX} zero or more http references to provide more information about your search"],
7173
"drilldown_searches": NewContent.DEFAULT_DRILLDOWN_DEF,
7274
"tags": {
73-
"analytic_story": ["UPDATE_STORY_NAME"],
74-
"asset_type": "UPDATE asset_type",
75-
"confidence": "UPDATE value between 1-100",
76-
"impact": "UPDATE value between 1-100",
77-
"message": "UPDATE message",
75+
"analytic_story": [f"{NewContent.UPDATE_PREFIX} by providing zero or more analytic stories"],
76+
"asset_type": f"{NewContent.UPDATE_PREFIX} by providing and asset type from {list(AssetType._value2member_map_)}",
77+
"confidence": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100",
78+
"impact": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100",
79+
"message": f"{NewContent.UPDATE_PREFIX} by providing a risk message. Fields in your search results can be referenced using $fieldName$",
7880
"mitre_attack_id": mitre_attack_ids,
7981
"observable": [
80-
{"name": "UPDATE", "type": "UPDATE", "role": ["UPDATE"]}
82+
{"name": f"{NewContent.UPDATE_PREFIX} the field name of the observable. This is a field that exists in your search results.", "type": f"{NewContent.UPDATE_PREFIX} the type of your observable from the list {list(SES_OBSERVABLE_TYPE_MAPPING.keys())}.", "role": [f"{NewContent.UPDATE_PREFIX} the role from the list {list(SES_OBSERVABLE_ROLE_MAPPING.keys())}"]}
8183
],
8284
"product": [
8385
"Splunk Enterprise",
8486
"Splunk Enterprise Security",
8587
"Splunk Cloud",
8688
],
8789
"security_domain": answers["security_domain"],
88-
"cve": ["UPDATE WITH CVE(S) IF APPLICABLE"],
90+
"cve": [f"{NewContent.UPDATE_PREFIX} with CVE(s) if applicable"],
8991
},
9092
"tests": [
9193
{
9294
"name": "True Positive Test",
9395
"attack_data": [
9496
{
95-
"data": "Go to https://github.com/splunk/contentctl/wiki for information about the format of this field",
96-
"sourcetype": "UPDATE SOURCETYPE",
97-
"source": "UPDATE SOURCE",
97+
"data": f"{NewContent.UPDATE_PREFIX} the data file to replay. Go to https://github.com/splunk/contentctl/wiki for information about the format of this field",
98+
"sourcetype": f"{NewContent.UPDATE_PREFIX} the sourcetype of your data file.",
99+
"source": f"{NewContent.UPDATE_PREFIX} the source of your datafile",
98100
}
99101
],
100102
}

contentctl/input/new_content_questions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def get_questions_detection(cls) -> list[dict[str,Any]]:
5757
"type": "text",
5858
"message": "enter search (spl)",
5959
"name": "detection_search",
60-
"default": "| UPDATE_SPL",
60+
"default": "| _UPDATE_ SPL",
6161
},
6262
{
6363
"type": "text",

contentctl/input/yml_reader.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
from typing import Dict, Any
2-
32
import yaml
4-
5-
63
import sys
74
import pathlib
85

96
class YmlReader():
107

118
@staticmethod
12-
def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=False) -> Dict[str,Any]:
9+
def load_file(file_path: pathlib.Path, add_fields:bool=True, STRICT_YML_CHECKING:bool=False) -> Dict[str,Any]:
1310
try:
1411
file_handler = open(file_path, 'r', encoding="utf-8")
1512

@@ -27,8 +24,16 @@ def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=Fals
2724
print(f"Error loading YML file {file_path}: {str(e)}")
2825
sys.exit(1)
2926
try:
30-
#yml_obj = list(yaml.safe_load_all(file_handler))[0]
31-
yml_obj = yaml.load(file_handler, Loader=yaml.CSafeLoader)
27+
#Ideally we should use
28+
# from contentctl.actions.new_content import NewContent
29+
# and use NewContent.UPDATE_PREFIX,
30+
# but there is a circular dependency right now which makes that difficult.
31+
# We have instead hardcoded UPDATE_PREFIX
32+
UPDATE_PREFIX = "_UPDATE_"
33+
data = file_handler.read()
34+
if UPDATE_PREFIX in data:
35+
raise Exception(f"The file {file_path} contains the value '{UPDATE_PREFIX}'. Please fill out any unpopulated fields as required.")
36+
yml_obj = yaml.load(data, Loader=yaml.CSafeLoader)
3237
except yaml.YAMLError as exc:
3338
print(exc)
3439
sys.exit(1)

0 commit comments

Comments
 (0)