Skip to content

Commit 308ba71

Browse files
committed
Enable CVE Enrichment.
However, there are issues with the API availability/stability that we must overcome.
1 parent 59d1c4c commit 308ba71

File tree

5 files changed

+99
-83
lines changed

5 files changed

+99
-83
lines changed

contentctl/actions/validate.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def execute(self, input_dto: validate) -> DirectorOutputDto:
2222

2323
director_output_dto = DirectorOutputDto(AtomicTest.getAtomicTestsFromArtRepo(repo_path=input_dto.getAtomicRedTeamRepoPath(),
2424
enabled=input_dto.enrichments),
25-
AttackEnrichment.getAttackEnrichment(input_dto),
25+
AttackEnrichment.getAttackEnrichment(input_dto,
26+
CveEnrichment.getCveEnrichment(input_dto),
2627
[],[],[],[],[],[],[],[],[])
2728

2829

contentctl/enrichments/cve_enrichment.py

Lines changed: 68 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -4,67 +4,17 @@
44
import os
55
import shelve
66
import time
7-
from typing import Annotated
8-
from pydantic import BaseModel,Field,ConfigDict
9-
7+
from typing import Annotated, Any, Union, TYPE_CHECKING
8+
from pydantic import BaseModel,Field
109
from decimal import Decimal
11-
CVESSEARCH_API_URL = 'https://cve.circl.lu'
12-
13-
CVE_CACHE_FILENAME = "lookups/CVE_CACHE.db"
14-
15-
NON_PERSISTENT_CACHE = {}
16-
17-
18-
''''''
19-
@functools.cache
20-
def cvesearch_helper(url:str, cve_id:str, force_cached_or_offline:bool=False, max_api_attempts:int=3, retry_sleep_seconds:int=5):
21-
if max_api_attempts < 1:
22-
raise(Exception(f"The minimum number of CVESearch API attempts is 1. You have passed {max_api_attempts}"))
23-
24-
if force_cached_or_offline:
25-
if not os.path.exists(CVE_CACHE_FILENAME):
26-
print(f"Cache at {CVE_CACHE_FILENAME} not found - Creating it.")
27-
cache = shelve.open(CVE_CACHE_FILENAME, flag='c', writeback=True)
28-
else:
29-
cache = NON_PERSISTENT_CACHE
30-
if cve_id in cache:
31-
result = cache[cve_id]
32-
#print(f"hit cve_enrichment: {time.time() - start:.2f}")
33-
else:
34-
api_attempts_remaining = max_api_attempts
35-
result = None
36-
while api_attempts_remaining > 0:
37-
api_attempts_remaining -= 1
38-
try:
39-
cve = cvesearch_id_helper(url)
40-
result = cve.id(cve_id)
41-
break
42-
except Exception as e:
43-
if api_attempts_remaining > 0:
44-
print(f"The option 'force_cached_or_offline' was used, but {cve_id} not found in {CVE_CACHE_FILENAME} and unable to connect to {CVESSEARCH_API_URL}: {str(e)}")
45-
print(f"Retrying the CVESearch API up to {api_attempts_remaining} more times after a sleep of {retry_sleep_seconds} seconds...")
46-
time.sleep(retry_sleep_seconds)
47-
else:
48-
raise(Exception(f"The option 'force_cached_or_offline' was used, but {cve_id} not found in {CVE_CACHE_FILENAME} and unable to connect to {CVESSEARCH_API_URL} after {max_api_attempts} attempts: {str(e)}"))
49-
50-
if result is None:
51-
raise(Exception(f'CveEnrichment for [ {cve_id} ] failed - CVE does not exist'))
52-
cache[cve_id] = result
53-
54-
if isinstance(cache, shelve.Shelf):
55-
#close the cache if it was a shelf
56-
cache.close()
10+
from requests.exceptions import ReadTimeout
5711

58-
return result
12+
if TYPE_CHECKING:
13+
from contentctl.objects.config import validate
5914

60-
@functools.cache
61-
def cvesearch_id_helper(url:str):
62-
#The initial CVESearch call takes some time.
63-
#We cache it to avoid making this call each time we need to do a lookup
64-
cve = CVESearch(CVESSEARCH_API_URL)
65-
return cve
6615

6716

17+
CVESSEARCH_API_URL = 'https://cve.circl.lu'
6818

6919

7020
class CveEnrichmentObj(BaseModel):
@@ -74,27 +24,74 @@ class CveEnrichmentObj(BaseModel):
7424

7525

7626
@staticmethod
77-
def buildEnrichmentOnFailure(id:Annotated[str, "^CVE-[1|2][0-9]{3}-[0-9]+$"], errorMessage:str)->CveEnrichmentObj:
27+
def buildEnrichmentOnFailure(id:Annotated[str, "^CVE-[1|2][0-9]{3}-[0-9]+$"], errorMessage:str,
28+
raise_exception_on_failure:bool=True)->CveEnrichmentObj:
29+
if raise_exception_on_failure:
30+
raise Exception(errorMessage)
7831
message = f"{errorMessage}. Default CVSS of 5.0 used"
7932
print(message)
8033
return CveEnrichmentObj(id=id, cvss=Decimal(5.0), summary=message)
8134

82-
class CveEnrichment():
83-
@classmethod
84-
def enrich_cve(cls, cve_id: str, force_cached_or_offline: bool = False, treat_failures_as_warnings:bool=True) -> CveEnrichmentObj:
85-
cve_enriched = dict()
35+
36+
# We need a MUCH better way to handle issues with the cve.circl.lu API.
37+
# It is often extremely slow or down, which means that we cannot enrich CVEs.
38+
# Downloading the entire database is VERY large, but I don't know that there
39+
# is an alternative.
40+
# Being able to include CVEs that have not made it into this database, or additonal
41+
# enriching comments on pre-existing CVEs, would also be extremely useful.
42+
timeout_error = False
43+
class CveEnrichment(BaseModel):
44+
use_enrichment: bool = True
45+
cve_api_obj: Union[CVESearch,None] = None
46+
47+
48+
class Config:
49+
# Arbitrary_types are allowed to let us use the CVESearch Object
50+
arbitrary_types_allowed = True
51+
frozen = True
52+
53+
54+
@staticmethod
55+
def getCveEnrichment(config:validate, timeout_seconds:int=10)->CveEnrichment:
56+
57+
if config.enrichments:
58+
try:
59+
cve_api_obj = CVESearch(CVESSEARCH_API_URL, timeout=timeout_seconds)
60+
return CveEnrichment(use_enrichment=True, cve_api_obj=cve_api_obj)
61+
except Exception as e:
62+
raise Exception(f"Error setting CVE_SEARCH API to: {CVESSEARCH_API_URL}: {str(e)}")
63+
64+
return CveEnrichment(use_enrichment=False, cve_api_obj=None)
65+
66+
67+
@functools.cache
68+
def enrich_cve(self, cve_id:str, raise_exception_on_failure:bool=True)->Union[CveEnrichmentObj,None]:
69+
global timeout_error
70+
71+
if not self.use_enrichment:
72+
return None
73+
74+
if timeout_error:
75+
message = f"Previous timeout during enrichment - CVE {cve_id} enrichment skipped."
76+
return CveEnrichmentObj.buildEnrichmentOnFailure(id = cve_id, errorMessage=f"WARNING, {message}",
77+
raise_exception_on_failure=raise_exception_on_failure)
78+
79+
cve_enriched:dict[str,Any] = dict()
80+
8681
try:
87-
88-
result = cvesearch_helper(CVESSEARCH_API_URL, cve_id, force_cached_or_offline)
82+
result = self.cve_api_obj.id(cve_id)
8983
cve_enriched['id'] = cve_id
9084
cve_enriched['cvss'] = result['cvss']
9185
cve_enriched['summary'] = result['summary']
86+
return CveEnrichmentObj.model_validate(cve_enriched)
87+
except ReadTimeout as e:
88+
message = f"Timeout enriching CVE {cve_id}: {str(e)} after {self.cve_api_obj.timeout} seconds."\
89+
f" All other CVE Enrichment has been disabled"
90+
#Set a global value to true so future runs don't waste time on this
91+
timeout_error = True
92+
return CveEnrichmentObj.buildEnrichmentOnFailure(id = cve_id, errorMessage=f"ERROR, {message}",
93+
raise_exception_on_failure=raise_exception_on_failure)
9294
except Exception as e:
93-
message = f"issue enriching {cve_id}, with error: {str(e)}"
94-
if treat_failures_as_warnings:
95-
return CveEnrichmentObj.buildEnrichmentOnFailure(id = cve_id, errorMessage=f"WARNING, {message}")
96-
else:
97-
raise ValueError(f"ERROR, {message}")
98-
99-
return CveEnrichmentObj.model_validate(cve_enriched)
100-
95+
message = f"Error enriching CVE {cve_id}. Are you positive this CVE exists: {str(e)}"
96+
return CveEnrichmentObj.buildEnrichmentOnFailure(id = cve_id, errorMessage=f"WARNING, {message}",
97+
raise_exception_on_failure=raise_exception_on_failure)

contentctl/input/director.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class DirectorOutputDto:
3535
# is far quicker than attack_enrichment
3636
atomic_tests: Union[list[AtomicTest],None]
3737
attack_enrichment: AttackEnrichment
38+
cve_enrichment: CveEnrichment
3839
detections: list[Detection]
3940
stories: list[Story]
4041
baselines: list[Baseline]
@@ -44,7 +45,7 @@ class DirectorOutputDto:
4445
lookups: list[Lookup]
4546
deployments: list[Deployment]
4647
ssa_detections: list[SSADetection]
47-
#cve_enrichment: CveEnrichment
48+
4849

4950
name_to_content_map: dict[str, SecurityContentObject] = field(default_factory=dict)
5051
uuid_to_content_map: dict[UUID, SecurityContentObject] = field(default_factory=dict)

contentctl/objects/abstract_security_content_objects/detection_abstract.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
#from contentctl.objects.playbook import Playbook
2828
from contentctl.objects.enums import DataSource,ProvidingTechnology
29-
from contentctl.enrichments.cve_enrichment import CveEnrichment, CveEnrichmentObj
29+
from contentctl.enrichments.cve_enrichment import CveEnrichmentObj
3030

3131

3232
class Detection_Abstract(SecurityContentObject):
@@ -143,18 +143,35 @@ def mappings(self)->dict[str, List[str]]:
143143
macros: list[Macro] = Field([],validate_default=True)
144144
lookups: list[Lookup] = Field([],validate_default=True)
145145

146-
@computed_field
147-
@property
148-
def cve_enrichment(self)->List[CveEnrichmentObj]:
146+
cve_enrichment: list[CveEnrichmentObj] = Field([], validate_default=True)
147+
148+
@model_validator(mode="after")
149+
def cve_enrichment_func(self, info:ValidationInfo)->Detection_Abstract:
150+
if len(self.cve_enrichment) > 0:
151+
raise ValueError(f"Error, field 'cve_enrichment' should be empty and "
152+
f"dynamically populated at runtime. Instead, this field contained: {self.cve_enrichment}")
153+
154+
output_dto:Union[DirectorOutputDto,None]= info.context.get("output_dto",None)
155+
if output_dto is None:
156+
raise ValueError("Context not provided to detection model post validator")
149157

150-
raise Exception("CVE Enrichment Functionality not currently supported. It will be re-added at a later time.")
151-
enriched_cves = []
158+
if output_dto.cve_enrichment.use_enrichment is False:
159+
return self
160+
161+
enriched_cves:list[CveEnrichmentObj] = []
152162
for cve_id in self.tags.cve:
153-
print(f"\nEnriching {cve_id}\n")
154-
enriched_cves.append(CveEnrichment.enrich_cve(cve_id))
163+
try:
164+
enrichment = output_dto.cve_enrichment.enrich_cve(cve_id, raise_exception_on_failure=False)
165+
if enrichment is None:
166+
print(f"WARNING: Failed to find cve_id '{cve_id}'")
167+
else:
168+
enriched_cves.append(enrichment)
169+
except Exception as e:
170+
raise ValueError(f"{e}")
155171

156-
return enriched_cves
172+
return self
157173

174+
158175
splunk_app_enrichment: Optional[List[dict]] = None
159176

160177
@computed_field

contentctl/objects/detection_tags.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def serialize_model(self):
145145
@model_validator(mode="after")
146146
def addAttackEnrichment(self, info:ValidationInfo):
147147
if len(self.mitre_attack_enrichments) > 0:
148-
raise ValueError(f"Error, field 'mitre_attack_enrichment' should be empty and dynamically populated at runtime. Instead, this field contained: {str(v)}")
148+
raise ValueError(f"Error, field 'mitre_attack_enrichment' should be empty and dynamically populated at runtime. Instead, this field contained: {self.mitre_attack_enrichments}")
149149

150150
output_dto:Union[DirectorOutputDto,None]= info.context.get("output_dto",None)
151151
if output_dto is None:

0 commit comments

Comments
 (0)