|
| 1 | +# |
| 2 | +# Copyright (c) nexB Inc. and others. All rights reserved. |
| 3 | +# VulnerableCode is a trademark of nexB Inc. |
| 4 | +# SPDX-License-Identifier: Apache-2.0 |
| 5 | +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. |
| 6 | +# See https://github.com/nexB/vulnerablecode for support or download. |
| 7 | +# See https://aboutcode.org for more information about nexB OSS projects. |
| 8 | +# |
| 9 | +import json |
| 10 | +import logging |
| 11 | +from io import BytesIO |
| 12 | +from typing import Iterable |
| 13 | +from typing import List |
| 14 | +from typing import Set |
| 15 | +from zipfile import ZipFile |
| 16 | + |
| 17 | +import dateparser |
| 18 | +import requests |
| 19 | + |
| 20 | +from vulnerabilities.importer import AdvisoryData |
| 21 | +from vulnerabilities.importer import Importer |
| 22 | +from vulnerabilities.importer import Reference |
| 23 | +from vulnerabilities.utils import build_description |
| 24 | +from vulnerabilities.utils import dedupe |
| 25 | + |
| 26 | +logger = logging.getLogger(__name__) |
| 27 | + |
| 28 | + |
| 29 | +class GSDImporter: # TODO inherit from Importer |
| 30 | + spdx_license_expression = "CC0-1.0" |
| 31 | + license_url = "https://github.com/cloudsecurityalliance/gsd-database/blob/main/LICENSE" |
| 32 | + url = "https://codeload.github.com/cloudsecurityalliance/gsd-database/zip/refs/heads/main" |
| 33 | + |
| 34 | + def advisory_data(self) -> Iterable[AdvisoryData]: |
| 35 | + response = requests.get(self.url).content |
| 36 | + with ZipFile(BytesIO(response)) as zip_file: |
| 37 | + for file_name in zip_file.namelist(): |
| 38 | + if file_name == "gsd-database-main/allowlist.json" or not file_name.endswith( |
| 39 | + ".json" |
| 40 | + ): |
| 41 | + continue |
| 42 | + |
| 43 | + with zip_file.open(file_name) as f: |
| 44 | + try: |
| 45 | + raw_data = json.load(f) |
| 46 | + yield parse_advisory_data(raw_data, file_name) |
| 47 | + except Exception as e: |
| 48 | + logger.error(f"Invalid GSD advisory data file: {file_name} - {e}") |
| 49 | + |
| 50 | + |
| 51 | +def parse_advisory_data(raw_data, file_name): |
| 52 | + """ |
| 53 | + Parse a GSD advisory file and return an AdvisoryData. |
| 54 | + Each advisory file contains the advisory information in JSON format. |
| 55 | + """ |
| 56 | + |
| 57 | + namespaces = raw_data.get("namespaces") or {} |
| 58 | + cve_org = namespaces.get("cve.org") or {} |
| 59 | + nvd_nist_gov = namespaces.get("nvd.nist.gov") or {} |
| 60 | + |
| 61 | + gsd = raw_data.get("GSD") or {} |
| 62 | + gsd_id = gsd.get("id") or file_name |
| 63 | + gsd_alias = gsd.get("alias") or [] |
| 64 | + gsd_description = gsd.get("description") or "" |
| 65 | + |
| 66 | + gsd_reference_data = gsd.get("") or [] |
| 67 | + gsd_references = [Reference(url=ref) for ref in gsd_reference_data] |
| 68 | + |
| 69 | + details = gsd_description or "".join(get_description(cve_org)) |
| 70 | + |
| 71 | + aliases_cve_org = get_aliases(cve_org) |
| 72 | + aliases_nvd_nist_gov = get_aliases(nvd_nist_gov) |
| 73 | + |
| 74 | + aliases = [gsd_alias, gsd_id] + aliases_cve_org + aliases_nvd_nist_gov |
| 75 | + aliases = [alias for alias in aliases if alias is not None] |
| 76 | + |
| 77 | + summary = build_description(summary=get_summary(cve_org), description=details) |
| 78 | + |
| 79 | + severities = get_severities(cve_org) |
| 80 | + configurations = nvd_nist_gov.get("configurations") or {} |
| 81 | + nodes = configurations.get("nodes") or [] |
| 82 | + cpes = get_cpe(nodes) |
| 83 | + |
| 84 | + references = get_references(cve_org) + gsd_references |
| 85 | + |
| 86 | + date_published = get_published_date_nvd_nist_gov(nvd_nist_gov) |
| 87 | + |
| 88 | + return AdvisoryData( |
| 89 | + aliases=dedupe(aliases), |
| 90 | + summary=summary, |
| 91 | + references=references, |
| 92 | + date_published=date_published, |
| 93 | + ) |
| 94 | + |
| 95 | + |
| 96 | +def get_summary(cve) -> str: |
| 97 | + """ |
| 98 | + Returns a title of CVE_data_meta |
| 99 | + >> get_summary {"CVE_data_meta": {"TITLE": "DoS vulnerability: Invalid Accent Colors"} |
| 100 | + 'DoS vulnerability: Invalid Accent Colors' |
| 101 | + """ |
| 102 | + cve_data_meta = cve.get("CVE_data_meta") or {} |
| 103 | + return cve_data_meta.get("TITLE") or "" |
| 104 | + |
| 105 | + |
| 106 | +def get_severities(cve) -> List: |
| 107 | + """ |
| 108 | + Return a list of CVSS vectorString |
| 109 | + >>> get_severities({"impact": {"cvss": {"vectorString": "CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:N/I:N/A:H"}}}) |
| 110 | + ['CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:N/I:N/A:H'] |
| 111 | + """ |
| 112 | + severities = [] |
| 113 | + impact = cve.get("impact") or {} |
| 114 | + |
| 115 | + base_metric_2 = impact.get("baseMetricV2") or {} |
| 116 | + if base_metric_2: |
| 117 | + cvss_v2 = base_metric_2.get("cvssV2") or {} |
| 118 | + cvss_vector = cvss_v2.get("vectorString") |
| 119 | + if cvss_vector: |
| 120 | + severities.append(cvss_vector) |
| 121 | + |
| 122 | + base_metric_v3 = impact.get("baseMetricV3") or {} |
| 123 | + if base_metric_v3: |
| 124 | + cvss_v3 = base_metric_v3.get("cvssV3") or {} |
| 125 | + cvss_vector = cvss_v3.get("vectorString") |
| 126 | + if cvss_vector: |
| 127 | + severities.append(cvss_vector) |
| 128 | + |
| 129 | + cvss = impact.get("cvss") or {} |
| 130 | + if isinstance(cvss, List): |
| 131 | + for cvss_v in cvss: |
| 132 | + if isinstance(cvss_v, dict): |
| 133 | + cvss_vector = cvss_v.get("vectorString") or {} |
| 134 | + if cvss_vector: |
| 135 | + severities.append(cvss_vector) |
| 136 | + else: |
| 137 | + cvss_vector = cvss.get("vectorString") |
| 138 | + if cvss_vector: |
| 139 | + severities.append(cvss_vector) |
| 140 | + return severities |
| 141 | + |
| 142 | + |
| 143 | +def get_description(cve) -> [str]: |
| 144 | + """ |
| 145 | + Get a list description value from description object |
| 146 | + >>> get_description({"description": {"description_data": [{"lang": "eng","value": "the description"}]}}) |
| 147 | + ['the description'] |
| 148 | + """ |
| 149 | + description = cve.get("description") or {} |
| 150 | + description_data = description.get("description_data") or [] |
| 151 | + return [desc["value"] for desc in description_data if desc["value"] and desc["lang"] == "eng"] |
| 152 | + |
| 153 | + |
| 154 | +def get_references(cve): |
| 155 | + """ |
| 156 | + Returns a list of Reference assigned with url |
| 157 | + >>> get_references({"references": { |
| 158 | + ... "reference_data": [{ |
| 159 | + ... "name": "https://kc.mcafee.com/corporate/index?page=content&id=SB10198", |
| 160 | + ... "refsource": "CONFIRM", |
| 161 | + ... "tags": ["Vendor Advisory"], |
| 162 | + ... "url": "https://kc.mcafee.com/corporate/index?page=content&id=SB10198"}]}}) |
| 163 | + [Reference(reference_id='', reference_type='', url='https://kc.mcafee.com/corporate/index?page=content&id=SB10198', severities=[])] |
| 164 | + """ |
| 165 | + references = cve.get("references") or {} |
| 166 | + reference_data = references.get("reference_data") or [] |
| 167 | + return [Reference(url=ref["url"]) for ref in reference_data if ref["url"]] |
| 168 | + |
| 169 | + |
| 170 | +def get_aliases(cve) -> [str]: |
| 171 | + """ |
| 172 | + Returns a list of aliases |
| 173 | + >>> get_aliases({"CVE_data_meta": {"ID": "CVE-2017-4017"},"source": {"advisory": "GHSA-v8x6-59g4-5g3w"}}) |
| 174 | + ['CVE-2017-4017', 'GHSA-v8x6-59g4-5g3w'] |
| 175 | + """ |
| 176 | + cve_data_meta = cve.get("CVE_data_meta") or {} |
| 177 | + alias = cve_data_meta.get("ID") |
| 178 | + |
| 179 | + source = cve.get("source") or {} |
| 180 | + advisory = source.get("advisory") |
| 181 | + |
| 182 | + aliases = [] |
| 183 | + if alias: |
| 184 | + aliases.append(alias) |
| 185 | + if advisory: |
| 186 | + aliases.append(advisory) |
| 187 | + return aliases |
| 188 | + |
| 189 | + |
| 190 | +def get_published_date_nvd_nist_gov(nvd_nist_gov): |
| 191 | + """ |
| 192 | + Returns a published datetime |
| 193 | + >>> get_published_date_nvd_nist_gov({"publishedDate": "2022-06-23T07:15Z"}) |
| 194 | + datetime.datetime(2022, 6, 23, 7, 15, tzinfo=<StaticTzInfo 'Z'>) |
| 195 | + """ |
| 196 | + published_date = nvd_nist_gov.get("publishedDate") |
| 197 | + return published_date and dateparser.parse(published_date) |
| 198 | + |
| 199 | + |
| 200 | +def get_cpe(nodes) -> List: |
| 201 | + """ |
| 202 | + >>> get_cpe([{"children": [], "cpe_match": [{ |
| 203 | + ... "cpe23Uri": "cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*", |
| 204 | + ... "cpe_name": [], |
| 205 | + ... "versionEndIncluding": "1.2.5.1", |
| 206 | + ... "vulnerable": True |
| 207 | + ... },{ |
| 208 | + ... "cpe23Uri": "cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*", |
| 209 | + ... "cpe_name": [], |
| 210 | + ... "versionEndIncluding": "1.3.25", |
| 211 | + ... "vulnerable": True |
| 212 | + ... }],"operator": "OR"}]) |
| 213 | + ['cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*', 'cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*'] |
| 214 | + """ |
| 215 | + cpe_list = [] |
| 216 | + for node in nodes: |
| 217 | + cpe_match = node.get("cpe_match") or [] |
| 218 | + for cpe23Uri in cpe_match: |
| 219 | + cpe_uri = cpe23Uri.get("cpe23Uri") |
| 220 | + if cpe_uri: |
| 221 | + cpe_list.append(cpe_uri) |
| 222 | + return cpe_list |
0 commit comments