diff --git a/.github/workflows/pythonapp.yml b/.github/workflows/pythonapp.yml index f47c549..068c564 100644 --- a/.github/workflows/pythonapp.yml +++ b/.github/workflows/pythonapp.yml @@ -49,9 +49,14 @@ jobs: npm install -g @cyclonedx/cdxgen cdxgen -t python -o bom.json . -p --profile research uv sync --all-extras --dev - uv run vdb --download-image + uv run vdb --cache --only-osv uv run vdb --bom bom.json + if: ${{ matrix.python-version == '3.13' && matrix.os == 'ubuntu-latest' }} - name: CLI tests run: | uv run vdb --search "pkg:maven/org.springframework/spring-core@6.0.13" uv run vdb --search "pkg:maven/org.hibernate.orm/hibernate-core@6.2.9.Final" + uv run vdb --search "pkg:nuget/Microsoft.Data.SqlClient@5.0.1" + uv run vdb --search "pkg:nuget/Microsoft.IdentityModel.JsonWebTokens@6.21.0" + uv run vdb --search "pkg:nuget/System.Drawing.Common@5.0.0" + if: ${{ matrix.python-version == '3.13' && matrix.os == 'ubuntu-latest' }} diff --git a/INTEGRATION.md b/INTEGRATION.md index 1333afc..b1cad7f 100644 --- a/INTEGRATION.md +++ b/INTEGRATION.md @@ -11,12 +11,36 @@ When used as a Python library, the only dependency is Python >= 3.10. When using The vulnerability database comprises two SQLite database files. - data.index.vdb6 - A smaller index database optimized for quick purl or cpe string searches and vers-based range comparisons. +- data.vdb6 - Full CVE source database containing normalized data in CVE 5.1 specification formation and purl prefix. -![Index schema](./docs/vdb-index-schema.png) +### cve_index schema -- data.vdb6 - Full CVE source database containing normalized data in CVE 5.1 specification formation and purl prefix. +```sql +CREATE TABLE if not exists cve_index( + cve_id TEXT NOT NULL, + type TEXT NOT NULL, + namespace TEXT, + name TEXT NOT NULL, + vers TEXT NOT NULL, + purl_prefix TEXT NOT NULL +) +``` -![Data schema](./docs/vdb-schema.png) +### cve_data schema + +```sql +CREATE TABLE if not exists cve_data( + cve_id TEXT NOT NULL, + type TEXT NOT NULL, + namespace TEXT, + name TEXT NOT NULL, + source_data BLOB NOT NULL, + override_data BLOB, + source_data_hash TEXT NOT NULL, + vers TEXT NOT NULL, + purl_prefix TEXT NOT NULL +) +``` ## Searching for CVEs @@ -70,8 +94,8 @@ Refer to the vers [documentation](https://github.com/package-url/purl-spec/blob/ Search the `cve_index` table in the index database first to retrieve any matching cve_id and purl_prefix values. Use these two column values to retrieve the full CVE source information from the `cve_data` table. An example query is shown below: ```sql -SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), purl_prefix FROM cve_data - WHERE cve_id = ? AND purl_prefix = ? +SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), vers, purl_prefix FROM cve_data + WHERE cve_id = ? AND vers = ? AND purl_prefix = ? GROUP BY purl_prefix ORDER BY cve_id DESC; ``` diff --git a/packages/mcp-server-vdb/src/mcp_server_vdb/server.py b/packages/mcp-server-vdb/src/mcp_server_vdb/server.py index 4cd29af..bdfc911 100644 --- a/packages/mcp-server-vdb/src/mcp_server_vdb/server.py +++ b/packages/mcp-server-vdb/src/mcp_server_vdb/server.py @@ -18,12 +18,12 @@ from pydantic import AnyUrl from mcp_server_vdb import display -if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None: +if sys.platform == "win32" and os.environ.get("PYTHONIOENCODING") is None: sys.stdin.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") -logger = logging.getLogger('mcp_server_vdb') +logger = logging.getLogger("mcp_server_vdb") # Age in days before the database needs refreshing. 2 days VDB_AGE_DAYS = os.getenv("VDB_AGE_DAYS", "2") @@ -80,14 +80,15 @@ def print_results(results): async def run(): - @asynccontextmanager async def server_lifespan(server: Server) -> AsyncIterator[dict]: """Manage server startup and shutdown lifecycle.""" global db_conn, index_conn try: if db_lib.needs_update(days=VDB_AGE_DAYS, default_status=True): - logger.info("Vulnerability database needs to be updated! Please wait ...") + logger.info( + "Vulnerability database needs to be updated! Please wait ..." + ) db_url = config.VDB_APP_ONLY_DATABASE_URL if ORAS_AVAILABLE: logger.info( @@ -97,7 +98,9 @@ async def server_lifespan(server: Server) -> AsyncIterator[dict]: ) download_image(db_url, config.DATA_DIR) else: - logger.debug("Vulnerability database will be loaded from %s", config.DATA_DIR) + logger.debug( + "Vulnerability database will be loaded from %s", config.DATA_DIR + ) db_conn, index_conn = db_lib.get() yield {"db_conn": db_conn, "index_conn": index_conn} finally: @@ -106,7 +109,12 @@ async def server_lifespan(server: Server) -> AsyncIterator[dict]: if index_conn: index_conn.close() - server = Server("appthreat-vulnerability-db", version="1.0.0", instructions=SERVER_INSTRUCTIONS, lifespan=server_lifespan) + server = Server( + "appthreat-vulnerability-db", + version="1.0.0", + instructions=SERVER_INSTRUCTIONS, + lifespan=server_lifespan, + ) @server.list_resources() async def handle_list_resources() -> list[mtypes.Resource]: @@ -138,22 +146,18 @@ async def handle_list_prompts() -> list[mtypes.Prompt]: description="Search for vulnerabilities for the given purl, cpe, CVE id etc", arguments=[ mtypes.PromptArgument( - name="search", - description="Search string", - required=True + name="search", description="Search string", required=True ) - ] + ], ), mtypes.Prompt( name="cve-info", description="Get detailed information about a CVE", arguments=[ mtypes.PromptArgument( - name="cve_id", - description="CVE id", - required=True + name="cve_id", description="CVE id", required=True ) - ] + ], ), mtypes.Prompt( name="list-malware", @@ -162,14 +166,16 @@ async def handle_list_prompts() -> list[mtypes.Prompt]: mtypes.PromptArgument( name="count", description="No of results to return", - required=True + required=True, ) - ] - ) + ], + ), ] @server.get_prompt() - async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> mtypes.GetPromptResult: + async def handle_get_prompt( + name: str, arguments: dict[str, str] | None + ) -> mtypes.GetPromptResult: if name == "search-vulnerabilities": return mtypes.GetPromptResult( description="Search vulnerabilities", @@ -178,24 +184,24 @@ async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> mtyp role="user", content=mtypes.TextContent( type="text", - text=f"Show me the vulnerabilities for the package {arguments['search']}" - ) + text=f"Show me the vulnerabilities for the package {arguments['search']}", + ), ), mtypes.PromptMessage( role="user", content=mtypes.TextContent( type="text", - text=f"Is the package {arguments['search']} safe to use?" - ) + text=f"Is the package {arguments['search']} safe to use?", + ), ), mtypes.PromptMessage( role="user", content=mtypes.TextContent( type="text", - text=f"Check {arguments['search']} for known advisories and risks" - ) - ) - ] + text=f"Check {arguments['search']} for known advisories and risks", + ), + ), + ], ) if name == "cve-info": @@ -206,24 +212,24 @@ async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> mtyp role="user", content=mtypes.TextContent( type="text", - text=f"Information about the CVE {arguments['cve_id']}" - ) + text=f"Information about the CVE {arguments['cve_id']}", + ), ), mtypes.PromptMessage( role="user", content=mtypes.TextContent( type="text", - text=f"What packages are affected by the CVE {arguments['cve_id']}?" - ) + text=f"What packages are affected by the CVE {arguments['cve_id']}?", + ), ), mtypes.PromptMessage( role="user", content=mtypes.TextContent( type="text", - text=f"Are there any workarounds for the CVE {arguments['cve_id']}?" - ) - ) - ] + text=f"Are there any workarounds for the CVE {arguments['cve_id']}?", + ), + ), + ], ) if name == "list-malware": @@ -234,24 +240,20 @@ async def handle_get_prompt(name: str, arguments: dict[str, str] | None) -> mtyp role="user", content=mtypes.TextContent( type="text", - text="What are the latest malwares on the internet?" - ) + text="What are the latest malwares on the internet?", + ), ), mtypes.PromptMessage( role="user", content=mtypes.TextContent( - type="text", - text="Is this package a malware?" - ) + type="text", text="Is this package a malware?" + ), ), mtypes.PromptMessage( role="user", - content=mtypes.TextContent( - type="text", - text="Recent malwares" - ) - ) - ] + content=mtypes.TextContent(type="text", text="Recent malwares"), + ), + ], ) raise ValueError(f"Unknown prompt: {name}") @@ -265,10 +267,13 @@ async def handle_list_tools() -> list[mtypes.Tool]: inputSchema={ "type": "object", "properties": { - "purl": {"type": "string", "description": "PURL string to search. Must start with pkg:"}, + "purl": { + "type": "string", + "description": "PURL string to search. Must start with pkg:", + }, }, - "required": ["purl"] - } + "required": ["purl"], + }, ), mtypes.Tool( name="search_by_any", @@ -278,8 +283,8 @@ async def handle_list_tools() -> list[mtypes.Tool]: "properties": { "search": {"type": "string", "description": "String to search"}, }, - "required": ["search"] - } + "required": ["search"], + }, ), mtypes.Tool( name="search_by_cpe_like", @@ -287,10 +292,13 @@ async def handle_list_tools() -> list[mtypes.Tool]: inputSchema={ "type": "object", "properties": { - "cpe": {"type": "string", "description": "CPE string to search"}, + "cpe": { + "type": "string", + "description": "CPE string to search", + }, }, - "required": ["cpe"] - } + "required": ["cpe"], + }, ), mtypes.Tool( name="search_by_cve", @@ -298,10 +306,13 @@ async def handle_list_tools() -> list[mtypes.Tool]: inputSchema={ "type": "object", "properties": { - "cve_id": {"type": "string", "description": "CVE or GHSA id to search"}, + "cve_id": { + "type": "string", + "description": "CVE or GHSA id to search", + }, }, - "required": ["cve_id"] - } + "required": ["cve_id"], + }, ), mtypes.Tool( name="search_by_url", @@ -311,8 +322,8 @@ async def handle_list_tools() -> list[mtypes.Tool]: "properties": { "url": {"type": "string", "description": "URL to search"}, }, - "required": ["url"] - } + "required": ["url"], + }, ), mtypes.Tool( name="latest_malware", @@ -320,33 +331,49 @@ async def handle_list_tools() -> list[mtypes.Tool]: inputSchema={ "type": "object", "properties": { - "count": {"type": "number", "description": "Count of malwares to return"}, - } - } + "count": { + "type": "number", + "description": "Count of malwares to return", + }, + }, + }, ), ] @server.call_tool() - async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[mtypes.TextContent]: + async def handle_call_tool( + name: str, arguments: dict[str, Any] | None + ) -> list[mtypes.TextContent]: """Handle tool execution requests""" try: if name not in ( - "search_by_purl_like", "search_by_any", "search_by_cpe_like", "search_by_cve", "search_by_url", - "latest_malware"): + "search_by_purl_like", + "search_by_any", + "search_by_cpe_like", + "search_by_cve", + "search_by_url", + "latest_malware", + ): raise ValueError(f"Unknown tool: {name}") raw_results = None if name == "search_by_purl_like": - raw_results = search.search_by_purl_like(arguments["purl"], with_data=True) + raw_results = search.search_by_purl_like( + arguments["purl"], with_data=True + ) elif name == "search_by_any": raw_results = search.search_by_any(arguments["search"], with_data=True) elif name == "search_by_cpe_like": - raw_results = search.search_by_cpe_like(arguments["cpe"], with_data=True) + raw_results = search.search_by_cpe_like( + arguments["cpe"], with_data=True + ) elif name == "search_by_cve": raw_results = search.search_by_cve(arguments["cve_id"], with_data=True) elif name == "search_by_url": raw_results = search.search_by_url(arguments["url"], with_data=True) elif name == "latest_malware": - raw_results = search.latest_malware(with_limit=arguments["count"], with_data=True) + raw_results = search.latest_malware( + with_limit=arguments["count"], with_data=True + ) results = print_results(raw_results) return [mtypes.TextContent(type="text", text=str(results))] except Exception as e: @@ -358,7 +385,7 @@ async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[ write_stream, InitializationOptions( server_name="appthreat-vulnerability-db", - server_version="6.3.1", + server_version="6.4.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, diff --git a/pyproject.toml b/pyproject.toml index 45883ca..8635816 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "appthreat-vulnerability-db" -version = "6.3.1" +version = "6.4.0" description = "AppThreat's vulnerability database and package search library with a built-in sqlite based storage. OSV, CVE, GitHub, npm are the primary sources of vulnerabilities." authors = [ {name = "Team AppThreat", email = "cloud@appthreat.com"}, diff --git a/test/test_utils.py b/test/test_utils.py index b9f4213..02483c9 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -924,6 +924,22 @@ def test_purl_vers_convert(): ], "vers:maven/6.0.15", ), + ( + "deb", + [ + { + "status": "affected", + "versionType": "deb", + "lessThan": r"2.36.1-8+deb11u1", + }, + { + "version": r"2.36.1-8+deb11u1", + "status": "unaffected", + "versionType": "deb", + } + ], + "vers:deb/<2.36.1-8+deb11u1|!=2.36.1-8+deb11u1", + ), ] for tt in test_tuples: assert utils.to_purl_vers(tt[0], tt[1]) == tt[2] diff --git a/uv.lock b/uv.lock index a68977e..f0be326 100644 --- a/uv.lock +++ b/uv.lock @@ -42,7 +42,7 @@ wheels = [ [[package]] name = "appthreat-vulnerability-db" -version = "6.3.1" +version = "6.4.0" source = { editable = "." } dependencies = [ { name = "appdirs" }, diff --git a/vdb/cli.py b/vdb/cli.py index 0345441..2412100 100644 --- a/vdb/cli.py +++ b/vdb/cli.py @@ -20,7 +20,7 @@ from vdb.lib.gha import GitHubSource from vdb.lib.osv import OSVSource -if sys.platform == "win32" and os.environ.get('PYTHONIOENCODING') is None: +if sys.platform == "win32" and os.environ.get("PYTHONIOENCODING") is None: sys.stdin.reconfigure(encoding="utf-8") sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") @@ -146,6 +146,7 @@ def print_results(results): table = Table(title="VDB Results", show_lines=True) table.add_column("CVE", justify="left", max_width=20) table.add_column("Locator") + table.add_column("Fix Version") table.add_column("Description") table.add_column("Affected Symbols", max_width=50) if isinstance(results, types.GeneratorType): @@ -170,9 +171,7 @@ def print_results(results): def create_db_file_metadata(sources, cve_data_count, cve_index_count): """Method to create the vdb file metadata""" metadata = { - "created_utc": datetime.now(tz=timezone.utc).isoformat( - timespec="seconds" - ), + "created_utc": datetime.now(tz=timezone.utc).isoformat(timespec="seconds"), "cve_data_count": cve_data_count, "cve_index_count": cve_index_count, "sources": [s.__class__.__name__ for s in sources], @@ -211,7 +210,11 @@ def print_db_file_metadata(metadata_file): return with open(metadata_file, encoding="utf-8") as fp: db_meta = json.load(fp) - table = Table(title="VDB Summary", show_lines=True, caption=f"Metadata file: {metadata_file}") + table = Table( + title="VDB Summary", + show_lines=True, + caption=f"Metadata file: {metadata_file}", + ) table.add_column("Property") table.add_column("Value") for k, v in db_meta.items(): @@ -233,7 +236,11 @@ def main(): if args.print_vdb_metadata: print_db_file_metadata(config.VDB_METADATA_FILE) if args.download_image or args.download_full_image: - db_url = config.VDB_DATABASE_URL if args.download_full_image else config.VDB_APP_ONLY_DATABASE_URL + db_url = ( + config.VDB_DATABASE_URL + if args.download_full_image + else config.VDB_APP_ONLY_DATABASE_URL + ) if ORAS_AVAILABLE: LOG.info( "Downloading vdb image from %s to %s", @@ -244,13 +251,15 @@ def main(): download_image(db_url, config.DATA_DIR) print_db_file_metadata(config.VDB_METADATA_FILE) except Exception as e: - LOG.info("Vulnerability database download using oras failed. Please try again.") + LOG.info( + "Vulnerability database download using oras failed. Please try again." + ) LOG.exception(e) return else: console.print( "Oras library is not available. Install using 'pip install appthreat-vulnerability-db[oras]' and re-run this command.", - markup=False + markup=False, ) elif args.cache or args.cache_os: db_lib.get() diff --git a/vdb/lib/__init__.py b/vdb/lib/__init__.py index 026101c..c0b65dc 100644 --- a/vdb/lib/__init__.py +++ b/vdb/lib/__init__.py @@ -96,7 +96,7 @@ "vagrant", "vim", "wordpress", - "yocto" + "yocto", ] # Maps variations of string to package types @@ -124,13 +124,13 @@ "suse", "opensuse", "fedora", - "fedoraproject" + "fedoraproject", ], "alpm": ["arch", "archlinux"], "ebuild": ["gentoo", "portage"], "ios": ["iphoneos", "iphone_os"], "vscode": ["vs_code", "visual_studio_code"], - "macos": ["mac_os_x", "osx"] + "macos": ["mac_os_x", "osx"], } # CPE Regex @@ -251,19 +251,19 @@ class CvssV3: vector_string: str def __init__( - self, - base_score, - exploitability_score, - impact_score, - attack_vector, - attack_complexity, - privileges_required, - user_interaction, - scope, - confidentiality_impact, - integrity_impact, - availability_impact, - vector_string=None, + self, + base_score, + exploitability_score, + impact_score, + attack_vector, + attack_complexity, + privileges_required, + user_interaction, + scope, + confidentiality_impact, + integrity_impact, + availability_impact, + vector_string=None, ): self.base_score = base_score self.exploitability_score = exploitability_score @@ -302,13 +302,13 @@ class PackageIssue: """Package issue class""" def __init__( - self, - affected_location, - fixed_location, - mii=None, - mai=None, - mie=None, - mae=None, + self, + affected_location, + fixed_location, + mii=None, + mai=None, + mie=None, + mae=None, ): self.affected_location = VulnerabilityLocation.from_values( affected_location, @@ -359,19 +359,19 @@ class VulnerabilityDetail: """Vulnerability detail class""" def __init__( - self, - cpe_uri: str, - package: str, - min_affected_version_including: str | None, - max_affected_version_including: str | None, - min_affected_version_excluding: str | None, - max_affected_version_excluding: str | None, - severity: str, - description: str | None, - fixed_location: str, - package_type: str, - is_obsolete: str, - source_update_time: str, + self, + cpe_uri: str, + package: str, + min_affected_version_including: str | None, + max_affected_version_including: str | None, + min_affected_version_excluding: str | None, + max_affected_version_excluding: str | None, + severity: str, + description: str | None, + fixed_location: str, + package_type: str, + is_obsolete: str, + source_update_time: str, ): parts = CPE_REGEX.match(cpe_uri) self.cpe_uri = cpe_uri @@ -435,11 +435,12 @@ def get_type(cpe_uri, package_type): if ptype in KNOWN_PKG_TYPES: return ptype if all_parts and ( - all_parts.group("target_sw") and all_parts.group("target_sw") != "*" - or ( - all_parts.group("sw_edition") - and all_parts.group("sw_edition") != "*" - ) + all_parts.group("target_sw") + and all_parts.group("target_sw") != "*" + or ( + all_parts.group("sw_edition") + and all_parts.group("sw_edition") != "*" + ) ): for vk, vv in PKG_TYPES_MAP.items(): target_sw = all_parts.group("target_sw") @@ -499,19 +500,19 @@ class Vulnerability: cvss4_vector_string: str | None def __init__( - self, - vid: str, - assigner: str, - problem_type: str, - score: float, - severity: str, - description: str, - related_urls: list[str], - details: list[VulnerabilityDetail], - cvss_v3: CvssV3, - source_update_time: str, - source_orig_time: str, - affects: dict = None, + self, + vid: str, + assigner: str, + problem_type: str, + score: float, + severity: str, + description: str, + related_urls: list[str], + details: list[VulnerabilityDetail], + cvss_v3: CvssV3, + source_update_time: str, + source_orig_time: str, + affects: dict = None, ): self.id = vid self.assigner = assigner @@ -544,7 +545,7 @@ def __repr__(self): ), "source_orig_time": self.source_orig_time.strftime("%Y-%m-%dT%H:%M:%S"), "affects": self.affects, - "cvss4_vector_string": self.cvss4_vector_string + "cvss4_vector_string": self.cvss4_vector_string, }, option=orjson.OPT_NAIVE_UTC, ).decode("utf-8", "ignore") @@ -564,11 +565,11 @@ def __init__(self, cpe_uri, vendor, package, version): @staticmethod def from_values( - cpe_uri, - mii=None, - mai=None, - mie=None, - mae=None, + cpe_uri, + mii=None, + mai=None, + mie=None, + mae=None, ): parts = None version = "*" @@ -644,21 +645,21 @@ class VulnerabilityOccurrence: matched_by: str def __init__( - self, - oid, - problem_type, - otype, - severity, - cvss_score, - cvss_v3, - package_issue, - short_description, - long_description, - related_urls, - effective_severity, - source_update_time, - source_orig_time, - matched_by, + self, + oid, + problem_type, + otype, + severity, + cvss_score, + cvss_v3, + package_issue, + short_description, + long_description, + related_urls, + effective_severity, + source_update_time, + source_orig_time, + matched_by, ): self.id = oid self.problem_type = problem_type diff --git a/vdb/lib/aqua.py b/vdb/lib/aqua.py index d919c5b..6f9fed8 100644 --- a/vdb/lib/aqua.py +++ b/vdb/lib/aqua.py @@ -25,7 +25,11 @@ DOWNLOAD_CHUNK_SIZE = 4096 # Some sources are included by default -DEFAULT_INCLUDE_SOURCE_PATTERNS = ["alpine-unfixed"] if os.getenv("VDB_IGNORE_ALPINE", "") not in ("true", "1") else [] +DEFAULT_INCLUDE_SOURCE_PATTERNS = ( + ["alpine-unfixed"] + if os.getenv("VDB_IGNORE_ALPINE", "") not in ("true", "1") + else [] +) # Some sources are ignored by default DEFAULT_IGNORE_SOURCE_PATTERNS = [ @@ -63,7 +67,9 @@ def get_ignored_source_patterns(): """ ignore_paths = DEFAULT_IGNORE_SOURCE_PATTERNS for name, paths in config.LINUX_DISTRO_VULN_LIST_PATHS.items(): - if os.getenv(f"VDB_IGNORE_{name.upper()}", "") in ("true", "1") or os.getenv(f"VDB_EXCLUDE_{name.upper()}", "") in ("true", "1"): + if os.getenv(f"VDB_IGNORE_{name.upper()}", "") in ("true", "1") or os.getenv( + f"VDB_EXCLUDE_{name.upper()}", "" + ) in ("true", "1"): ignore_paths += paths return ignore_paths @@ -149,7 +155,13 @@ def is_supported_source(zfname): return False nvd_start_year = config.NVD_START_YEAR for year in range(1999, nvd_start_year): - for pat in (f"CVE-{year}-", f"{os.sep}{year}{os.sep}", f"ALAS-{year}-", f"ALAS2-{year}-", f"openSUSE-SU-{year}-"): + for pat in ( + f"CVE-{year}-", + f"{os.sep}{year}{os.sep}", + f"ALAS-{year}-", + f"ALAS2-{year}-", + f"openSUSE-SU-{year}-", + ): if pat in zfname: return False if zfname.endswith(".json"): @@ -675,7 +687,10 @@ def suse_to_vuln(self, cve_data): if cve_references: for aref in cve_references: references.append( - {"name": aref.get("Description", "id"), "url": aref.get("URL")} + { + "name": aref.get("Description", "id"), + "url": aref.get("URL"), + } ) references = orjson.dumps(references, option=orjson.OPT_NAIVE_UTC) if isinstance(references, bytes): @@ -894,7 +909,7 @@ def debian_to_vuln(cve_data): version_end_including = "" version_start_excluding = "" version_end_excluding = version - fix_version_end_including = "" + fix_version_end_including = version fix_version_start_excluding = "" fix_version_end_excluding = "" fix_version_start_including = version diff --git a/vdb/lib/cna.py b/vdb/lib/cna.py index afe6e2f..090f894 100644 --- a/vdb/lib/cna.py +++ b/vdb/lib/cna.py @@ -22,6 +22,7 @@ json.dump(assigner_map, fp, indent=4) ``` """ + ASSIGNER_UUID_MAP = { "nvd@nist.gov": "8254265b-2729-46b6-b9e3-3dfca2d5bfca", "cve@mitre.org": "8254265b-2729-46b6-b9e3-3dfca2d5bfca", @@ -346,5 +347,5 @@ "ly-corporation": "657f3255-0560-4aed-82e4-7f579ec6acfb", "hiddenlayer": "6f8de1f0-f67e-45a6-b68f-98777fdb759c", "ciena": "7bd90cf1-1651-495e-9ae8-9415fb3c9feb", - "libreswan": "d42dc95b-23f1-4e06-9076-20753a0fb0df" + "libreswan": "d42dc95b-23f1-4e06-9076-20753a0fb0df", } diff --git a/vdb/lib/config.py b/vdb/lib/config.py index 5b56fbf..38f8a61 100644 --- a/vdb/lib/config.py +++ b/vdb/lib/config.py @@ -71,13 +71,21 @@ # Support for disabling individual distro feeds if os.getenv("VDB_IGNORE_ALMALINUX", "") not in ("true", "1"): - OSV_URL_DICT["almalinux"] = "https://osv-vulnerabilities.storage.googleapis.com/AlmaLinux/all.zip" + OSV_URL_DICT["almalinux"] = ( + "https://osv-vulnerabilities.storage.googleapis.com/AlmaLinux/all.zip" + ) if os.getenv("VDB_IGNORE_ALPINE", "") not in ("true", "1"): - OSV_URL_DICT["alpine"] = "https://osv-vulnerabilities.storage.googleapis.com/Alpine/all.zip" + OSV_URL_DICT["alpine"] = ( + "https://osv-vulnerabilities.storage.googleapis.com/Alpine/all.zip" + ) if os.getenv("VDB_IGNORE_DEBIAN", "") not in ("true", "1"): - OSV_URL_DICT["debian"] = "https://osv-vulnerabilities.storage.googleapis.com/Debian/all.zip" + OSV_URL_DICT["debian"] = ( + "https://osv-vulnerabilities.storage.googleapis.com/Debian/all.zip" + ) if os.getenv("VDB_IGNORE_ROCKYLINUX", "") not in ("true", "1"): - OSV_URL_DICT["rockylinux"] = "https://osv-vulnerabilities.storage.googleapis.com/Rocky%20Linux/all.zip" + OSV_URL_DICT["rockylinux"] = ( + "https://osv-vulnerabilities.storage.googleapis.com/Rocky%20Linux/all.zip" + ) # These feeds introduce too much false positives @@ -172,7 +180,9 @@ ) # A smaller application vulnerabilities database -VDB_APP_ONLY_DATABASE_URL = os.getenv("VDB_APP_ONLY_DATABASE_URL", "ghcr.io/appthreat/vdbxz-app:v6") +VDB_APP_ONLY_DATABASE_URL = os.getenv( + "VDB_APP_ONLY_DATABASE_URL", "ghcr.io/appthreat/vdbxz-app:v6" +) # This variable can be used to include or exclude distro-specific data # export VDB_IGNORE_ALMALINUX=true @@ -190,6 +200,5 @@ "redhat": ["redhat"], "rocky": ["rocky"], "ubuntu": ["ubuntu"], - "wolfi": ["wolfi"] + "wolfi": ["wolfi"], } - diff --git a/vdb/lib/cve.py b/vdb/lib/cve.py index e543804..4d537f1 100644 --- a/vdb/lib/cve.py +++ b/vdb/lib/cve.py @@ -628,7 +628,7 @@ def store5(self, data: list[CVE]): # Filter obvious duplicates if not source_completed_keys.get(pkg_key): dbc.execute( - "INSERT INTO cve_data values(?, ?, ?, ?, jsonb(?), ?, ?, ?);", + "INSERT INTO cve_data values(?, ?, ?, ?, jsonb(?), ?, ?, ?, ?);", ( cve_id, affected.vendor, @@ -637,6 +637,7 @@ def store5(self, data: list[CVE]): source_data_str, None, source_hash, + vers, purl_prefix, ), ) diff --git a/vdb/lib/cve_model/__init__.py b/vdb/lib/cve_model/__init__.py index a3b6e82..eb40de6 100644 --- a/vdb/lib/cve_model/__init__.py +++ b/vdb/lib/cve_model/__init__.py @@ -5,7 +5,15 @@ from enum import Enum from typing import Annotated, Any, Dict, List, Optional, Union -from pydantic import AnyUrl, AwareDatetime, BaseModel, ConfigDict, Field, NaiveDatetime, RootModel +from pydantic import ( + AnyUrl, + AwareDatetime, + BaseModel, + ConfigDict, + Field, + NaiveDatetime, + RootModel, +) from vdb.lib.cve_model import cvss_v2, cvss_v3, cvss_v4 @@ -612,7 +620,8 @@ class CveMetadataRejected(BaseModel): ), ] = None dateRejected: Annotated[ - Optional[AwareDatetime | NaiveDatetime], Field(description="The date/time the CVE ID was rejected.") + Optional[AwareDatetime | NaiveDatetime], + Field(description="The date/time the CVE ID was rejected."), ] = None state: Annotated[State1, Field(description="State of CVE - PUBLISHED, REJECTED.")] dateReserved: Annotated[ @@ -950,9 +959,7 @@ class TimelineItem(BaseModel): description="The language used in the description of the event. The language field is included so that CVE Records can support translations. The value must be a BCP 47 language code." ), ] - value: Annotated[ - str, Field(description="A summary of the event.", min_length=1) - ] + value: Annotated[str, Field(description="A summary of the event.", min_length=1)] class Timeline(RootModel[List[TimelineItem]]): diff --git a/vdb/lib/db6.py b/vdb/lib/db6.py index 2458243..c998a9f 100644 --- a/vdb/lib/db6.py +++ b/vdb/lib/db6.py @@ -19,7 +19,7 @@ def ensure_schemas(db_conn_obj: apsw.Connection, index_conn_obj: apsw.Connection temp_store_dir = os.getenv("VDB_TEMP_DIR", tempfile.mkdtemp(prefix="vdb-temp")) """Create the sqlite tables and indexes in case they don't exist""" db_conn_obj.execute( - "CREATE TABLE if not exists cve_data(cve_id TEXT NOT NULL, type TEXT NOT NULL, namespace TEXT, name TEXT NOT NULL, source_data BLOB NOT NULL, override_data BLOB, source_data_hash TEXT NOT NULL, purl_prefix TEXT NOT NULL);" + "CREATE TABLE if not exists cve_data(cve_id TEXT NOT NULL, type TEXT NOT NULL, namespace TEXT, name TEXT NOT NULL, source_data BLOB NOT NULL, override_data BLOB, source_data_hash TEXT NOT NULL, vers TEXT NOT NULL, purl_prefix TEXT NOT NULL);" ) db_conn_obj.pragma("synchronous", "OFF") db_conn_obj.pragma("journal_mode", "MEMORY") @@ -95,7 +95,7 @@ def optimize_and_close_all(): """ if db_conn: db_conn.execute( - "CREATE INDEX if not exists idx1 on cve_data(cve_id, purl_prefix);" + "CREATE INDEX if not exists idx1 on cve_data(cve_id, vers, purl_prefix);" ) db_conn.execute("VACUUM;") db_conn.close() diff --git a/vdb/lib/display.py b/vdb/lib/display.py index 0a2aec3..36ba60e 100644 --- a/vdb/lib/display.py +++ b/vdb/lib/display.py @@ -15,6 +15,7 @@ def add_table_row(table: Table, res: dict, added_row_keys: dict): if added_row_keys.get(row_key): return source_data: CVE = res.get("source_data") + fix_version = res.get("fix_version", "") descriptions = [] cna_container = source_data.root.containers.cna affected_functions = set() @@ -49,6 +50,7 @@ def add_table_row(table: Table, res: dict, added_row_keys: dict): table.add_row( res.get("cve_id"), res.get("matched_by"), + fix_version, Markdown("\n".join(descriptions), justify="left", hyperlinks=True), Markdown(affects, justify="left"), ) diff --git a/vdb/lib/gha.py b/vdb/lib/gha.py index 2f12399..efc6f09 100755 --- a/vdb/lib/gha.py +++ b/vdb/lib/gha.py @@ -8,6 +8,7 @@ - Full description (description) is ignored for now """ + import logging import os import re @@ -28,12 +29,7 @@ headers = {"Authorization": f"token {api_token}"} -vendor_overrides = { - "pip": "pypi", - "go": "golang", - "rust": "cargo", - "rubygems": "gem" -} +vendor_overrides = {"pip": "pypi", "go": "golang", "rust": "cargo", "rubygems": "gem"} def get_query(qtype="recent"): @@ -204,7 +200,9 @@ def convert(self, cve_data): vendor = p["package"]["ecosystem"].lower() product = p["package"]["name"] vendor = vendor_overrides.get(vendor, vendor) - if vendor not in ("golang", "swift", "composer") and (":" in product or "/" in product): + if vendor not in ("golang", "swift", "composer") and ( + ":" in product or "/" in product + ): tmp_a = re.split(r"[/|:]", product) # This extract's the correct vendor based on the namespace # Eg: org.springframework:spring-webflux would result in @@ -247,9 +245,7 @@ def convert(self, cve_data): user_interaction = cvss3_obj.get("userInteraction") description = """# {} {} - """.format( - cve.get("summary"), cve.get("description") - ) + """.format(cve.get("summary"), cve.get("description")) tdata = config.CVE_TPL % dict( cve_id=cve_id, diff --git a/vdb/lib/npm.py b/vdb/lib/npm.py index 5d9a362..80b6352 100644 --- a/vdb/lib/npm.py +++ b/vdb/lib/npm.py @@ -3,6 +3,7 @@ This module implements basic functionality to query npm registry for security advisories """ + import logging import httpx @@ -180,9 +181,7 @@ def to_vuln(self, v, ret_data): description = """# {} {} {} - """.format( - title, overview, recommendation - ) + """.format(title, overview, recommendation) references = ( [{"name": "npm advisory", "url": v.get("url")}] if v.get("url") else [] ) diff --git a/vdb/lib/nvd.py b/vdb/lib/nvd.py index 1f7da11..8762d06 100644 --- a/vdb/lib/nvd.py +++ b/vdb/lib/nvd.py @@ -373,7 +373,9 @@ def convert_api_vuln_detail(vuln: dict) -> list[VulnerabilityDetail] | None: single_version = all_parts.group("version") # Version numbers could have erroneous \ or commas if single_version: - single_version = single_version.removeprefix(",").removesuffix("\\").strip() + single_version = ( + single_version.removeprefix(",").removesuffix("\\").strip() + ) version_start_including = cpe.get( "versionStartIncluding", single_version ) @@ -437,7 +439,11 @@ def convert_api_vuln(vuln: dict) -> Vulnerability | None: description = NvdSource._get_value(vuln, "descriptions") # Ignore disputed CVEs. Eg: CVE-2023-35116 # CVE-2023-39017 uses the phrase "this is disputed" - for ds in ("** DISPUTED **", "this is not a valid vulnerability report", "this is disputed"): + for ds in ( + "** DISPUTED **", + "this is not a valid vulnerability report", + "this is disputed", + ): if ds in description: return None affected_symbols = extract_affected_symbols(description) @@ -483,5 +489,5 @@ def convert_api_vuln(vuln: dict) -> Vulnerability | None: cvss_v3, vuln["lastModified"], vuln["published"], - affected_symbols + affected_symbols, ) diff --git a/vdb/lib/osv.py b/vdb/lib/osv.py index 5cbdd2c..404d399 100644 --- a/vdb/lib/osv.py +++ b/vdb/lib/osv.py @@ -449,7 +449,18 @@ def to_vuln(cve_data): and version_start_including != rversions_list[-1] ): version_end_including = rversions_list[-1] - fixed_version_in_events = len([e for e in events if e.get("fixed") or e.get("last_affected") or e.get("limit")]) > 0 + fixed_version_in_events = ( + len( + [ + e + for e in events + if e.get("fixed") + or e.get("last_affected") + or e.get("limit") + ] + ) + > 0 + ) for ev in events: # Reset all versions for introduced event if ev.get("introduced") is not None: @@ -481,7 +492,7 @@ def to_vuln(cve_data): version_end_excluding = ev.get("limit").split(":")[-1] # Create an entry for each introduced + fixed/limit event if version_start_including and ( - (fix_version_start_including or fix_version_start_excluding) + (fix_version_start_including or fix_version_start_excluding) or (not fixed_version_in_events and version_end_including) or version_end_excluding or (not fixed_version_in_events and not versions_list) diff --git a/vdb/lib/search.py b/vdb/lib/search.py index 7ed3d8b..0a4911a 100644 --- a/vdb/lib/search.py +++ b/vdb/lib/search.py @@ -5,7 +5,8 @@ import orjson from vdb.lib import db6, utils -from vdb.lib.cve_model import CVE, CVE1 +from vdb.lib.config import PLACEHOLDER_EXCLUDE_VERSION, PLACEHOLDER_FIX_VERSION +from vdb.lib.cve_model import CVE, CVE1, Product, Versions, Status from vdb.lib.utils import load_json @@ -31,7 +32,32 @@ def filter_hits(raw_hits: List, compare_ver: str) -> List: return filtered_list -def get_cve_data(db_conn: apsw.Connection | None, index_hits: List, search_str: str) -> Generator: +def get_unaffected(source_data, vers): + if source_data and source_data.root.containers: + products: List[Product] = source_data.root.containers.cna.affected.root + for p in products: + versions: List[Versions] = p.versions + if versions: + for ver in versions: + if ver.status == Status.unaffected: + if ver.version: + return ver.version.root + if "|" in vers: + vers = vers.split("|")[-1] + if "!=" in vers or "<=" not in vers: + return vers.replace("<", "").replace("!=", "") + elif "/<" in vers and "/<=" not in vers: + return vers.split("/<")[-1] + elif vers.endswith(f"<{PLACEHOLDER_EXCLUDE_VERSION}"): + return PLACEHOLDER_EXCLUDE_VERSION + elif vers.endswith(f"<{PLACEHOLDER_FIX_VERSION}"): + return PLACEHOLDER_FIX_VERSION + return "" + + +def get_cve_data( + db_conn: apsw.Connection | None, index_hits: List, search_str: str +) -> Generator: """Get CVE data for the index results Args: @@ -47,10 +73,17 @@ def get_cve_data(db_conn: apsw.Connection | None, index_hits: List, search_str: for ahit in index_hits: results = exec_query( db_conn, - "SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), purl_prefix FROM cve_data WHERE cve_id = ? AND purl_prefix = ? GROUP BY purl_prefix ORDER BY cve_id DESC;", - (ahit["cve_id"], ahit["purl_prefix"]), + "SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), vers, purl_prefix FROM cve_data WHERE cve_id = ? AND vers = ? AND purl_prefix = ? GROUP BY purl_prefix ORDER BY cve_id DESC;", + (ahit["cve_id"], ahit["vers"], ahit["purl_prefix"]), ) for res in results: + source_data = ( + CVE(root=CVE1.model_validate(orjson.loads(res[5]), strict=False)) + if res[5] + else None + ) + vers = res[7] + fix_version = get_unaffected(source_data, vers) yield { "cve_id": res[0], "type": res[1], @@ -59,13 +92,11 @@ def get_cve_data(db_conn: apsw.Connection | None, index_hits: List, search_str: "matching_vers": ahit["vers"], "matched_by": search_str, "source_data_hash": res[4], - "source_data": ( - CVE(root=CVE1.model_validate(orjson.loads(res[5]), strict=False)) - if res[5] - else None - ), + "source_data": source_data, "override_data": (orjson.loads(res[6]) if res[6] else None), - "purl_prefix": res[7], + "vers": vers, + "purl_prefix": res[8], + "fix_version": fix_version, } @@ -130,7 +161,10 @@ def search_by_purl_like(purl: str, with_data: bool = False) -> List: return filtered_list return [] -def search_by_cve(cve_id: str, with_data: bool = False, with_limit: int | None = None) -> List: + +def search_by_cve( + cve_id: str, with_data: bool = False, with_limit: int | None = None +) -> List: """Search by CVE""" db_conn, index_conn = db6.get(read_only=True) filter_part = "cve_id LIKE ?" if "%" in cve_id else "cve_id = ?" diff --git a/vdb/lib/utils.py b/vdb/lib/utils.py index e2b56fb..135ce75 100644 --- a/vdb/lib/utils.py +++ b/vdb/lib/utils.py @@ -481,7 +481,13 @@ def vers_compare(compare_ver: str | int | float | None, vers: str) -> bool: elif apart.startswith("<"): max_excluding = apart.removeprefix("<") # There is exactly only one version - if len(vers_parts) == 1 and not min_version and not max_version and not min_excluding and not max_excluding: + if ( + len(vers_parts) == 1 + and not min_version + and not max_version + and not min_excluding + and not max_excluding + ): min_version = vers_parts[0].strip().replace(" ", "") max_version = min_version return version_compare( @@ -558,7 +564,6 @@ def version_compare( and max_version.startswith(compare_ver) and max_version != compare_ver ): - return True if mae and mae.startswith(compare_ver) and mae != compare_ver: return True @@ -1023,7 +1028,7 @@ def parse_purl(purl_str: str) -> Dict: and purl_obj.get("type") == "golang" and purl_obj.get("namespace") ): - purl_obj["name"] = f'{purl_obj["namespace"]}/{purl_obj["name"]}' + purl_obj["name"] = f"{purl_obj['namespace']}/{purl_obj['name']}" purl_obj["namespace"] = "" except ValueError: # Ignore errors @@ -1120,7 +1125,9 @@ def to_purl_vers(vendor: str, versions: List) -> str: continue elif version == less_than_or_equal: if "(" in version: - version = version.replace("(", "-").replace(")", "").replace("\\", "") + version = ( + version.replace("(", "-").replace(")", "").replace("\\", "") + ) vers_list.append(version) continue else: @@ -1162,7 +1169,7 @@ def url_to_purl(url: str) -> Dict: ] if paths: max_path = 2 if len(paths) >= 2 else 1 - git_repo_name = f"""{git_repo_name}/{'/'.join(paths[:max_path])}""" + git_repo_name = f"""{git_repo_name}/{"/".join(paths[:max_path])}""" for part in ("/commit/", "/tag/", "/releases/", "/blob/"): if part in url_obj.path: version = url_obj.path.split(part)[-1].split("/")[0].split(";")[0] @@ -1177,7 +1184,9 @@ def url_to_purl(url: str) -> Dict: for v in ("commit", "tag", "hash", "version", "id"): if query_obj.get(v): version = query_obj.get(v)[0].split(";")[0] - git_repo_name = git_repo_name.removesuffix("-").removesuffix("/commit").removesuffix(".git") + git_repo_name = ( + git_repo_name.removesuffix("-").removesuffix("/commit").removesuffix(".git") + ) url_obj = urlparse(f"https://{git_repo_name}") # Fix for #112 pkg_type = "generic" @@ -1205,11 +1214,11 @@ def url_to_purl(url: str) -> Dict: def clean_cpe_uri(cpe_uri: str) -> str: if not cpe_uri: return cpe_uri - cpe_uri = re.sub(r"[\\!&,()+\[\]]" , "", cpe_uri) - cpe_uri = cpe_uri.replace("_-_" , "-") - cpe_uri = cpe_uri.replace("_/_" , "/") - cpe_uri = cpe_uri.replace("__" , "_") - cpe_uri = cpe_uri.replace("@" , "_") + cpe_uri = re.sub(r"[\\!&,()+\[\]]", "", cpe_uri) + cpe_uri = cpe_uri.replace("_-_", "-") + cpe_uri = cpe_uri.replace("_/_", "/") + cpe_uri = cpe_uri.replace("__", "_") + cpe_uri = cpe_uri.replace("@", "_") return cpe_uri @@ -1223,14 +1232,33 @@ def extract_affected_symbols(description: str) -> Dict: if description: words = description.strip().split(" ") for word in words: - if not word or word.startswith("http") or "\n" in word or "```" in word or "#" in word or re.match( - r"^(v|V|v.|V.)?[0-9]", word) or word.startswith("(") or word.startswith( - "CVSS") or ".." in word or word.startswith("/") or "?" in word or word.startswith("AV:"): + if ( + not word + or word.startswith("http") + or "\n" in word + or "```" in word + or "#" in word + or re.match(r"^(v|V|v.|V.)?[0-9]", word) + or word.startswith("(") + or word.startswith("CVSS") + or ".." in word + or word.startswith("/") + or "?" in word + or word.startswith("AV:") + ): continue word = word.removesuffix(",").removesuffix(".").replace("`", "").strip() - if re.match(mod_file_extns, word) or word.count(".") > 2 or word.count("/") > 2: - affected_modules.add(re.sub(r"\.(c|cc|cpp|h|java|rb|py|S|o)$", "", word)) - elif not re.match(r"^([._$])", word) and ("_" in word or word.endswith("()")): + if ( + re.match(mod_file_extns, word) + or word.count(".") > 2 + or word.count("/") > 2 + ): + affected_modules.add( + re.sub(r"\.(c|cc|cpp|h|java|rb|py|S|o)$", "", word) + ) + elif not re.match(r"^([._$])", word) and ( + "_" in word or word.endswith("()") + ): affected_functions.add(word.replace("()", "")) return { "affected_functions": sorted(affected_functions),