|
| 1 | +# Copyright (c) nexB Inc. and others. All rights reserved. |
| 2 | +# http://nexb.com and https://github.com/nexB/vulnerablecode/ |
| 3 | +# The VulnerableCode software is licensed under the Apache License version 2.0. |
| 4 | +# Data generated with VulnerableCode require an acknowledgment. |
| 5 | +# |
| 6 | +# You may not use this software except in compliance with the License. |
| 7 | +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 |
| 8 | +# Unless required by applicable law or agreed to in writing, software distributed |
| 9 | +# under the License is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR |
| 10 | +# CONDITIONS OF ANY KIND, either express or implied. See the License for the |
| 11 | +# specific language governing permissions and limitations under the License. |
| 12 | +# |
| 13 | +# When you publish or redistribute any data created with VulnerableCode or any VulnerableCode |
| 14 | +# derivative work, you must accompany this data with the following acknowledgment: |
| 15 | +# |
| 16 | +# Generated with VulnerableCode and provided on an 'AS IS' BASIS, WITHOUT WARRANTIES |
| 17 | +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from |
| 18 | +# VulnerableCode should be considered or used as legal advice. Consult an Attorney |
| 19 | +# for any legal advice. |
| 20 | +# VulnerableCode is a free software code scanning tool from nexB Inc. and others. |
| 21 | +# Visit https://github.com/nexB/vulnerablecode/ for support and download. |
| 22 | + |
| 23 | +import dataclasses |
| 24 | +import gzip |
| 25 | +import json |
| 26 | +from dateutil import parser as dateparser |
| 27 | +from datetime import date |
| 28 | + |
| 29 | +import requests |
| 30 | + |
| 31 | +from vulnerabilities.data_source import Advisory |
| 32 | +from vulnerabilities.data_source import DataSource |
| 33 | +from vulnerabilities.data_source import DataSourceConfiguration |
| 34 | +from vulnerabilities.data_source import Reference |
| 35 | + |
| 36 | + |
| 37 | +@dataclasses.dataclass |
| 38 | +class NVDDataSourceConfiguration(DataSourceConfiguration): |
| 39 | + etags: dict |
| 40 | + |
| 41 | + |
| 42 | +BASE_URL = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{}.json.gz" |
| 43 | + |
| 44 | + |
| 45 | +class NVDDataSource(DataSource): |
| 46 | + |
| 47 | + CONFIG_CLASS = NVDDataSourceConfiguration |
| 48 | + |
| 49 | + def updated_advisories(self): |
| 50 | + current_year = date.today().year |
| 51 | + # NVD json feeds start from 2002. |
| 52 | + for year in range(2002, current_year+1): |
| 53 | + download_url = BASE_URL.format(year) |
| 54 | + # Etags are like hashes of web responses. We maintain |
| 55 | + # (url, etag) mappings in the DB. `create_etag` creates |
| 56 | + # (url, etag) pair. If a (url, etag) already exists then the code |
| 57 | + # skips processing the response further to avoid duplicate work |
| 58 | + if self.create_etag(download_url): |
| 59 | + data = self.fetch(download_url) |
| 60 | + yield self.to_advisories(data) |
| 61 | + |
| 62 | + @staticmethod |
| 63 | + def fetch(url): |
| 64 | + gz_file = requests.get(url) |
| 65 | + data = gzip.decompress(gz_file.content) |
| 66 | + return json.loads(data) |
| 67 | + |
| 68 | + def to_advisories(self, nvd_data): |
| 69 | + for cve_item in nvd_data["CVE_Items"]: |
| 70 | + if self.is_outdated(cve_item): |
| 71 | + continue |
| 72 | + |
| 73 | + if self.related_to_hardware(cve_item): |
| 74 | + continue |
| 75 | + |
| 76 | + cve_id = cve_item["cve"]["CVE_data_meta"]["ID"] |
| 77 | + ref_urls = self.extract_reference_urls(cve_item) |
| 78 | + references = [Reference(url=url) for url in ref_urls] |
| 79 | + summary = self.extract_summary(cve_item) |
| 80 | + yield Advisory( |
| 81 | + cve_id=cve_id, summary=summary, vuln_references=references, impacted_package_urls=[] |
| 82 | + ) |
| 83 | + |
| 84 | + @staticmethod |
| 85 | + def extract_summary(cve_item): |
| 86 | + # In 99% of cases len(cve_item['cve']['description']['description_data']) == 1 , so |
| 87 | + # this usually returns cve_item['cve']['description']['description_data'][0]['value'] |
| 88 | + # In the remaining 1% cases this returns the longest summary. |
| 89 | + summaries = [desc["value"] for desc in cve_item["cve"]["description"]["description_data"]] |
| 90 | + return max(summaries, key=len) |
| 91 | + |
| 92 | + def extract_reference_urls(self, cve_item): |
| 93 | + urls = set() |
| 94 | + for reference in cve_item["cve"]["references"]["reference_data"]: |
| 95 | + ref_url = reference["url"] |
| 96 | + |
| 97 | + if not ref_url: |
| 98 | + continue |
| 99 | + |
| 100 | + if ref_url.startswith("http") or ref_url.startswith("ftp"): |
| 101 | + urls.add(ref_url) |
| 102 | + |
| 103 | + return urls |
| 104 | + |
| 105 | + def is_outdated(self, cve_item): |
| 106 | + cve_last_modified_date = cve_item["lastModifiedDate"] |
| 107 | + cve_last_modified_date_obj = dateparser.parse(cve_last_modified_date) |
| 108 | + |
| 109 | + if self.config.cutoff_date: |
| 110 | + return cve_last_modified_date_obj < self.config.cutoff_date |
| 111 | + |
| 112 | + if self.config.last_run_date: |
| 113 | + return cve_last_modified_date_obj < self.config.last_run_date |
| 114 | + |
| 115 | + return False |
| 116 | + |
| 117 | + def related_to_hardware(self, cve_item): |
| 118 | + for cpe in self.extract_cpes(cve_item): |
| 119 | + cpe_comps = cpe.split(":") |
| 120 | + # CPE follow the format cpe:cpe_version:product_type:vendor:product |
| 121 | + if cpe_comps[2] == "h": |
| 122 | + return True |
| 123 | + |
| 124 | + return False |
| 125 | + |
| 126 | + @staticmethod |
| 127 | + def extract_cpes(cve_item): |
| 128 | + cpes = set() |
| 129 | + for node in cve_item["configurations"]["nodes"]: |
| 130 | + for cpe_data in node.get("cpe_match", []): |
| 131 | + cpes.add(cpe_data["cpe23Uri"]) |
| 132 | + return cpes |
| 133 | + |
| 134 | + def create_etag(self, url): |
| 135 | + etag = requests.head(url).headers.get("etag") |
| 136 | + if not etag: |
| 137 | + # Kind of inaccurate to return True since etag is |
| 138 | + # not created |
| 139 | + return True |
| 140 | + elif url in self.config.etags: |
| 141 | + if self.config.etags[url] == etag: |
| 142 | + return False |
| 143 | + self.config.etags[url] = etag |
| 144 | + return True |
0 commit comments