Skip to content

Commit 8b6aaf5

Browse files
authored
Add support for CVSSv4 & SSVC and import the data using vulnrichment (#1484)
Add vulnrichment importer. Add support to CVSSv4 Add vulnrichment importer. Add Support for SSVC. Add SSVC calculator. Ignore affected packages Add support for ref cpes & fix the tests Fix ssvc calculator bug Signed-off-by: ziadhany <[email protected]>
1 parent 457f8f8 commit 8b6aaf5

13 files changed

+1771
-38
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from vulnerabilities.importers import suse_scores
3939
from vulnerabilities.importers import ubuntu
4040
from vulnerabilities.importers import ubuntu_usn
41+
from vulnerabilities.importers import vulnrichment
4142
from vulnerabilities.importers import xen
4243

4344
IMPORTERS_REGISTRY = [
@@ -73,6 +74,7 @@
7374
ruby.RubyImporter,
7475
github_osv.GithubOSVImporter,
7576
epss.EPSSImporter,
77+
vulnrichment.VulnrichImporter,
7678
]
7779

7880
IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}

vulnerabilities/importers/nvd.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ def severities(self):
163163
"""
164164
severities = []
165165
impact = self.cve_item.get("impact") or {}
166+
base_metric_v4 = impact.get("baseMetricV4") or {}
167+
if base_metric_v4:
168+
cvss_v4 = base_metric_v4.get("cvssV4") or {}
169+
vs = VulnerabilitySeverity(
170+
system=severity_systems.CVSSV4,
171+
value=str(cvss_v4.get("baseScore") or ""),
172+
scoring_elements=str(cvss_v4.get("vectorString") or ""),
173+
)
174+
severities.append(vs)
175+
166176
base_metric_v3 = impact.get("baseMetricV3") or {}
167177
if base_metric_v3:
168178
cvss_v3 = get_item(base_metric_v3, "cvssV3")
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
import json
2+
import logging
3+
import re
4+
from pathlib import Path
5+
from typing import Iterable
6+
7+
import dateparser
8+
9+
from vulnerabilities.importer import AdvisoryData
10+
from vulnerabilities.importer import Importer
11+
from vulnerabilities.importer import Reference
12+
from vulnerabilities.importer import VulnerabilitySeverity
13+
from vulnerabilities.models import VulnerabilityReference
14+
from vulnerabilities.severity_systems import SCORING_SYSTEMS
15+
from vulnerabilities.utils import get_advisory_url
16+
from vulnerabilities.utils import get_cwe_id
17+
from vulnerabilities.utils import get_reference_id
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
class VulnrichImporter(Importer):
23+
spdx_license_expression = "CC0-1.0"
24+
license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE"
25+
repo_url = "git+https://github.com/cisagov/vulnrichment.git"
26+
importer_name = "Vulnrichment"
27+
28+
def advisory_data(self) -> Iterable[AdvisoryData]:
29+
try:
30+
vcs_response = self.clone(repo_url=self.repo_url)
31+
base_path = Path(vcs_response.dest_dir)
32+
for file_path in base_path.glob(f"**/**/*.json"):
33+
if not file_path.name.startswith("CVE-"):
34+
continue
35+
36+
with open(file_path) as f:
37+
raw_data = json.load(f)
38+
39+
advisory_url = get_advisory_url(
40+
file=file_path,
41+
base_path=base_path,
42+
url="https://github.com/cisagov/vulnrichment/blob/develop/",
43+
)
44+
yield parse_cve_advisory(raw_data, advisory_url)
45+
finally:
46+
if self.vcs_response:
47+
self.vcs_response.delete()
48+
49+
50+
def parse_cve_advisory(raw_data, advisory_url):
51+
"""
52+
Parse a vulnrichment advisory file and return an `AdvisoryData` object.
53+
The files are in JSON format, and a JSON schema is documented at the following location:
54+
https://github.com/CVEProject/cve-schema/blob/main/schema/CVE_Record_Format.json
55+
"""
56+
# Extract CVE Metadata
57+
cve_metadata = raw_data.get("cveMetadata", {})
58+
cve_id = cve_metadata.get("cveId")
59+
state = cve_metadata.get("state")
60+
61+
date_published = cve_metadata.get("datePublished")
62+
if date_published:
63+
date_published = dateparser.parse(date_published)
64+
65+
# Extract containers
66+
containers = raw_data.get("containers", {})
67+
cna_data = containers.get("cna", {})
68+
adp_data = containers.get("adp", {})
69+
70+
# Extract descriptions
71+
summary = ""
72+
description_list = cna_data.get("descriptions", [])
73+
for description_dict in description_list:
74+
if not description_dict.get("lang") in ["en", "en-US"]:
75+
continue
76+
summary = description_dict.get("value")
77+
78+
# Extract metrics
79+
severities = []
80+
metrics = cna_data.get("metrics", []) + [
81+
adp_metrics for data in adp_data for adp_metrics in data.get("metrics", [])
82+
]
83+
84+
vulnrichment_scoring_system = {
85+
"cvssV4_0": SCORING_SYSTEMS["cvssv4"],
86+
"cvssV3_1": SCORING_SYSTEMS["cvssv3.1"],
87+
"cvssV3_0": SCORING_SYSTEMS["cvssv3"],
88+
"cvssV2_0": SCORING_SYSTEMS["cvssv2"],
89+
"other": {
90+
"ssvc": SCORING_SYSTEMS["ssvc"],
91+
}, # ignore kev
92+
}
93+
94+
for metric in metrics:
95+
for metric_type, metric_value in metric.items():
96+
if metric_type not in vulnrichment_scoring_system:
97+
continue
98+
99+
if metric_type == "other":
100+
other_types = metric_value.get("type")
101+
if other_types == "ssvc":
102+
content = metric_value.get("content", {})
103+
vector_string, decision = ssvc_calculator(content)
104+
scoring_system = vulnrichment_scoring_system[metric_type][other_types]
105+
severity = VulnerabilitySeverity(
106+
system=scoring_system, value=decision, scoring_elements=vector_string
107+
)
108+
severities.append(severity)
109+
# ignore kev
110+
else:
111+
vector_string = metric_value.get("vectorString")
112+
base_score = metric_value.get("baseScore")
113+
scoring_system = vulnrichment_scoring_system[metric_type]
114+
severity = VulnerabilitySeverity(
115+
system=scoring_system, value=base_score, scoring_elements=vector_string
116+
)
117+
severities.append(severity)
118+
119+
# Extract references cpes and ignore affected products
120+
cpes = set()
121+
for affected_product in cna_data.get("affected", []):
122+
if type(affected_product) != dict:
123+
continue
124+
cpes.update(affected_product.get("cpes") or [])
125+
126+
references = []
127+
for ref in cna_data.get("references", []):
128+
# https://github.com/CVEProject/cve-schema/blob/main/schema/tags/reference-tags.json
129+
# We removed all unwanted reference types and set the default reference type to 'OTHER'.
130+
ref_type = VulnerabilityReference.OTHER
131+
vul_ref_types = {
132+
"exploit": VulnerabilityReference.EXPLOIT,
133+
"issue-tracking": VulnerabilityReference.BUG,
134+
"mailing-list": VulnerabilityReference.MAILING_LIST,
135+
"third-party-advisory": VulnerabilityReference.ADVISORY,
136+
"vendor-advisory": VulnerabilityReference.ADVISORY,
137+
"vdb-entry": VulnerabilityReference.ADVISORY,
138+
}
139+
140+
for tag_type in ref.get("tags", []):
141+
if tag_type in vul_ref_types:
142+
ref_type = vul_ref_types.get(tag_type)
143+
144+
url = ref.get("url")
145+
reference = Reference(
146+
reference_id=get_reference_id(url),
147+
url=url,
148+
reference_type=ref_type,
149+
severities=severities,
150+
)
151+
152+
references.append(reference)
153+
154+
cpes_ref = [
155+
Reference(
156+
reference_id=cpe,
157+
reference_type=VulnerabilityReference.OTHER,
158+
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}",
159+
)
160+
for cpe in sorted(list(cpes))
161+
]
162+
references.extend(cpes_ref)
163+
164+
weaknesses = set()
165+
for problem_type in cna_data.get("problemTypes", []):
166+
descriptions = problem_type.get("descriptions", [])
167+
for description in descriptions:
168+
cwe_id = description.get("cweId")
169+
if cwe_id:
170+
weaknesses.add(get_cwe_id(cwe_id))
171+
172+
description_text = description.get("description")
173+
if description_text:
174+
pattern = r"CWE-(\d+)"
175+
match = re.search(pattern, description_text)
176+
if match:
177+
weaknesses.add(int(match.group(1)))
178+
179+
return AdvisoryData(
180+
aliases=[cve_id],
181+
summary=summary,
182+
references=references,
183+
date_published=date_published,
184+
weaknesses=list(weaknesses),
185+
url=advisory_url,
186+
)
187+
188+
189+
def ssvc_calculator(ssvc_data):
190+
"""
191+
Return the ssvc vector and the decision value
192+
"""
193+
options = ssvc_data.get("options", [])
194+
timestamp = ssvc_data.get("timestamp")
195+
196+
# Extract the options into a dictionary
197+
options_dict = {k: v.lower() for option in options for k, v in option.items()}
198+
199+
# We copied the table value from this link.
200+
# https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf
201+
202+
# Determining Mission and Well-Being Impact Value
203+
mission_well_being_table = {
204+
# (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being"
205+
("minimal", "minimal"): "low",
206+
("minimal", "material"): "medium",
207+
("minimal", "irreversible"): "high",
208+
("support", "minimal"): "medium",
209+
("support", "material"): "medium",
210+
("support", "irreversible"): "high",
211+
("essential", "minimal"): "high",
212+
("essential", "material"): "high",
213+
("essential", "irreversible"): "high",
214+
}
215+
216+
if "Mission Prevalence" not in options_dict:
217+
options_dict["Mission Prevalence"] = "minimal"
218+
219+
if "Public Well-being Impact" not in options_dict:
220+
options_dict["Public Well-being Impact"] = "material"
221+
222+
options_dict["Mission & Well-being"] = mission_well_being_table[
223+
(options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"])
224+
]
225+
226+
decision_key = (
227+
options_dict.get("Exploitation"),
228+
options_dict.get("Automatable"),
229+
options_dict.get("Technical Impact"),
230+
options_dict.get("Mission & Well-being"),
231+
)
232+
233+
decision_points = {
234+
"Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}},
235+
"Automatable": {"A": {"no": "N", "yes": "Y"}},
236+
"Technical Impact": {"T": {"partial": "P", "total": "T"}},
237+
"Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}},
238+
"Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}},
239+
"Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}},
240+
}
241+
242+
# Create the SSVC vector
243+
ssvc_vector = "SSVCv2/"
244+
for key, value_map in options_dict.items():
245+
options_key = decision_points.get(key)
246+
for lhs, rhs_map in options_key.items():
247+
ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/"
248+
249+
# "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}},
250+
decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}
251+
252+
decision_lookup = {
253+
("none", "no", "partial", "low"): "Track",
254+
("none", "no", "partial", "medium"): "Track",
255+
("none", "no", "partial", "high"): "Track",
256+
("none", "no", "total", "low"): "Track",
257+
("none", "no", "total", "medium"): "Track",
258+
("none", "no", "total", "high"): "Track*",
259+
("none", "yes", "partial", "low"): "Track",
260+
("none", "yes", "partial", "medium"): "Track",
261+
("none", "yes", "partial", "high"): "Attend",
262+
("none", "yes", "total", "low"): "Track",
263+
("none", "yes", "total", "medium"): "Track",
264+
("none", "yes", "total", "high"): "Attend",
265+
("poc", "no", "partial", "low"): "Track",
266+
("poc", "no", "partial", "medium"): "Track",
267+
("poc", "no", "partial", "high"): "Track*",
268+
("poc", "no", "total", "low"): "Track",
269+
("poc", "no", "total", "medium"): "Track*",
270+
("poc", "no", "total", "high"): "Attend",
271+
("poc", "yes", "partial", "low"): "Track",
272+
("poc", "yes", "partial", "medium"): "Track",
273+
("poc", "yes", "partial", "high"): "Attend",
274+
("poc", "yes", "total", "low"): "Track",
275+
("poc", "yes", "total", "medium"): "Track*",
276+
("poc", "yes", "total", "high"): "Attend",
277+
("active", "no", "partial", "low"): "Track",
278+
("active", "no", "partial", "medium"): "Track",
279+
("active", "no", "partial", "high"): "Attend",
280+
("active", "no", "total", "low"): "Track",
281+
("active", "no", "total", "medium"): "Attend",
282+
("active", "no", "total", "high"): "Act",
283+
("active", "yes", "partial", "low"): "Attend",
284+
("active", "yes", "partial", "medium"): "Attend",
285+
("active", "yes", "partial", "high"): "Act",
286+
("active", "yes", "total", "low"): "Attend",
287+
("active", "yes", "total", "medium"): "Act",
288+
("active", "yes", "total", "high"): "Act",
289+
}
290+
291+
decision = decision_lookup.get(decision_key, "")
292+
293+
if decision:
294+
ssvc_vector += f"D:{decision_values.get(decision)}/"
295+
296+
if timestamp:
297+
timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")
298+
299+
ssvc_vector += f"{timestamp_formatted}/"
300+
return ssvc_vector, decision

0 commit comments

Comments
 (0)