diff --git a/cve_bin_tool/data_sources/nvd_source.py b/cve_bin_tool/data_sources/nvd_source.py index 3a5f00c03d..85365afb22 100644 --- a/cve_bin_tool/data_sources/nvd_source.py +++ b/cve_bin_tool/data_sources/nvd_source.py @@ -24,6 +24,7 @@ DISK_LOCATION_BACKUP, DISK_LOCATION_DEFAULT, NVD_FILENAME_TEMPLATE, + NVD_VERSION, ) from cve_bin_tool.error_handler import ( AttemptedToWriteOutsideCachedir, @@ -49,11 +50,13 @@ class NVD_Source(Data_Source): CACHEDIR = DISK_LOCATION_DEFAULT BACKUPCACHEDIR = DISK_LOCATION_BACKUP FEED_NVD = "https://nvd.nist.gov/vuln/data-feeds" - FEED_MIRROR = "https://v4.mirror.cveb.in/nvd/json/cve/1.1" + FEED_MIRROR = f"https://v4.mirror.cveb.in/nvd/json/cve/{NVD_VERSION}" LOGGER = LOGGER.getChild("CVEDB") NVDCVE_FILENAME_TEMPLATE = NVD_FILENAME_TEMPLATE + NVDCVE_VERSION = NVD_VERSION + NVDCVE_TOP_LIST_TAG = "CVE_Items" if NVD_VERSION == "1.1" else "vulnerabilities" META_LINK_NVD = "https://nvd.nist.gov" - META_LINK_MIRROR = "https://v4.mirror.cveb.in/nvd/json/cve/1.1" + META_LINK_MIRROR = f"https://v4.mirror.cveb.in/nvd/json/cve/{NVD_VERSION}" META_REGEX_NVD = re.compile(r"feeds\/json\/.*-[0-9]*\.[0-9]*-[0-9]*\.meta") META_REGEX_MIRROR = re.compile(r"nvdcve-[0-9]*\.[0-9]*-[0-9]*\.meta") RANGE_UNSET = "" @@ -107,9 +110,14 @@ async def get_cve_data(self): severity_data = [] affected_data = [] years = self.nvd_years() + formatter = ( + self.format_data + if self.NVDCVE_VERSION == "1.1" + else self.format_data_api2 + ) for year in years: - severity, affected = self.format_data( - self.load_nvd_year(year)["CVE_Items"] + severity, affected = formatter( + self.load_nvd_year(year)[self.NVDCVE_TOP_LIST_TAG] ) severity_data.extend(severity) affected_data.extend(affected) @@ -239,6 +247,10 @@ def format_data_api2(self, all_cve_entries): cve_item = cve_element["cve"] + if cve_item["vulnStatus"] == "Rejected": + # Skip this CVE if it's marked as 'REJECT' + continue + cve = { "ID": cve_item["id"], "description": cve_item["descriptions"][0]["value"], @@ -252,9 +264,6 @@ def format_data_api2(self, all_cve_entries): else cve_item["published"] ), } - if cve["description"].startswith("** REJECT **"): - # Skip this CVE if it's marked as 'REJECT' - continue # Multiple ways of including CVSS metrics. # Newer data uses "impact" -- we may wish to delete the old below @@ -612,7 +621,7 @@ def load_nvd_year(self, year: int) -> dict[str, str | object]: with gzip.open(filename, "rb") as fileobj: cves_for_year = json.load(fileobj) self.LOGGER.debug( - f'Year {year} has {len(cves_for_year["CVE_Items"])} CVEs in dataset' + f"Year {year} has {len(cves_for_year[self.NVDCVE_TOP_LIST_TAG])} CVEs in dataset" ) return cves_for_year @@ -620,9 +629,10 @@ def nvd_years(self) -> list[int]: """ Return the years we have NVD data for. """ + any_year_file = self.NVDCVE_FILENAME_TEMPLATE.format("*") return sorted( int(filename.split(".")[-3].split("-")[-1]) - for filename in glob.glob(str(Path(self.cachedir) / "nvdcve-1.1-*.json.gz")) + for filename in glob.glob(str(Path(self.cachedir) / any_year_file)) ) # FIXME: temporary workaround so we don't try to load bad year data # return list(range(2020, 2025)) diff --git a/cve_bin_tool/database_defaults.py b/cve_bin_tool/database_defaults.py index 32ab683270..328307db44 100644 --- a/cve_bin_tool/database_defaults.py +++ b/cve_bin_tool/database_defaults.py @@ -9,4 +9,5 @@ DISK_LOCATION_BACKUP = CACHE_DIR / "cve-bin-tool-backup" OLD_CACHE_DIR = Path.home() / ".cache" / "cvedb" DBNAME = "cve.db" -NVD_FILENAME_TEMPLATE = "nvdcve-1.1-{}.json.gz" +NVD_VERSION = "2.0" +NVD_FILENAME_TEMPLATE = "nvdcve-" + NVD_VERSION + "-{}.json.gz" diff --git a/test/test_database_defaults.py b/test/test_database_defaults.py index 1abf3bc2d4..77fd91f54b 100644 --- a/test/test_database_defaults.py +++ b/test/test_database_defaults.py @@ -37,7 +37,8 @@ def test_cache_paths_with_xdg(patch_env, tmp_path): assert db_defaults.DISK_LOCATION_BACKUP == tmp_path / "cve-bin-tool-backup" assert db_defaults.OLD_CACHE_DIR == Path.home() / ".cache" / "cvedb" assert db_defaults.DBNAME == "cve.db" - assert db_defaults.NVD_FILENAME_TEMPLATE.format("2024") == "nvdcve-1.1-2024.json.gz" + assert db_defaults.NVD_VERSION == "2.0" + assert db_defaults.NVD_FILENAME_TEMPLATE.format("2024") == "nvdcve-2.0-2024.json.gz" def test_cache_paths_fallback_to_home(patch_env): @@ -53,4 +54,5 @@ def test_cache_paths_fallback_to_home(patch_env): assert db_defaults.DISK_LOCATION_BACKUP == expected_cache / "cve-bin-tool-backup" assert db_defaults.OLD_CACHE_DIR == Path.home() / ".cache" / "cvedb" assert db_defaults.DBNAME == "cve.db" - assert db_defaults.NVD_FILENAME_TEMPLATE.format("2023") == "nvdcve-1.1-2023.json.gz" + assert db_defaults.NVD_VERSION == "2.0" + assert db_defaults.NVD_FILENAME_TEMPLATE.format("2023") == "nvdcve-2.0-2023.json.gz" diff --git a/test/test_json.py b/test/test_json.py index 8f5f48c1fb..568c33c621 100644 --- a/test/test_json.py +++ b/test/test_json.py @@ -20,8 +20,8 @@ from cve_bin_tool.log import LOGGER from cve_bin_tool.util import make_http_requests -NVD_SCHEMA = "https://scap.nist.gov/schema/nvd/feed/1.1/nvd_cve_feed_json_1.1.schema" -# NVD feeds from "https://nvd.nist.gov/vuln/data-feeds#JSON_FEED" but stored locally +NVD_SCHEMA = "https://csrc.nist.gov/schema/nvd/api/2.0/cve_api_json_2.0.schema" +# NVD feeds from "https://nvd.nist.gov/vuln/data-feeds#divJson20Feeds" but stored locally @pytest.mark.skipif( @@ -43,11 +43,11 @@ def test_json_validation(self, year): """Validate latest nvd json file against their published schema""" # Open the latest nvd file on disk with gzip.open( - Path(DISK_LOCATION_DEFAULT) / f"nvdcve-1.1-{year}.json.gz", + Path(DISK_LOCATION_DEFAULT) / f"nvdcve-2.0-{year}.json.gz", "rb", ) as json_file: nvd_json = json.loads(json_file.read()) - LOGGER.info(f"Loaded json for year {year}: nvdcve-1.1-{year}.json.gz") + LOGGER.info(f"Loaded json for year {year}: nvdcve-2.0-{year}.json.gz") # Validate -- will raise a ValidationError if not valid try: diff --git a/test/test_source_nvd.py b/test/test_source_nvd.py index a3d99b5677..c9483828c3 100644 --- a/test/test_source_nvd.py +++ b/test/test_source_nvd.py @@ -32,7 +32,7 @@ async def test_00_getmeta(self): ) as session: _jsonurl, meta = await self.nvd.getmeta( session, - "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-modified.meta", + "https://nvd.nist.gov/feeds/json/cve/2.0/nvdcve-2.0-modified.meta", ) assert "sha256" in meta @@ -43,7 +43,7 @@ async def test_00_getmeta(self): async def test_01_cache_update(self): async with aiohttp.ClientSession(trust_env=True) as session: jsonurl, meta = await self.nvd.getmeta( - session, "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2015.meta" + session, "https://nvd.nist.gov/feeds/json/cve/2.0/nvdcve-2.0-2015.meta" ) assert "sha256" in meta await self.nvd.cache_update(session, jsonurl, meta["sha256"])