Skip to content

Commit 05f4db9

Browse files
authored
Merge pull request #170 from splunk/simple_api_features
Add simple API for a more
2 parents 8f16f1f + bf63cdd commit 05f4db9

File tree

9 files changed

+229
-96
lines changed

9 files changed

+229
-96
lines changed

contentctl/actions/validate.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
2323
director_output_dto = DirectorOutputDto(AtomicTest.getAtomicTestsFromArtRepo(repo_path=input_dto.getAtomicRedTeamRepoPath(),
2424
enabled=input_dto.enrichments),
2525
AttackEnrichment.getAttackEnrichment(input_dto),
26+
CveEnrichment.getCveEnrichment(input_dto),
2627
[],[],[],[],[],[],[],[],[])
2728

2829

contentctl/api.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
from pathlib import Path
2+
from typing import Any, Union, Type
3+
from contentctl.input.yml_reader import YmlReader
4+
from contentctl.objects.config import test_common, test, test_servers
5+
from contentctl.objects.security_content_object import SecurityContentObject
6+
from contentctl.input.director import DirectorOutputDto
7+
8+
def config_from_file(path:Path=Path("contentctl.yml"), config: dict[str,Any]={},
9+
configType:Type[Union[test,test_servers]]=test)->test_common:
10+
11+
"""
12+
Fetch a configuration object that can be used for a number of different contentctl
13+
operations including validate, build, inspect, test, and test_servers. A file will
14+
be used as the basis for constructing the configuration.
15+
16+
Args:
17+
path (Path, optional): Relative or absolute path to a contentctl config file.
18+
Defaults to Path("contentctl.yml"), which is the default name and location (in the current directory)
19+
of the configuration files which are automatically generated for contentctl.
20+
config (dict[], optional): Dictionary of values to override values read from the YML
21+
path passed as the first argument. Defaults to {}, an empty dict meaning that nothing
22+
will be overwritten
23+
configType (Type[Union[test,test_servers]], optional): The Config Class to instantiate.
24+
This may be a test or test_servers object. Note that this is NOT an instance of the class. Defaults to test.
25+
Returns:
26+
test_common: Returns a complete contentctl test_common configuration. Note that this configuration
27+
will have all applicable field for validate and build as well, but can also be used for easily
28+
construction a test or test_servers object.
29+
"""
30+
31+
try:
32+
yml_dict = YmlReader.load_file(path, add_fields=False)
33+
34+
35+
except Exception as e:
36+
raise Exception(f"Failed to load contentctl configuration from file '{path}': {str(e)}")
37+
38+
# Apply settings that have been overridden from the ones in the file
39+
try:
40+
yml_dict.update(config)
41+
except Exception as e:
42+
raise Exception(f"Failed updating dictionary of values read from file '{path}'"
43+
f" with the dictionary of arguments passed: {str(e)}")
44+
45+
# The function below will throw its own descriptive exception if it fails
46+
configObject = config_from_dict(yml_dict, configType=configType)
47+
48+
return configObject
49+
50+
51+
52+
53+
def config_from_dict(config: dict[str,Any]={},
54+
configType:Type[Union[test,test_servers]]=test)->test_common:
55+
"""
56+
Fetch a configuration object that can be used for a number of different contentctl
57+
operations including validate, build, inspect, test, and test_servers. A dict will
58+
be used as the basis for constructing the configuration.
59+
60+
Args:
61+
config (dict[str,Any],Optional): If a dictionary is not explicitly passed, then
62+
an empty dict will be used to create a configuration, if possible, from default
63+
values. Note that based on default values in the contentctl/objects/config.py
64+
file, this may raise an exception. If so, please set appropriate default values
65+
in the file above or supply those values via this argument.
66+
configType (Type[Union[test,test_servers]], optional): The Config Class to instantiate.
67+
This may be a test or test_servers object. Note that this is NOT an instance of the class. Defaults to test.
68+
Returns:
69+
test_common: Returns a complete contentctl test_common configuration. Note that this configuration
70+
will have all applicable field for validate and build as well, but can also be used for easily
71+
construction a test or test_servers object.
72+
"""
73+
try:
74+
test_object = configType.model_validate(config)
75+
except Exception as e:
76+
raise Exception(f"Failed to load contentctl configuration from dict:\n{str(e)}")
77+
78+
return test_object
79+
80+
81+
def update_config(config:Union[test,test_servers], **key_value_updates:dict[str,Any])->test_common:
82+
83+
"""Update any relevant keys in a config file with the specified values.
84+
Full validation will be performed after this update and descriptive errors
85+
will be produced
86+
87+
Args:
88+
config (test_common): A previously-constructed test_common object. This can be
89+
build using the configFromDict or configFromFile functions.
90+
key_value_updates (kwargs, optional): Additional keyword/argument pairs to update
91+
arbitrary fields in the configuration.
92+
93+
Returns:
94+
test_common: A validated object which has had the relevant fields updated.
95+
Note that descriptive Exceptions will be generated if updated values are either
96+
invalid (have the wrong type, or disallowed values) or you attempt to update
97+
fields that do not exist
98+
"""
99+
# Create a copy so we don't change the underlying model
100+
config_copy = config.model_copy(deep=True)
101+
102+
# Force validation of assignment since doing so via arbitrary dict can be error prone
103+
# Also, ensure that we do not try to add fields that are not part of the model
104+
config_copy.model_config.update({'validate_assignment': True, 'extra': 'forbid'})
105+
106+
107+
108+
# Collect any errors that may occur
109+
errors:list[Exception] = []
110+
111+
# We need to do this one by one because the extra:forbid argument does not appear to
112+
# be respected at this time.
113+
for key, value in key_value_updates.items():
114+
try:
115+
setattr(config_copy,key,value)
116+
except Exception as e:
117+
errors.append(e)
118+
if len(errors) > 0:
119+
errors_string = '\n'.join([str(e) for e in errors])
120+
raise Exception(f"Error(s) updaitng configuration:\n{errors_string}")
121+
122+
return config_copy
123+
124+
125+
126+
def content_to_dict(director:DirectorOutputDto)->dict[str,list[dict[str,Any]]]:
127+
output_dict:dict[str,list[dict[str,Any]]] = {}
128+
for contentType in ['detections','stories','baselines','investigations',
129+
'playbooks','macros','lookups','deployments','ssa_detections']:
130+
131+
output_dict[contentType] = []
132+
t:list[SecurityContentObject] = getattr(director,contentType)
133+
134+
for item in t:
135+
output_dict[contentType].append(item.model_dump())
136+
return output_dict
137+

contentctl/contentctl.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ def deploy_acs_func(config:deploy_acs):
9999
raise Exception("deploy acs not yet implemented")
100100

101101
def test_common_func(config:test_common):
102+
if type(config) == test:
103+
#construct the container Infrastructure objects
104+
config.getContainerInfrastructureObjects()
105+
#otherwise, they have already been passed as servers
106+
102107
director_output_dto = build_func(config)
103108
gitServer = GitService(director=director_output_dto,config=config)
104109
detections_to_test = gitServer.getContent()
@@ -206,10 +211,6 @@ def main():
206211
updated_config = deploy_acs.model_validate(config)
207212
deploy_acs_func(updated_config)
208213
elif type(config) == test or type(config) == test_servers:
209-
if type(config) == test:
210-
#construct the container Infrastructure objects
211-
config.getContainerInfrastructureObjects()
212-
#otherwise, they have already been passed as servers
213214
test_common_func(config)
214215
else:
215216
raise Exception(f"Unknown command line type '{type(config).__name__}'")

contentctl/enrichments/cve_enrichment.py

Lines changed: 43 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -4,97 +4,62 @@
44
import os
55
import shelve
66
import time
7-
from typing import Annotated
8-
from pydantic import BaseModel,Field,ConfigDict
9-
7+
from typing import Annotated, Any, Union, TYPE_CHECKING
8+
from pydantic import BaseModel,Field, computed_field
109
from decimal import Decimal
11-
CVESSEARCH_API_URL = 'https://cve.circl.lu'
12-
13-
CVE_CACHE_FILENAME = "lookups/CVE_CACHE.db"
14-
15-
NON_PERSISTENT_CACHE = {}
16-
17-
18-
''''''
19-
@functools.cache
20-
def cvesearch_helper(url:str, cve_id:str, force_cached_or_offline:bool=False, max_api_attempts:int=3, retry_sleep_seconds:int=5):
21-
if max_api_attempts < 1:
22-
raise(Exception(f"The minimum number of CVESearch API attempts is 1. You have passed {max_api_attempts}"))
23-
24-
if force_cached_or_offline:
25-
if not os.path.exists(CVE_CACHE_FILENAME):
26-
print(f"Cache at {CVE_CACHE_FILENAME} not found - Creating it.")
27-
cache = shelve.open(CVE_CACHE_FILENAME, flag='c', writeback=True)
28-
else:
29-
cache = NON_PERSISTENT_CACHE
30-
if cve_id in cache:
31-
result = cache[cve_id]
32-
#print(f"hit cve_enrichment: {time.time() - start:.2f}")
33-
else:
34-
api_attempts_remaining = max_api_attempts
35-
result = None
36-
while api_attempts_remaining > 0:
37-
api_attempts_remaining -= 1
38-
try:
39-
cve = cvesearch_id_helper(url)
40-
result = cve.id(cve_id)
41-
break
42-
except Exception as e:
43-
if api_attempts_remaining > 0:
44-
print(f"The option 'force_cached_or_offline' was used, but {cve_id} not found in {CVE_CACHE_FILENAME} and unable to connect to {CVESSEARCH_API_URL}: {str(e)}")
45-
print(f"Retrying the CVESearch API up to {api_attempts_remaining} more times after a sleep of {retry_sleep_seconds} seconds...")
46-
time.sleep(retry_sleep_seconds)
47-
else:
48-
raise(Exception(f"The option 'force_cached_or_offline' was used, but {cve_id} not found in {CVE_CACHE_FILENAME} and unable to connect to {CVESSEARCH_API_URL} after {max_api_attempts} attempts: {str(e)}"))
49-
50-
if result is None:
51-
raise(Exception(f'CveEnrichment for [ {cve_id} ] failed - CVE does not exist'))
52-
cache[cve_id] = result
53-
54-
if isinstance(cache, shelve.Shelf):
55-
#close the cache if it was a shelf
56-
cache.close()
10+
from requests.exceptions import ReadTimeout
5711

58-
return result
12+
if TYPE_CHECKING:
13+
from contentctl.objects.config import validate
5914

60-
@functools.cache
61-
def cvesearch_id_helper(url:str):
62-
#The initial CVESearch call takes some time.
63-
#We cache it to avoid making this call each time we need to do a lookup
64-
cve = CVESearch(CVESSEARCH_API_URL)
65-
return cve
6615

6716

17+
CVESSEARCH_API_URL = 'https://cve.circl.lu'
6818

6919

7020
class CveEnrichmentObj(BaseModel):
7121
id:Annotated[str, "^CVE-[1|2][0-9]{3}-[0-9]+$"]
7222
cvss:Annotated[Decimal, Field(ge=.1, le=10, decimal_places=1)]
7323
summary:str
24+
25+
@computed_field
26+
@property
27+
def url(self)->str:
28+
BASE_NVD_URL = "https://nvd.nist.gov/vuln/detail/"
29+
return f"{BASE_NVD_URL}{self.id}"
7430

7531

76-
@staticmethod
77-
def buildEnrichmentOnFailure(id:Annotated[str, "^CVE-[1|2][0-9]{3}-[0-9]+$"], errorMessage:str)->CveEnrichmentObj:
78-
message = f"{errorMessage}. Default CVSS of 5.0 used"
79-
print(message)
80-
return CveEnrichmentObj(id=id, cvss=Decimal(5.0), summary=message)
32+
class CveEnrichment(BaseModel):
33+
use_enrichment: bool = True
34+
cve_api_obj: Union[CVESearch,None] = None
35+
8136

82-
class CveEnrichment():
83-
@classmethod
84-
def enrich_cve(cls, cve_id: str, force_cached_or_offline: bool = False, treat_failures_as_warnings:bool=True) -> CveEnrichmentObj:
85-
cve_enriched = dict()
86-
try:
87-
88-
result = cvesearch_helper(CVESSEARCH_API_URL, cve_id, force_cached_or_offline)
89-
cve_enriched['id'] = cve_id
90-
cve_enriched['cvss'] = result['cvss']
91-
cve_enriched['summary'] = result['summary']
92-
except Exception as e:
93-
message = f"issue enriching {cve_id}, with error: {str(e)}"
94-
if treat_failures_as_warnings:
95-
return CveEnrichmentObj.buildEnrichmentOnFailure(id = cve_id, errorMessage=f"WARNING, {message}")
96-
else:
97-
raise ValueError(f"ERROR, {message}")
37+
class Config:
38+
# Arbitrary_types are allowed to let us use the CVESearch Object
39+
arbitrary_types_allowed = True
40+
frozen = True
41+
42+
43+
@staticmethod
44+
def getCveEnrichment(config:validate, timeout_seconds:int=10, force_disable_enrichment:bool=True)->CveEnrichment:
45+
if force_disable_enrichment:
46+
return CveEnrichment(use_enrichment=False, cve_api_obj=None)
9847

99-
return CveEnrichmentObj.model_validate(cve_enriched)
48+
if config.enrichments:
49+
try:
50+
cve_api_obj = CVESearch(CVESSEARCH_API_URL, timeout=timeout_seconds)
51+
return CveEnrichment(use_enrichment=True, cve_api_obj=cve_api_obj)
52+
except Exception as e:
53+
raise Exception(f"Error setting CVE_SEARCH API to: {CVESSEARCH_API_URL}: {str(e)}")
10054

55+
return CveEnrichment(use_enrichment=False, cve_api_obj=None)
56+
57+
58+
def enrich_cve(self, cve_id:str, raise_exception_on_failure:bool=True)->CveEnrichmentObj:
59+
60+
if not self.use_enrichment:
61+
return CveEnrichmentObj(id=cve_id,cvss=Decimal(5.0),summary="SUMMARY NOT AVAILABLE! ONLY THE LINK WILL BE USED AT THIS TIME")
62+
else:
63+
print("WARNING - Dynamic enrichment not supported at this time.")
64+
return CveEnrichmentObj(id=cve_id,cvss=Decimal(5.0),summary="SUMMARY NOT AVAILABLE! ONLY THE LINK WILL BE USED AT THIS TIME")
65+
# Depending on needs, we may add dynamic enrichment functionality back to the tool

contentctl/input/director.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class DirectorOutputDto:
3535
# is far quicker than attack_enrichment
3636
atomic_tests: Union[list[AtomicTest],None]
3737
attack_enrichment: AttackEnrichment
38+
cve_enrichment: CveEnrichment
3839
detections: list[Detection]
3940
stories: list[Story]
4041
baselines: list[Baseline]
@@ -44,7 +45,7 @@ class DirectorOutputDto:
4445
lookups: list[Lookup]
4546
deployments: list[Deployment]
4647
ssa_detections: list[SSADetection]
47-
#cve_enrichment: CveEnrichment
48+
4849

4950
name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
5051
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)

contentctl/objects/abstract_security_content_objects/detection_abstract.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
#from contentctl.objects.playbook import Playbook
2828
from contentctl.objects.enums import DataSource,ProvidingTechnology
29-
from contentctl.enrichments.cve_enrichment import CveEnrichment, CveEnrichmentObj
29+
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
3030

3131

3232
class Detection_Abstract(SecurityContentObject):
@@ -40,7 +40,6 @@ class Detection_Abstract(SecurityContentObject):
4040
search: Union[str, dict[str,Any]] = Field(...)
4141
how_to_implement: str = Field(..., min_length=4)
4242
known_false_positives: str = Field(..., min_length=4)
43-
check_references: bool = False
4443
#data_source: Optional[List[DataSource]] = None
4544

4645
enabled_by_default: bool = False
@@ -144,17 +143,30 @@ def mappings(self)->dict[str, List[str]]:
144143
macros: list[Macro] = Field([],validate_default=True)
145144
lookups: list[Lookup] = Field([],validate_default=True)
146145

147-
@computed_field
148-
@property
149-
def cve_enrichment(self)->List[CveEnrichmentObj]:
150-
raise Exception("CVE Enrichment Functionality not currently supported. It will be re-added at a later time.")
151-
enriched_cves = []
152-
for cve_id in self.tags.cve:
153-
print(f"\nEnriching {cve_id}\n")
154-
enriched_cves.append(CveEnrichment.enrich_cve(cve_id))
146+
cve_enrichment: list[CveEnrichmentObj] = Field([], validate_default=True)
147+
148+
@model_validator(mode="after")
149+
def cve_enrichment_func(self, info:ValidationInfo):
150+
if len(self.cve_enrichment) > 0:
151+
raise ValueError(f"Error, field 'cve_enrichment' should be empty and "
152+
f"dynamically populated at runtime. Instead, this field contained: {self.cve_enrichment}")
155153

156-
return enriched_cves
154+
output_dto:Union[DirectorOutputDto,None]= info.context.get("output_dto",None)
155+
if output_dto is None:
156+
raise ValueError("Context not provided to detection model post validator")
157+
158+
159+
enriched_cves:list[CveEnrichmentObj] = []
160+
161+
for cve_id in self.tags.cve:
162+
try:
163+
enriched_cves.append(output_dto.cve_enrichment.enrich_cve(cve_id, raise_exception_on_failure=False))
164+
except Exception as e:
165+
raise ValueError(f"{e}")
166+
self.cve_enrichment = enriched_cves
167+
return self
157168

169+
158170
splunk_app_enrichment: Optional[List[dict]] = None
159171

160172
@computed_field

0 commit comments

Comments
 (0)