diff --git a/pyproject.toml b/pyproject.toml index e83eb0b1..0d4d36bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,8 @@ dependencies = [ "tree-sitter-languages==1.10.2", "univers==30.12", "litellm<=1.75.8", + "csaf-tool==0.3.2", + "jsonschema>=4.0.0,<5.0.0", ] requires-python = ">=3.11,<3.13" description = "NVIDIA AI Blueprint: Vulnerability Analysis for Container Security" diff --git a/src/exploit_iq_commons/utils/dep_tree.py b/src/exploit_iq_commons/utils/dep_tree.py index 74e89f88..e4f84a41 100644 --- a/src/exploit_iq_commons/utils/dep_tree.py +++ b/src/exploit_iq_commons/utils/dep_tree.py @@ -12,7 +12,6 @@ from packaging.specifiers import SpecifierSet from tqdm import tqdm -import logging import ast import json import zipfile @@ -180,7 +179,7 @@ def find_all_files(self, root_dir): try: full_path.rename(new_path) except Exception as e: - logging.warning( + logger.warning( "Rename failed: %s → %s: %s", full_path, new_path, e ) @@ -815,7 +814,7 @@ def get_go_mod_graph_tree(manifest_path) -> str: f"manifest wasn't found at {manifest_path}, error details => " f"{repr(e)}" ) - logging.error(error_message_exception) + logger.error(error_message_exception) raise e return process_object.stdout diff --git a/src/exploit_iq_commons/utils/js_extended_parser.py b/src/exploit_iq_commons/utils/js_extended_parser.py index 56a03d7b..a8967ff7 100644 --- a/src/exploit_iq_commons/utils/js_extended_parser.py +++ b/src/exploit_iq_commons/utils/js_extended_parser.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from typing import Any from typing import List from typing import Tuple diff --git a/src/exploit_iq_commons/utils/source_code_git_loader.py b/src/exploit_iq_commons/utils/source_code_git_loader.py index 592caa2b..a376d447 100644 --- a/src/exploit_iq_commons/utils/source_code_git_loader.py +++ b/src/exploit_iq_commons/utils/source_code_git_loader.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import typing from pathlib import Path diff --git a/src/vuln_analysis/configs/config-http-nim.yml b/src/vuln_analysis/configs/config-http-nim.yml index 8ef81811..e14e0f60 100644 --- a/src/vuln_analysis/configs/config-http-nim.yml +++ b/src/vuln_analysis/configs/config-http-nim.yml @@ -118,6 +118,10 @@ functions: cve_justify: _type: cve_justify llm_name: justify_llm + cve_generate_vex: + _type: cve_generate_vex + skip: false + vex_format: csaf cve_http_output: _type: cve_http_output url: http://localhost:8080 @@ -213,6 +217,7 @@ workflow: cve_checklist_name: cve_checklist cve_agent_executor_name: cve_agent_executor cve_generate_cvss_name: cve_generate_cvss + cve_generate_vex_name: cve_generate_vex cve_summarize_name: cve_summarize cve_justify_name: cve_justify cve_output_config_name: cve_http_output diff --git a/src/vuln_analysis/configs/config-http-openai.yml b/src/vuln_analysis/configs/config-http-openai.yml index 1ff998ba..0a8c4c51 100644 --- a/src/vuln_analysis/configs/config-http-openai.yml +++ b/src/vuln_analysis/configs/config-http-openai.yml @@ -125,6 +125,9 @@ functions: cve_justify: _type: cve_justify llm_name: justify_llm + cve_generate_vex: + _type: cve_generate_vex + skip: false cve_http_output: _type: cve_http_output url: http://localhost:8080 @@ -222,6 +225,7 @@ workflow: cve_checklist_name: cve_checklist cve_agent_executor_name: cve_agent_executor cve_generate_cvss_name: cve_generate_cvss + cve_generate_vex_name: cve_generate_vex cve_summarize_name: cve_summarize cve_justify_name: cve_justify cve_output_config_name: cve_http_output diff --git a/src/vuln_analysis/configs/config-tracing.yml b/src/vuln_analysis/configs/config-tracing.yml index b18d815c..6c064d8b 100644 --- a/src/vuln_analysis/configs/config-tracing.yml +++ b/src/vuln_analysis/configs/config-tracing.yml @@ -128,6 +128,9 @@ functions: cve_justify: _type: cve_justify llm_name: justify_llm + cve_generate_vex: + _type: cve_generate_vex + skip: false cve_file_output: _type: cve_file_output file_path: .tmp/output.json @@ -218,6 +221,7 @@ workflow: cve_checklist_name: cve_checklist cve_agent_executor_name: cve_agent_executor cve_generate_cvss_name: cve_generate_cvss + cve_generate_vex_name: cve_generate_vex cve_summarize_name: cve_summarize cve_justify_name: cve_justify cve_output_config_name: cve_file_output diff --git a/src/vuln_analysis/configs/config.yml b/src/vuln_analysis/configs/config.yml index c3f23092..701dcd84 100644 --- a/src/vuln_analysis/configs/config.yml +++ b/src/vuln_analysis/configs/config.yml @@ -98,6 +98,10 @@ functions: cve_justify: _type: cve_justify llm_name: justify_llm + cve_generate_vex: + _type: cve_generate_vex + skip: false + vex_format: csaf cve_file_output: _type: cve_file_output file_path: .tmp/output.json @@ -186,6 +190,7 @@ workflow: cve_checklist_name: cve_checklist cve_agent_executor_name: cve_agent_executor cve_generate_cvss_name: cve_generate_cvss + cve_generate_vex_name: cve_generate_vex cve_summarize_name: cve_summarize cve_justify_name: cve_justify cve_output_config_name: cve_file_output diff --git a/src/vuln_analysis/configs/openapi/openapi.json b/src/vuln_analysis/configs/openapi/openapi.json index 2f2a49c5..79feca4c 100644 --- a/src/vuln_analysis/configs/openapi/openapi.json +++ b/src/vuln_analysis/configs/openapi/openapi.json @@ -944,6 +944,20 @@ }, "justification": { "$ref": "#/components/schemas/JustificationOutput" + }, + "intel_score": { + "type": "integer", + "title": "Intel Score" + }, + "cvss": { + "anyOf": [ + { + "$ref": "#/components/schemas/CVSSOutput" + }, + { + "type": "null" + } + ] } }, "type": "object", @@ -951,10 +965,12 @@ "vuln_id", "checklist", "summary", - "justification" + "justification", + "intel_score", + "cvss" ], "title": "AgentMorpheusEngineOutput", - "description": "Contains all output generated by the main Agent Morpheus LLM Engine for a given vulnerability.\n\n- vuln_id: the ID of the vulnerability being processed by the LLM engine.\n- checklist: a list of ChecklistItemOutput objects, each containing an input and a response from the LLM agent.\n- summary: a short summary of the checklist inputs and responses, generated by an LLM.\n- justification: a JustificationOutput object containing details of the model's justification decision." + "description": "Contains all output generated by the main Agent Morpheus LLM Engine for a given vulnerability.\n\n- vuln_id: the ID of the vulnerability being processed by the LLM engine.\n- checklist: a list of ChecklistItemOutput objects, each containing an input and a response from the LLM agent.\n- summary: a short summary of the checklist inputs and responses, generated by an LLM.\n- justification: a JustificationOutput object containing details of the model's justification decision.\n- intel_score: the intelligence score for the vulnerability.\n- cvss: a CVSSOutput object containing the CVSS score and vector string for the vulnerability." }, "AgentMorpheusInfo": { "properties": { @@ -1054,11 +1070,7 @@ "$ref": "#/components/schemas/AgentMorpheusInfo" }, "output": { - "items": { - "$ref": "#/components/schemas/AgentMorpheusEngineOutput" - }, - "type": "array", - "title": "Output" + "$ref": "#/components/schemas/OutputPayload" } }, "type": "object", @@ -1218,6 +1230,25 @@ "type": "object", "title": "CVSS3" }, + "CVSSOutput": { + "properties": { + "vector_string": { + "type": "string", + "title": "Vector String" + }, + "score": { + "type": "string", + "title": "Score" + } + }, + "type": "object", + "required": [ + "vector_string", + "score" + ], + "title": "CVSSOutput", + "description": "CVSS (Common Vulnerability Scoring System) representing the severity of a vulnerability in reference to an image.\n- vector_string: The CVSS vector string that encodes the metric values used to calculate the score.\n- score: The calculated CVSS base score representing the severity of the vulnerability in the given image." + }, "CVSSV3": { "properties": { "attackComplexity": { @@ -2492,6 +2523,35 @@ "type": "object", "title": "Note" }, + "OutputPayload": { + "properties": { + "analysis": { + "items": { + "$ref": "#/components/schemas/AgentMorpheusEngineOutput" + }, + "type": "array", + "title": "Analysis" + }, + "vex": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Vex" + } + }, + "type": "object", + "required": [ + "analysis", + "vex" + ], + "title": "OutputPayload", + "description": "Wrapper for final pipeline results.\n- analysis: per-vulnerability analysis results\n- vex: the vulnerability exploitability exchange document JSON" + }, "PackageState": { "properties": { "product_name": { diff --git a/src/vuln_analysis/configs/vex/csaf/v2.0/csaf_json_schema.json b/src/vuln_analysis/configs/vex/csaf/v2.0/csaf_json_schema.json new file mode 100644 index 00000000..93ff152a --- /dev/null +++ b/src/vuln_analysis/configs/vex/csaf/v2.0/csaf_json_schema.json @@ -0,0 +1,1414 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://docs.oasis-open.org/csaf/csaf/v2.0/csaf_json_schema.json", + "title": "Common Security Advisory Framework", + "description": "Representation of security advisory information as a JSON document.", + "type": "object", + "$defs": { + "acknowledgments_t": { + "title": "List of acknowledgments", + "description": "Contains a list of acknowledgment elements.", + "type": "array", + "minItems": 1, + "items": { + "title": "Acknowledgment", + "description": "Acknowledges contributions by describing those that contributed.", + "type": "object", + "minProperties": 1, + "properties": { + "names": { + "title": "List of acknowledged names", + "description": "Contains the names of contributors being recognized.", + "type": "array", + "minItems": 1, + "items": { + "title": "Name of the contributor", + "description": "Contains the name of a single contributor being recognized.", + "type": "string", + "minLength": 1, + "examples": [ + "Albert Einstein", + "Johann Sebastian Bach" + ] + } + }, + "organization": { + "title": "Contributing organization", + "description": "Contains the name of a contributing organization being recognized.", + "type": "string", + "minLength": 1, + "examples": [ + "CISA", + "Google Project Zero", + "Talos" + ] + }, + "summary": { + "title": "Summary of the acknowledgment", + "description": "SHOULD represent any contextual details the document producers wish to make known about the acknowledgment or acknowledged parties.", + "type": "string", + "minLength": 1, + "examples": [ + "First analysis of Coordinated Multi-Stream Attack (CMSA)" + ] + }, + "urls": { + "title": "List of URLs", + "description": "Specifies a list of URLs or location of the reference to be acknowledged.", + "type": "array", + "minItems": 1, + "items": { + "title": "URL of acknowledgment", + "description": "Contains the URL or location of the reference to be acknowledged.", + "type": "string", + "format": "uri" + } + } + } + } + }, + "branches_t": { + "title": "List of branches", + "description": "Contains branch elements as children of the current element.", + "type": "array", + "minItems": 1, + "items": { + "title": "Branch", + "description": "Is a part of the hierarchical structure of the product tree.", + "type": "object", + "maxProperties": 3, + "minProperties": 3, + "required": [ + "category", + "name" + ], + "properties": { + "branches": { + "$ref": "#/$defs/branches_t" + }, + "category": { + "title": "Category of the branch", + "description": "Describes the characteristics of the labeled branch.", + "type": "string", + "enum": [ + "architecture", + "host_name", + "language", + "legacy", + "patch_level", + "product_family", + "product_name", + "product_version", + "product_version_range", + "service_pack", + "specification", + "vendor" + ] + }, + "name": { + "title": "Name of the branch", + "description": "Contains the canonical descriptor or 'friendly name' of the branch.", + "type": "string", + "minLength": 1, + "examples": [ + "10", + "365", + "Microsoft", + "Office", + "PCS 7", + "SIMATIC", + "Siemens", + "Windows" + ] + }, + "product": { + "$ref": "#/$defs/full_product_name_t" + } + } + } + }, + "full_product_name_t": { + "title": "Full product name", + "description": "Specifies information about the product and assigns the product_id.", + "type": "object", + "required": [ + "name", + "product_id" + ], + "properties": { + "name": { + "title": "Textual description of the product", + "description": "The value should be the product’s full canonical name, including version number and other attributes, as it would be used in a human-friendly document.", + "type": "string", + "minLength": 1, + "examples": [ + "Cisco AnyConnect Secure Mobility Client 2.3.185", + "Microsoft Host Integration Server 2006 Service Pack 1" + ] + }, + "product_id": { + "$ref": "#/$defs/product_id_t" + }, + "product_identification_helper": { + "title": "Helper to identify the product", + "description": "Provides at least one method which aids in identifying the product in an asset database.", + "type": "object", + "minProperties": 1, + "properties": { + "cpe": { + "title": "Common Platform Enumeration representation", + "description": "The Common Platform Enumeration (CPE) attribute refers to a method for naming platforms external to this specification.", + "type": "string", + "pattern": "^(cpe:2\\.3:[aho\\*\\-](:(((\\?*|\\*?)([a-zA-Z0-9\\-\\._]|(\\\\[\\\\\\*\\?!\"#\\$%&'\\(\\)\\+,/:;<=>@\\[\\]\\^`\\{\\|\\}~]))+(\\?*|\\*?))|[\\*\\-])){5}(:(([a-zA-Z]{2,3}(-([a-zA-Z]{2}|[0-9]{3}))?)|[\\*\\-]))(:(((\\?*|\\*?)([a-zA-Z0-9\\-\\._]|(\\\\[\\\\\\*\\?!\"#\\$%&'\\(\\)\\+,/:;<=>@\\[\\]\\^`\\{\\|\\}~]))+(\\?*|\\*?))|[\\*\\-])){4})|([c][pP][eE]:/[AHOaho]?(:[A-Za-z0-9\\._\\-~%]*){0,6})$", + "minLength": 5 + }, + "hashes": { + "title": "List of hashes", + "description": "Contains a list of cryptographic hashes usable to identify files.", + "type": "array", + "minItems": 1, + "items": { + "title": "Cryptographic hashes", + "description": "Contains all information to identify a file based on its cryptographic hash values.", + "type": "object", + "required": [ + "file_hashes", + "filename" + ], + "properties": { + "file_hashes": { + "title": "List of file hashes", + "description": "Contains a list of cryptographic hashes for this file.", + "type": "array", + "minItems": 1, + "items": { + "title": "File hash", + "description": "Contains one hash value and algorithm of the file to be identified.", + "type": "object", + "required": [ + "algorithm", + "value" + ], + "properties": { + "algorithm": { + "title": "Algorithm of the cryptographic hash", + "description": "Contains the name of the cryptographic hash algorithm used to calculate the value.", + "type": "string", + "default": "sha256", + "minLength": 1, + "examples": [ + "blake2b512", + "sha256", + "sha3-512", + "sha384", + "sha512" + ] + }, + "value": { + "title": "Value of the cryptographic hash", + "description": "Contains the cryptographic hash value in hexadecimal representation.", + "type": "string", + "pattern": "^[0-9a-fA-F]{32,}$", + "minLength": 32, + "examples": [ + "37df33cb7464da5c7f077f4d56a32bc84987ec1d85b234537c1c1a4d4fc8d09dc29e2e762cb5203677bf849a2855a0283710f1f5fe1d6ce8d5ac85c645d0fcb3", + "4775203615d9534a8bfca96a93dc8b461a489f69124a130d786b42204f3341cc", + "9ea4c8200113d49d26505da0e02e2f49055dc078d1ad7a419b32e291c7afebbb84badfbd46dec42883bea0b2a1fa697c" + ] + } + } + } + }, + "filename": { + "title": "Filename", + "description": "Contains the name of the file which is identified by the hash values.", + "type": "string", + "minLength": 1, + "examples": [ + "WINWORD.EXE", + "msotadddin.dll", + "sudoers.so" + ] + } + } + } + }, + "model_numbers": { + "title": "List of models", + "description": "Contains a list of full or abbreviated (partial) model numbers.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Model number", + "description": "Contains a full or abbreviated (partial) model number of the component to identify.", + "type": "string", + "minLength": 1 + } + }, + "purl": { + "title": "package URL representation", + "description": "The package URL (purl) attribute refers to a method for reliably identifying and locating software packages external to this specification.", + "type": "string", + "format": "uri", + "pattern": "^pkg:[A-Za-z\\.\\-\\+][A-Za-z0-9\\.\\-\\+]*/.+", + "minLength": 7 + }, + "sbom_urls": { + "title": "List of SBOM URLs", + "description": "Contains a list of URLs where SBOMs for this product can be retrieved.", + "type": "array", + "minItems": 1, + "items": { + "title": "SBOM URL", + "description": "Contains a URL of one SBOM for this product.", + "type": "string", + "format": "uri" + } + }, + "serial_numbers": { + "title": "List of serial numbers", + "description": "Contains a list of full or abbreviated (partial) serial numbers.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Serial number", + "description": "Contains a full or abbreviated (partial) serial number of the component to identify.", + "type": "string", + "minLength": 1 + } + }, + "skus": { + "title": "List of stock keeping units", + "description": "Contains a list of full or abbreviated (partial) stock keeping units.", + "type": "array", + "minItems": 1, + "items": { + "title": "Stock keeping unit", + "description": "Contains a full or abbreviated (partial) stock keeping unit (SKU) which is used in the ordering process to identify the component.", + "type": "string", + "minLength": 1 + } + }, + "x_generic_uris": { + "title": "List of generic URIs", + "description": "Contains a list of identifiers which are either vendor-specific or derived from a standard not yet supported.", + "type": "array", + "minItems": 1, + "items": { + "title": "Generic URI", + "description": "Provides a generic extension point for any identifier which is either vendor-specific or derived from a standard not yet supported.", + "type": "object", + "required": [ + "namespace", + "uri" + ], + "properties": { + "namespace": { + "title": "Namespace of the generic URI", + "description": "Refers to a URL which provides the name and knowledge about the specification used or is the namespace in which these values are valid.", + "type": "string", + "format": "uri" + }, + "uri": { + "title": "URI", + "description": "Contains the identifier itself.", + "type": "string", + "format": "uri" + } + } + } + } + } + } + } + }, + "lang_t": { + "title": "Language type", + "description": "Identifies a language, corresponding to IETF BCP 47 / RFC 5646. See IETF language registry: https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry", + "type": "string", + "pattern": "^(([A-Za-z]{2,3}(-[A-Za-z]{3}(-[A-Za-z]{3}){0,2})?|[A-Za-z]{4,8})(-[A-Za-z]{4})?(-([A-Za-z]{2}|[0-9]{3}))?(-([A-Za-z0-9]{5,8}|[0-9][A-Za-z0-9]{3}))*(-[A-WY-Za-wy-z0-9](-[A-Za-z0-9]{2,8})+)*(-[Xx](-[A-Za-z0-9]{1,8})+)?|[Xx](-[A-Za-z0-9]{1,8})+|[Ii]-[Dd][Ee][Ff][Aa][Uu][Ll][Tt]|[Ii]-[Mm][Ii][Nn][Gg][Oo])$", + "examples": [ + "de", + "en", + "fr", + "frc", + "jp" + ] + }, + "notes_t": { + "title": "List of notes", + "description": "Contains notes which are specific to the current context.", + "type": "array", + "minItems": 1, + "items": { + "title": "Note", + "description": "Is a place to put all manner of text blobs related to the current context.", + "type": "object", + "required": [ + "category", + "text" + ], + "properties": { + "audience": { + "title": "Audience of note", + "description": "Indicates who is intended to read it.", + "type": "string", + "minLength": 1, + "examples": [ + "all", + "executives", + "operational management and system administrators", + "safety engineers" + ] + }, + "category": { + "title": "Note category", + "description": "Contains the information of what kind of note this is.", + "type": "string", + "enum": [ + "description", + "details", + "faq", + "general", + "legal_disclaimer", + "other", + "summary" + ] + }, + "text": { + "title": "Note content", + "description": "Holds the content of the note. Content varies depending on type.", + "type": "string", + "minLength": 1 + }, + "title": { + "title": "Title of note", + "description": "Provides a concise description of what is contained in the text of the note.", + "type": "string", + "minLength": 1, + "examples": [ + "Details", + "Executive summary", + "Technical summary", + "Impact on safety systems" + ] + } + } + } + }, + "product_group_id_t": { + "title": "Reference token for product group instance", + "description": "Token required to identify a group of products so that it can be referred to from other parts in the document. There is no predefined or required format for the product_group_id as long as it uniquely identifies a group in the context of the current document.", + "type": "string", + "minLength": 1, + "examples": [ + "CSAFGID-0001", + "CSAFGID-0002", + "CSAFGID-0020" + ] + }, + "product_groups_t": { + "title": "List of product_group_ids", + "description": "Specifies a list of product_group_ids to give context to the parent item.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "$ref": "#/$defs/product_group_id_t" + } + }, + "product_id_t": { + "title": "Reference token for product instance", + "description": "Token required to identify a full_product_name so that it can be referred to from other parts in the document. There is no predefined or required format for the product_id as long as it uniquely identifies a product in the context of the current document.", + "type": "string", + "minLength": 1, + "examples": [ + "CSAFPID-0004", + "CSAFPID-0008" + ] + }, + "products_t": { + "title": "List of product_ids", + "description": "Specifies a list of product_ids to give context to the parent item.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "$ref": "#/$defs/product_id_t" + } + }, + "references_t": { + "title": "List of references", + "description": "Holds a list of references.", + "type": "array", + "minItems": 1, + "items": { + "title": "Reference", + "description": "Holds any reference to conferences, papers, advisories, and other resources that are related and considered related to either a surrounding part of or the entire document and to be of value to the document consumer.", + "type": "object", + "required": [ + "summary", + "url" + ], + "properties": { + "category": { + "title": "Category of reference", + "description": "Indicates whether the reference points to the same document or vulnerability in focus (depending on scope) or to an external resource.", + "type": "string", + "default": "external", + "enum": [ + "external", + "self" + ] + }, + "summary": { + "title": "Summary of the reference", + "description": "Indicates what this reference refers to.", + "type": "string", + "minLength": 1 + }, + "url": { + "title": "URL of reference", + "description": "Provides the URL for the reference.", + "type": "string", + "format": "uri" + } + } + } + }, + "version_t": { + "title": "Version", + "description": "Specifies a version string to denote clearly the evolution of the content of the document. Format must be either integer or semantic versioning.", + "type": "string", + "pattern": "^(0|[1-9][0-9]*)$|^((0|[1-9]\\d*)\\.(0|[1-9]\\d*)\\.(0|[1-9]\\d*)(?:-((?:0|[1-9]\\d*|\\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\\.(?:0|[1-9]\\d*|\\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\\+([0-9a-zA-Z-]+(?:\\.[0-9a-zA-Z-]+)*))?)$", + "examples": [ + "1", + "4", + "0.9.0", + "1.4.3", + "2.40.0+21AF26D3" + ] + } + }, + "required": [ + "document" + ], + "properties": { + "document": { + "title": "Document level meta-data", + "description": "Captures the meta-data about this document describing a particular set of security advisories.", + "type": "object", + "required": [ + "category", + "csaf_version", + "publisher", + "title", + "tracking" + ], + "properties": { + "acknowledgments": { + "title": "Document acknowledgments", + "description": "Contains a list of acknowledgment elements associated with the whole document.", + "$ref": "#/$defs/acknowledgments_t" + }, + "aggregate_severity": { + "title": "Aggregate severity", + "description": "Is a vehicle that is provided by the document producer to convey the urgency and criticality with which the one or more vulnerabilities reported should be addressed. It is a document-level metric and applied to the document as a whole — not any specific vulnerability. The range of values in this field is defined according to the document producer's policies and procedures.", + "type": "object", + "required": [ + "text" + ], + "properties": { + "namespace": { + "title": "Namespace of aggregate severity", + "description": "Points to the namespace so referenced.", + "type": "string", + "format": "uri" + }, + "text": { + "title": "Text of aggregate severity", + "description": "Provides a severity which is independent of - and in addition to - any other standard metric for determining the impact or severity of a given vulnerability (such as CVSS).", + "type": "string", + "minLength": 1, + "examples": [ + "Critical", + "Important", + "Moderate" + ] + } + } + }, + "category": { + "title": "Document category", + "description": "Defines a short canonical name, chosen by the document producer, which will inform the end user as to the category of document.", + "type": "string", + "pattern": "^[^\\s\\-_\\.](.*[^\\s\\-_\\.])?$", + "minLength": 1, + "examples": [ + "csaf_base", + "csaf_security_advisory", + "csaf_vex", + "Example Company Security Notice" + ] + }, + "csaf_version": { + "title": "CSAF version", + "description": "Gives the version of the CSAF specification which the document was generated for.", + "type": "string", + "enum": [ + "2.0" + ] + }, + "distribution": { + "title": "Rules for sharing document", + "description": "Describe any constraints on how this document might be shared.", + "type": "object", + "minProperties": 1, + "properties": { + "text": { + "title": "Textual description", + "description": "Provides a textual description of additional constraints.", + "type": "string", + "minLength": 1, + "examples": [ + "Copyright 2021, Example Company, All Rights Reserved.", + "Distribute freely.", + "Share only on a need-to-know-basis only." + ] + }, + "tlp": { + "title": "Traffic Light Protocol (TLP)", + "description": "Provides details about the TLP classification of the document.", + "type": "object", + "required": [ + "label" + ], + "properties": { + "label": { + "title": "Label of TLP", + "description": "Provides the TLP label of the document.", + "type": "string", + "enum": [ + "AMBER", + "GREEN", + "RED", + "WHITE" + ] + }, + "url": { + "title": "URL of TLP version", + "description": "Provides a URL where to find the textual description of the TLP version which is used in this document. Default is the URL to the definition by FIRST.", + "type": "string", + "default": "https://www.first.org/tlp/", + "format": "uri", + "examples": [ + "https://www.us-cert.gov/tlp", + "https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Kritis/Merkblatt_TLP.pdf" + ] + } + } + } + } + }, + "lang": { + "title": "Document language", + "description": "Identifies the language used by this document, corresponding to IETF BCP 47 / RFC 5646.", + "$ref": "#/$defs/lang_t" + }, + "notes": { + "title": "Document notes", + "description": "Holds notes associated with the whole document.", + "$ref": "#/$defs/notes_t" + }, + "publisher": { + "title": "Publisher", + "description": "Provides information about the publisher of the document.", + "type": "object", + "required": [ + "category", + "name", + "namespace" + ], + "properties": { + "category": { + "title": "Category of publisher", + "description": "Provides information about the category of publisher releasing the document.", + "type": "string", + "enum": [ + "coordinator", + "discoverer", + "other", + "translator", + "user", + "vendor" + ] + }, + "contact_details": { + "title": "Contact details", + "description": "Information on how to contact the publisher, possibly including details such as web sites, email addresses, phone numbers, and postal mail addresses.", + "type": "string", + "minLength": 1, + "examples": [ + "Example Company can be reached at contact_us@example.com, or via our website at https://www.example.com/contact." + ] + }, + "issuing_authority": { + "title": "Issuing authority", + "description": "Provides information about the authority of the issuing party to release the document, in particular, the party's constituency and responsibilities or other obligations.", + "type": "string", + "minLength": 1 + }, + "name": { + "title": "Name of publisher", + "description": "Contains the name of the issuing party.", + "type": "string", + "minLength": 1, + "examples": [ + "BSI", + "Cisco PSIRT", + "Siemens ProductCERT" + ] + }, + "namespace": { + "title": "Namespace of publisher", + "description": "Contains a URL which is under control of the issuing party and can be used as a globally unique identifier for that issuing party.", + "type": "string", + "format": "uri", + "examples": [ + "https://csaf.io", + "https://www.example.com" + ] + } + } + }, + "references": { + "title": "Document references", + "description": "Holds a list of references associated with the whole document.", + "$ref": "#/$defs/references_t" + }, + "source_lang": { + "title": "Source language", + "description": "If this copy of the document is a translation then the value of this property describes from which language this document was translated.", + "$ref": "#/$defs/lang_t" + }, + "title": { + "title": "Title of this document", + "description": "This SHOULD be a canonical name for the document, and sufficiently unique to distinguish it from similar documents.", + "type": "string", + "minLength": 1, + "examples": [ + "Cisco IPv6 Crafted Packet Denial of Service Vulnerability", + "Example Company Cross-Site-Scripting Vulnerability in Example Generator" + ] + }, + "tracking": { + "title": "Tracking", + "description": "Is a container designated to hold all management attributes necessary to track a CSAF document as a whole.", + "type": "object", + "required": [ + "current_release_date", + "id", + "initial_release_date", + "revision_history", + "status", + "version" + ], + "properties": { + "aliases": { + "title": "Aliases", + "description": "Contains a list of alternate names for the same document.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Alternate name", + "description": "Specifies a non-empty string that represents a distinct optional alternative ID used to refer to the document.", + "type": "string", + "minLength": 1, + "examples": [ + "CVE-2019-12345" + ] + } + }, + "current_release_date": { + "title": "Current release date", + "description": "The date when the current revision of this document was released", + "type": "string", + "format": "date-time" + }, + "generator": { + "title": "Document generator", + "description": "Is a container to hold all elements related to the generation of the document. These items will reference when the document was actually created, including the date it was generated and the entity that generated it.", + "type": "object", + "required": [ + "engine" + ], + "properties": { + "date": { + "title": "Date of document generation", + "description": "This SHOULD be the current date that the document was generated. Because documents are often generated internally by a document producer and exist for a nonzero amount of time before being released, this field MAY be different from the Initial Release Date and Current Release Date.", + "type": "string", + "format": "date-time" + }, + "engine": { + "title": "Engine of document generation", + "description": "Contains information about the engine that generated the CSAF document.", + "type": "object", + "required": [ + "name" + ], + "properties": { + "name": { + "title": "Engine name", + "description": "Represents the name of the engine that generated the CSAF document.", + "type": "string", + "minLength": 1, + "examples": [ + "Red Hat rhsa-to-cvrf", + "Secvisogram", + "TVCE" + ] + }, + "version": { + "title": "Engine version", + "description": "Contains the version of the engine that generated the CSAF document.", + "type": "string", + "minLength": 1, + "examples": [ + "0.6.0", + "1.0.0-beta+exp.sha.a1c44f85", + "2" + ] + } + } + } + } + }, + "id": { + "title": "Unique identifier for the document", + "description": "The ID is a simple label that provides for a wide range of numbering values, types, and schemes. Its value SHOULD be assigned and maintained by the original document issuing authority.", + "type": "string", + "pattern": "^[\\S](.*[\\S])?$", + "minLength": 1, + "examples": [ + "Example Company - 2019-YH3234", + "RHBA-2019:0024", + "cisco-sa-20190513-secureboot" + ] + }, + "initial_release_date": { + "title": "Initial release date", + "description": "The date when this document was first published.", + "type": "string", + "format": "date-time" + }, + "revision_history": { + "title": "Revision history", + "description": "Holds one revision item for each version of the CSAF document, including the initial one.", + "type": "array", + "minItems": 1, + "items": { + "title": "Revision", + "description": "Contains all the information elements required to track the evolution of a CSAF document.", + "type": "object", + "required": [ + "date", + "number", + "summary" + ], + "properties": { + "date": { + "title": "Date of the revision", + "description": "The date of the revision entry", + "type": "string", + "format": "date-time" + }, + "legacy_version": { + "title": "Legacy version of the revision", + "description": "Contains the version string used in an existing document with the same content.", + "type": "string", + "minLength": 1 + }, + "number": { + "$ref": "#/$defs/version_t" + }, + "summary": { + "title": "Summary of the revision", + "description": "Holds a single non-empty string representing a short description of the changes.", + "type": "string", + "minLength": 1, + "examples": [ + "Initial version." + ] + } + } + } + }, + "status": { + "title": "Document status", + "description": "Defines the draft status of the document.", + "type": "string", + "enum": [ + "draft", + "final", + "interim" + ] + }, + "version": { + "$ref": "#/$defs/version_t" + } + } + } + } + }, + "product_tree": { + "title": "Product tree", + "description": "Is a container for all fully qualified product names that can be referenced elsewhere in the document.", + "type": "object", + "minProperties": 1, + "properties": { + "branches": { + "$ref": "#/$defs/branches_t" + }, + "full_product_names": { + "title": "List of full product names", + "description": "Contains a list of full product names.", + "type": "array", + "minItems": 1, + "items": { + "$ref": "#/$defs/full_product_name_t" + } + }, + "product_groups": { + "title": "List of product groups", + "description": "Contains a list of product groups.", + "type": "array", + "minItems": 1, + "items": { + "title": "Product group", + "description": "Defines a new logical group of products that can then be referred to in other parts of the document to address a group of products with a single identifier.", + "type": "object", + "required": [ + "group_id", + "product_ids" + ], + "properties": { + "group_id": { + "$ref": "#/$defs/product_group_id_t" + }, + "product_ids": { + "title": "List of Product IDs", + "description": "Lists the product_ids of those products which known as one group in the document.", + "type": "array", + "minItems": 2, + "uniqueItems": true, + "items": { + "$ref": "#/$defs/product_id_t" + } + }, + "summary": { + "title": "Summary of the product group", + "description": "Gives a short, optional description of the group.", + "type": "string", + "minLength": 1, + "examples": [ + "Products supporting Modbus.", + "The x64 versions of the operating system." + ] + } + } + } + }, + "relationships": { + "title": "List of relationships", + "description": "Contains a list of relationships.", + "type": "array", + "minItems": 1, + "items": { + "title": "Relationship", + "description": "Establishes a link between two existing full_product_name_t elements, allowing the document producer to define a combination of two products that form a new full_product_name entry.", + "type": "object", + "required": [ + "category", + "full_product_name", + "product_reference", + "relates_to_product_reference" + ], + "properties": { + "category": { + "title": "Relationship category", + "description": "Defines the category of relationship for the referenced component.", + "type": "string", + "enum": [ + "default_component_of", + "external_component_of", + "installed_on", + "installed_with", + "optional_component_of" + ] + }, + "full_product_name": { + "$ref": "#/$defs/full_product_name_t" + }, + "product_reference": { + "title": "Product reference", + "description": "Holds a Product ID that refers to the Full Product Name element, which is referenced as the first element of the relationship.", + "$ref": "#/$defs/product_id_t" + }, + "relates_to_product_reference": { + "title": "Relates to product reference", + "description": "Holds a Product ID that refers to the Full Product Name element, which is referenced as the second element of the relationship.", + "$ref": "#/$defs/product_id_t" + } + } + } + } + } + }, + "vulnerabilities": { + "title": "Vulnerabilities", + "description": "Represents a list of all relevant vulnerability information items.", + "type": "array", + "minItems": 1, + "items": { + "title": "Vulnerability", + "description": "Is a container for the aggregation of all fields that are related to a single vulnerability in the document.", + "type": "object", + "minProperties": 1, + "properties": { + "acknowledgments": { + "title": "Vulnerability acknowledgments", + "description": "Contains a list of acknowledgment elements associated with this vulnerability item.", + "$ref": "#/$defs/acknowledgments_t" + }, + "cve": { + "title": "CVE", + "description": "Holds the MITRE standard Common Vulnerabilities and Exposures (CVE) tracking number for the vulnerability.", + "type": "string", + "pattern": "^CVE-[0-9]{4}-[0-9]{4,}$" + }, + "cwe": { + "title": "CWE", + "description": "Holds the MITRE standard Common Weakness Enumeration (CWE) for the weakness associated.", + "type": "object", + "required": [ + "id", + "name" + ], + "properties": { + "id": { + "title": "Weakness ID", + "description": "Holds the ID for the weakness associated.", + "type": "string", + "pattern": "^CWE-[1-9]\\d{0,5}$", + "examples": [ + "CWE-22", + "CWE-352", + "CWE-79" + ] + }, + "name": { + "title": "Weakness name", + "description": "Holds the full name of the weakness as given in the CWE specification.", + "type": "string", + "minLength": 1, + "examples": [ + "Cross-Site Request Forgery (CSRF)", + "Improper Limitation of a Pathname to a Restricted Directory ('Path Traversal')", + "Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting')" + ] + } + } + }, + "discovery_date": { + "title": "Discovery date", + "description": "Holds the date and time the vulnerability was originally discovered.", + "type": "string", + "format": "date-time" + }, + "flags": { + "title": "List of flags", + "description": "Contains a list of machine readable flags.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Flag", + "description": "Contains product specific information in regard to this vulnerability as a single machine readable flag.", + "type": "object", + "required": [ + "label" + ], + "properties": { + "date": { + "title": "Date of the flag", + "description": "Contains the date when assessment was done or the flag was assigned.", + "type": "string", + "format": "date-time" + }, + "group_ids": { + "$ref": "#/$defs/product_groups_t" + }, + "label": { + "title": "Label of the flag", + "description": "Specifies the machine readable label.", + "type": "string", + "enum": [ + "component_not_present", + "inline_mitigations_already_exist", + "vulnerable_code_cannot_be_controlled_by_adversary", + "vulnerable_code_not_in_execute_path", + "vulnerable_code_not_present" + ] + }, + "product_ids": { + "$ref": "#/$defs/products_t" + } + } + } + }, + "ids": { + "title": "List of IDs", + "description": "Represents a list of unique labels or tracking IDs for the vulnerability (if such information exists).", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "ID", + "description": "Contains a single unique label or tracking ID for the vulnerability.", + "type": "object", + "required": [ + "system_name", + "text" + ], + "properties": { + "system_name": { + "title": "System name", + "description": "Indicates the name of the vulnerability tracking or numbering system.", + "type": "string", + "minLength": 1, + "examples": [ + "Cisco Bug ID", + "GitHub Issue" + ] + }, + "text": { + "title": "Text", + "description": "Is unique label or tracking ID for the vulnerability (if such information exists).", + "type": "string", + "minLength": 1, + "examples": [ + "CSCso66472", + "oasis-tcs/csaf#210" + ] + } + } + } + }, + "involvements": { + "title": "List of involvements", + "description": "Contains a list of involvements.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "title": "Involvement", + "description": "Is a container, that allows the document producers to comment on the level of involvement (or engagement) of themselves or third parties in the vulnerability identification, scoping, and remediation process.", + "type": "object", + "required": [ + "party", + "status" + ], + "properties": { + "date": { + "title": "Date of involvement", + "description": "Holds the date and time of the involvement entry.", + "type": "string", + "format": "date-time" + }, + "party": { + "title": "Party category", + "description": "Defines the category of the involved party.", + "type": "string", + "enum": [ + "coordinator", + "discoverer", + "other", + "user", + "vendor" + ] + }, + "status": { + "title": "Party status", + "description": "Defines contact status of the involved party.", + "type": "string", + "enum": [ + "completed", + "contact_attempted", + "disputed", + "in_progress", + "not_contacted", + "open" + ] + }, + "summary": { + "title": "Summary of the involvement", + "description": "Contains additional context regarding what is going on.", + "type": "string", + "minLength": 1 + } + } + } + }, + "notes": { + "title": "Vulnerability notes", + "description": "Holds notes associated with this vulnerability item.", + "$ref": "#/$defs/notes_t" + }, + "product_status": { + "title": "Product status", + "description": "Contains different lists of product_ids which provide details on the status of the referenced product related to the current vulnerability. ", + "type": "object", + "minProperties": 1, + "properties": { + "first_affected": { + "title": "First affected", + "description": "These are the first versions of the releases known to be affected by the vulnerability.", + "$ref": "#/$defs/products_t" + }, + "first_fixed": { + "title": "First fixed", + "description": "These versions contain the first fix for the vulnerability but may not be the recommended fixed versions.", + "$ref": "#/$defs/products_t" + }, + "fixed": { + "title": "Fixed", + "description": "These versions contain a fix for the vulnerability but may not be the recommended fixed versions.", + "$ref": "#/$defs/products_t" + }, + "known_affected": { + "title": "Known affected", + "description": "These versions are known to be affected by the vulnerability.", + "$ref": "#/$defs/products_t" + }, + "known_not_affected": { + "title": "Known not affected", + "description": "These versions are known not to be affected by the vulnerability.", + "$ref": "#/$defs/products_t" + }, + "last_affected": { + "title": "Last affected", + "description": "These are the last versions in a release train known to be affected by the vulnerability. Subsequently released versions would contain a fix for the vulnerability.", + "$ref": "#/$defs/products_t" + }, + "recommended": { + "title": "Recommended", + "description": "These versions have a fix for the vulnerability and are the vendor-recommended versions for fixing the vulnerability.", + "$ref": "#/$defs/products_t" + }, + "under_investigation": { + "title": "Under investigation", + "description": "It is not known yet whether these versions are or are not affected by the vulnerability. However, it is still under investigation - the result will be provided in a later release of the document.", + "$ref": "#/$defs/products_t" + } + } + }, + "references": { + "title": "Vulnerability references", + "description": "Holds a list of references associated with this vulnerability item.", + "$ref": "#/$defs/references_t" + }, + "release_date": { + "title": "Release date", + "description": "Holds the date and time the vulnerability was originally released into the wild.", + "type": "string", + "format": "date-time" + }, + "remediations": { + "title": "List of remediations", + "description": "Contains a list of remediations.", + "type": "array", + "minItems": 1, + "items": { + "title": "Remediation", + "description": "Specifies details on how to handle (and presumably, fix) a vulnerability.", + "type": "object", + "required": [ + "category", + "details" + ], + "properties": { + "category": { + "title": "Category of the remediation", + "description": "Specifies the category which this remediation belongs to.", + "type": "string", + "enum": [ + "mitigation", + "no_fix_planned", + "none_available", + "vendor_fix", + "workaround" + ] + }, + "date": { + "title": "Date of the remediation", + "description": "Contains the date from which the remediation is available.", + "type": "string", + "format": "date-time" + }, + "details": { + "title": "Details of the remediation", + "description": "Contains a thorough human-readable discussion of the remediation.", + "type": "string", + "minLength": 1 + }, + "entitlements": { + "title": "List of entitlements", + "description": "Contains a list of entitlements.", + "type": "array", + "minItems": 1, + "items": { + "title": "Entitlement of the remediation", + "description": "Contains any possible vendor-defined constraints for obtaining fixed software or hardware that fully resolves the vulnerability.", + "type": "string", + "minLength": 1 + } + }, + "group_ids": { + "$ref": "#/$defs/product_groups_t" + }, + "product_ids": { + "$ref": "#/$defs/products_t" + }, + "restart_required": { + "title": "Restart required by remediation", + "description": "Provides information on category of restart is required by this remediation to become effective.", + "type": "object", + "required": [ + "category" + ], + "properties": { + "category": { + "title": "Category of restart", + "description": "Specifies what category of restart is required by this remediation to become effective.", + "type": "string", + "enum": [ + "connected", + "dependencies", + "machine", + "none", + "parent", + "service", + "system", + "vulnerable_component", + "zone" + ] + }, + "details": { + "title": "Additional restart information", + "description": "Provides additional information for the restart. This can include details on procedures, scope or impact.", + "type": "string", + "minLength": 1 + } + } + }, + "url": { + "title": "URL to the remediation", + "description": "Contains the URL where to obtain the remediation.", + "type": "string", + "format": "uri" + } + } + } + }, + "scores": { + "title": "List of scores", + "description": "Contains score objects for the current vulnerability.", + "type": "array", + "minItems": 1, + "items": { + "title": "Score", + "description": "Specifies information about (at least one) score of the vulnerability and for which products the given value applies.", + "type": "object", + "minProperties": 2, + "required": [ + "products" + ], + "properties": { + "cvss_v2": { + "$ref": "https://www.first.org/cvss/cvss-v2.0.json" + }, + "cvss_v3": { + "oneOf": [ + { + "$ref": "https://www.first.org/cvss/cvss-v3.0.json" + }, + { + "$ref": "https://www.first.org/cvss/cvss-v3.1.json" + } + ] + }, + "products": { + "$ref": "#/$defs/products_t" + } + } + } + }, + "threats": { + "title": "List of threats", + "description": "Contains information about a vulnerability that can change with time.", + "type": "array", + "minItems": 1, + "items": { + "title": "Threat", + "description": "Contains the vulnerability kinetic information. This information can change as the vulnerability ages and new information becomes available.", + "type": "object", + "required": [ + "category", + "details" + ], + "properties": { + "category": { + "title": "Category of the threat", + "description": "Categorizes the threat according to the rules of the specification.", + "type": "string", + "enum": [ + "exploit_status", + "impact", + "target_set" + ] + }, + "date": { + "title": "Date of the threat", + "description": "Contains the date when the assessment was done or the threat appeared.", + "type": "string", + "format": "date-time" + }, + "details": { + "title": "Details of the threat", + "description": "Represents a thorough human-readable discussion of the threat.", + "type": "string", + "minLength": 1 + }, + "group_ids": { + "$ref": "#/$defs/product_groups_t" + }, + "product_ids": { + "$ref": "#/$defs/products_t" + } + } + } + }, + "title": { + "title": "Title", + "description": "Gives the document producer the ability to apply a canonical name or title to the vulnerability.", + "type": "string", + "minLength": 1 + } + } + } + } + } +} diff --git a/src/vuln_analysis/data_models/output.py b/src/vuln_analysis/data_models/output.py index 232a98a2..e77469a3 100644 --- a/src/vuln_analysis/data_models/output.py +++ b/src/vuln_analysis/data_models/output.py @@ -81,12 +81,22 @@ class AgentMorpheusEngineOutput(BaseModel): cvss: CVSSOutput | None +class OutputPayload(BaseModel): + """ + Wrapper for final pipeline results. + - analysis: per-vulnerability analysis results + - vex: the vulnerability exploitability exchange document JSON + """ + analysis: list[AgentMorpheusEngineOutput] + vex: dict[str, typing.Any] | None + + class AgentMorpheusOutput(AgentMorpheusEngineInput): """" The final output of the Agent Morpheus pipeline. Contains all fields in the AgentMorpheusEngineInput, plus the AgentMorpheusEngineOuput for each input vulnerability. """ - output: list[AgentMorpheusEngineOutput] + output: OutputPayload @model_validator(mode="before") @classmethod diff --git a/src/vuln_analysis/data_models/plugins/intel_plugin.py b/src/vuln_analysis/data_models/plugins/intel_plugin.py index cdad0475..9b85c6d1 100644 --- a/src/vuln_analysis/data_models/plugins/intel_plugin.py +++ b/src/vuln_analysis/data_models/plugins/intel_plugin.py @@ -7,7 +7,6 @@ # disclosure or distribution of this material and related documentation # without an express license agreement from NVIDIA CORPORATION or # its affiliates is strictly prohibited. -import logging import requests from pydantic import BaseModel diff --git a/src/vuln_analysis/data_models/state.py b/src/vuln_analysis/data_models/state.py index 7b77e7f8..38108bce 100644 --- a/src/vuln_analysis/data_models/state.py +++ b/src/vuln_analysis/data_models/state.py @@ -34,5 +34,6 @@ class AgentMorpheusEngineState(BaseModel): justifications: dict[str, dict[str, str]] = {} poor_quality_intel_vul: dict[str, int] = {} cvss_results: dict[str, dict[str, str]] = {} + vex: dict[str, typing.Any] | None = None current_vuln_id: str | None = None diff --git a/src/vuln_analysis/functions/cve_agent.py b/src/vuln_analysis/functions/cve_agent.py index 60ca3a78..f4a3f62b 100644 --- a/src/vuln_analysis/functions/cve_agent.py +++ b/src/vuln_analysis/functions/cve_agent.py @@ -15,7 +15,6 @@ import asyncio from vuln_analysis.runtime_context import ctx_state -import logging import typing from aiq.builder.builder import Builder from aiq.builder.framework_enum import LLMFrameworkEnum diff --git a/src/vuln_analysis/functions/cve_calculate_intel_score.py b/src/vuln_analysis/functions/cve_calculate_intel_score.py index ef1a2588..5f562e4d 100644 --- a/src/vuln_analysis/functions/cve_calculate_intel_score.py +++ b/src/vuln_analysis/functions/cve_calculate_intel_score.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging import aiohttp from aiq.builder.builder import Builder @@ -24,8 +23,8 @@ from aiq.data_models.function import FunctionBaseConfig from pydantic import Field -logger = logging.getLogger(__name__) - +from exploit_iq_commons.logging.loggers_factory import LoggingFactory +logger = LoggingFactory.get_agent_logger(__name__) class CVECalculateIntelScoreConfig(FunctionBaseConfig, name="cve_calculate_intel_score"): """ diff --git a/src/vuln_analysis/functions/cve_check_vuln_deps.py b/src/vuln_analysis/functions/cve_check_vuln_deps.py index 4875e792..4c3153e7 100644 --- a/src/vuln_analysis/functions/cve_check_vuln_deps.py +++ b/src/vuln_analysis/functions/cve_check_vuln_deps.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging import aiohttp from aiq.builder.builder import Builder diff --git a/src/vuln_analysis/functions/cve_checklist.py b/src/vuln_analysis/functions/cve_checklist.py index f0cd6566..696c7fb9 100644 --- a/src/vuln_analysis/functions/cve_checklist.py +++ b/src/vuln_analysis/functions/cve_checklist.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging import pandas as pd from aiq.builder.builder import Builder diff --git a/src/vuln_analysis/functions/cve_fetch_intel.py b/src/vuln_analysis/functions/cve_fetch_intel.py index 1acc59e3..26bb52c3 100644 --- a/src/vuln_analysis/functions/cve_fetch_intel.py +++ b/src/vuln_analysis/functions/cve_fetch_intel.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging import typing import aiohttp diff --git a/src/vuln_analysis/functions/cve_file_output.py b/src/vuln_analysis/functions/cve_file_output.py index 0d856282..6e41d198 100644 --- a/src/vuln_analysis/functions/cve_file_output.py +++ b/src/vuln_analysis/functions/cve_file_output.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from aiq.builder.builder import Builder from aiq.builder.function_info import FunctionInfo from aiq.cli.register_workflow import register_function diff --git a/src/vuln_analysis/functions/cve_generate_cvss.py b/src/vuln_analysis/functions/cve_generate_cvss.py index 424c24d2..8343d1c1 100644 --- a/src/vuln_analysis/functions/cve_generate_cvss.py +++ b/src/vuln_analysis/functions/cve_generate_cvss.py @@ -15,7 +15,6 @@ import asyncio from vuln_analysis.runtime_context import ctx_state -import logging import json import typing import re @@ -38,7 +37,8 @@ from vuln_analysis.tools.tool_names import ToolNames from vuln_analysis.utils.prompting import get_cvss_prompt -logger = logging.getLogger(__name__) +from exploit_iq_commons.logging.loggers_factory import LoggingFactory +logger = LoggingFactory.get_agent_logger(__name__) OUTPUT_CONTAIN_BOTH_ACTION_AND_FINAL_ANSWER = "Parsing LLM output produced both a final answer and a parse-able action" OUTPUT_CONTAIN_PARSING_ERROR = "Could not parse" diff --git a/src/vuln_analysis/functions/cve_generate_vex.py b/src/vuln_analysis/functions/cve_generate_vex.py new file mode 100644 index 00000000..62a9d059 --- /dev/null +++ b/src/vuln_analysis/functions/cve_generate_vex.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from aiq.builder.builder import Builder +from aiq.builder.function_info import FunctionInfo +from aiq.cli.register_workflow import register_function +from aiq.data_models.function import FunctionBaseConfig +from pydantic import Field +from vuln_analysis.data_models.state import AgentMorpheusEngineState +from vuln_analysis.utils.vex.vex_generator_loader import load_vex_generator + +from exploit_iq_commons.logging.loggers_factory import LoggingFactory +logger = LoggingFactory.get_agent_logger(__name__) + +class CVEGenerateVexConfig(FunctionBaseConfig, name="cve_generate_vex"): + """ + Defines a function that generates a custom VEX (Vendor Exploitability eXchange) document for vulnerable components following the OpenVEX specification. + """ + + skip: bool = Field(default=False, description="Whether or not the VEX generator should be skipped.") + vex_format: str = Field(default="csaf", description="VEX document format to generate.") + +@register_function(config_type=CVEGenerateVexConfig) +async def cve_generate_vex(config: CVEGenerateVexConfig, builder: Builder): + + async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: + if config.skip: + logger.info("`config.skip` is set to True. Skipping VEX generation.") + return state + + if not any(justification.get("justification_label") == "vulnerable" for justification in state.justifications.values()): + logger.info("No vulnerable CVE(s) found. Skipping VEX generation.") + return state + + try: + generator = load_vex_generator(config.vex_format) + vex_doc = generator.generate(state) + state.vex = vex_doc + except ValueError as e: + logger.error("VEX generator initialization failed: %s", e) + except Exception as e: + logger.error("VEX document generation failed: %s", e) + + return state + + yield FunctionInfo.from_fn(_arun, + input_schema=AgentMorpheusEngineState, + description="Generates a custom VEX document for vulnerable components.") diff --git a/src/vuln_analysis/functions/cve_http_output.py b/src/vuln_analysis/functions/cve_http_output.py index 7a78c7c6..fcc3b172 100644 --- a/src/vuln_analysis/functions/cve_http_output.py +++ b/src/vuln_analysis/functions/cve_http_output.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import base64 -import logging from http import HTTPStatus from aiq.builder.builder import Builder diff --git a/src/vuln_analysis/functions/cve_justify.py b/src/vuln_analysis/functions/cve_justify.py index fce509f0..8124b422 100644 --- a/src/vuln_analysis/functions/cve_justify.py +++ b/src/vuln_analysis/functions/cve_justify.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging from aiq.builder.builder import Builder from aiq.builder.framework_enum import LLMFrameworkEnum diff --git a/src/vuln_analysis/functions/cve_process_sbom.py b/src/vuln_analysis/functions/cve_process_sbom.py index 65865167..c2b78d25 100644 --- a/src/vuln_analysis/functions/cve_process_sbom.py +++ b/src/vuln_analysis/functions/cve_process_sbom.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from aiq.builder.builder import Builder from aiq.builder.framework_enum import LLMFrameworkEnum from aiq.builder.function_info import FunctionInfo diff --git a/src/vuln_analysis/functions/cve_summarize.py b/src/vuln_analysis/functions/cve_summarize.py index 608fff83..db7fba0b 100644 --- a/src/vuln_analysis/functions/cve_summarize.py +++ b/src/vuln_analysis/functions/cve_summarize.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging from aiq.builder.builder import Builder from aiq.builder.framework_enum import LLMFrameworkEnum diff --git a/src/vuln_analysis/register.py b/src/vuln_analysis/register.py index 48fe71d9..19541df1 100644 --- a/src/vuln_analysis/register.py +++ b/src/vuln_analysis/register.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from datetime import datetime from io import TextIOWrapper @@ -40,6 +39,7 @@ from vuln_analysis.functions import cve_process_sbom from vuln_analysis.functions import cve_summarize from vuln_analysis.functions import cve_generate_cvss +from vuln_analysis.functions import cve_generate_vex from vuln_analysis.functions import health_endpoint from vuln_analysis.tools import lexical_full_search # This is actually registers the tool in the type registry of NAT! @@ -69,6 +69,7 @@ class CVEAgentWorkflowConfig(FunctionBaseConfig, name="cve_agent"): cve_agent_executor_name: str = Field(description="Function name to run CVE agent on checklist") cve_summarize_name: str = Field(description="Function name to generate summary") cve_justify_name: str = Field(description="Function to generate justifications for each CVE") + cve_generate_vex_name: str = Field(description="Function name to generate VEX for vulnerable components") cve_generate_cvss_name: str = Field(description="Function name to generate CVSS") cve_output_config_name: str | None = Field(default=None, description="Function to output workflow results " @@ -95,6 +96,7 @@ async def cve_agent_workflow(config: CVEAgentWorkflowConfig, builder: Builder): cve_agent_executor_fn = builder.get_function(name=config.cve_agent_executor_name) cve_summary_fn = builder.get_function(name=config.cve_summarize_name) cve_justify_fn = builder.get_function(name=config.cve_justify_name) + cve_generate_vex_fn = builder.get_function(name=config.cve_generate_vex_name) cve_generate_cvss_fn = builder.get_function(name=config.cve_generate_cvss_name) cve_output_fn = builder.get_function(name=config.cve_output_config_name) if config.cve_output_config_name else None @@ -156,6 +158,11 @@ async def justify_node(state: AgentMorpheusEngineState) -> AgentMorpheusEngineSt return await cve_justify_fn.ainvoke(state.model_dump()) + async def generate_vex_node(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: + """Generates VEX for vulnerable components""" + + return await cve_generate_vex_fn.ainvoke(state.model_dump()) + async def generate_cvss_node(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: """Generates CVSS for the results of the execution""" @@ -181,13 +188,15 @@ async def output_results_node(state: AgentMorpheusOutput) -> AgentMorpheusOutput subgraph_builder.add_node("agent_executor", agent_executor_node) subgraph_builder.add_node("summarize", summarize_node) subgraph_builder.add_node("justify", justify_node) + subgraph_builder.add_node("generate_vex", generate_vex_node) subgraph_builder.add_node("generate_cvss", generate_cvss_node) subgraph_builder.add_edge(START, "checklist") subgraph_builder.add_edge("checklist", "agent_executor") subgraph_builder.add_edge("agent_executor", "summarize") subgraph_builder.add_edge("summarize", "justify") - subgraph_builder.add_edge("justify", "generate_cvss") + subgraph_builder.add_edge("justify", "generate_vex") + subgraph_builder.add_edge("generate_vex", "generate_cvss") subgraph = subgraph_builder.compile() @catch_pipeline_errors_async diff --git a/src/vuln_analysis/tools/container_image_analysis_data.py b/src/vuln_analysis/tools/container_image_analysis_data.py index ea0a6d2d..53495dac 100644 --- a/src/vuln_analysis/tools/container_image_analysis_data.py +++ b/src/vuln_analysis/tools/container_image_analysis_data.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from typing import Any from aiq.builder.builder import Builder @@ -26,7 +25,8 @@ CONTAINER_IMAGE_ANALYSIS_DATA = "container_image_analysis_data" -logger = logging.getLogger(__name__) +from exploit_iq_commons.logging.loggers_factory import LoggingFactory +logger = LoggingFactory.get_agent_logger(__name__) class ContainerImageAnalysisDataToolConfig(FunctionBaseConfig, name=("%s" % CONTAINER_IMAGE_ANALYSIS_DATA)): diff --git a/src/vuln_analysis/tools/lexical_full_search.py b/src/vuln_analysis/tools/lexical_full_search.py index f2e92e66..7166b5e5 100644 --- a/src/vuln_analysis/tools/lexical_full_search.py +++ b/src/vuln_analysis/tools/lexical_full_search.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from aiq.builder.builder import Builder from aiq.builder.framework_enum import LLMFrameworkEnum from aiq.builder.function_info import FunctionInfo diff --git a/src/vuln_analysis/tools/local_vdb.py b/src/vuln_analysis/tools/local_vdb.py index d6effc8d..7549032f 100644 --- a/src/vuln_analysis/tools/local_vdb.py +++ b/src/vuln_analysis/tools/local_vdb.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from aiq.builder.builder import Builder from aiq.builder.framework_enum import LLMFrameworkEnum from aiq.builder.function_info import FunctionInfo diff --git a/src/vuln_analysis/tools/serp.py b/src/vuln_analysis/tools/serp.py index fc117654..7fe820b8 100644 --- a/src/vuln_analysis/tools/serp.py +++ b/src/vuln_analysis/tools/serp.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from aiq.builder.builder import Builder from aiq.builder.function_info import FunctionInfo from aiq.cli.register_workflow import register_function diff --git a/src/vuln_analysis/utils/async_http_utils.py b/src/vuln_analysis/utils/async_http_utils.py index 030a2d80..5d2a3c1a 100644 --- a/src/vuln_analysis/utils/async_http_utils.py +++ b/src/vuln_analysis/utils/async_http_utils.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging import time import typing from contextlib import asynccontextmanager diff --git a/src/vuln_analysis/utils/checklist_prompt_generator.py b/src/vuln_analysis/utils/checklist_prompt_generator.py index 8e515c55..3ed21b15 100644 --- a/src/vuln_analysis/utils/checklist_prompt_generator.py +++ b/src/vuln_analysis/utils/checklist_prompt_generator.py @@ -14,7 +14,6 @@ # limitations under the License. import ast -import logging import re from jinja2 import Template @@ -170,7 +169,7 @@ async def generate_checklist(prompt: str | None, return parsed_checklist.content except Exception as e: - logging.error(f" Error in generating checklist : {e}") + logger.error(f" Error in generating checklist : {e}") raise return gen_checklist.content diff --git a/src/vuln_analysis/utils/clients/first_client.py b/src/vuln_analysis/utils/clients/first_client.py index 1fd85433..f961b7fd 100644 --- a/src/vuln_analysis/utils/clients/first_client.py +++ b/src/vuln_analysis/utils/clients/first_client.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import aiohttp diff --git a/src/vuln_analysis/utils/clients/ghsa_client.py b/src/vuln_analysis/utils/clients/ghsa_client.py index 466b4df7..ce819833 100644 --- a/src/vuln_analysis/utils/clients/ghsa_client.py +++ b/src/vuln_analysis/utils/clients/ghsa_client.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import aiohttp diff --git a/src/vuln_analysis/utils/clients/nvd_client.py b/src/vuln_analysis/utils/clients/nvd_client.py index c76bc5ca..1b13ff51 100644 --- a/src/vuln_analysis/utils/clients/nvd_client.py +++ b/src/vuln_analysis/utils/clients/nvd_client.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import re diff --git a/src/vuln_analysis/utils/clients/rhsa_client.py b/src/vuln_analysis/utils/clients/rhsa_client.py index 404a1845..30bea1f8 100644 --- a/src/vuln_analysis/utils/clients/rhsa_client.py +++ b/src/vuln_analysis/utils/clients/rhsa_client.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import aiohttp diff --git a/src/vuln_analysis/utils/full_text_search.py b/src/vuln_analysis/utils/full_text_search.py index eada7d0d..3f09349b 100644 --- a/src/vuln_analysis/utils/full_text_search.py +++ b/src/vuln_analysis/utils/full_text_search.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import re from pathlib import Path diff --git a/src/vuln_analysis/utils/http_utils.py b/src/vuln_analysis/utils/http_utils.py index 3b298253..f58f87ea 100644 --- a/src/vuln_analysis/utils/http_utils.py +++ b/src/vuln_analysis/utils/http_utils.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import time import typing from enum import Enum diff --git a/src/vuln_analysis/utils/intel_source_score.py b/src/vuln_analysis/utils/intel_source_score.py index 596cb798..479b3cc5 100644 --- a/src/vuln_analysis/utils/intel_source_score.py +++ b/src/vuln_analysis/utils/intel_source_score.py @@ -10,7 +10,6 @@ # limitations under the License. import json import os -import logging import re from aiq.builder.builder import Builder from langchain_core.language_models.base import BaseLanguageModel @@ -21,7 +20,8 @@ from ..utils.prompting import additional_intel_prompting from aiq.builder.framework_enum import LLMFrameworkEnum -logger = logging.getLogger(__name__) +from exploit_iq_commons.logging.loggers_factory import LoggingFactory +logger = LoggingFactory.get_agent_logger(__name__) class IntelScorer: def __init__(self, diff --git a/src/vuln_analysis/utils/justification_parser.py b/src/vuln_analysis/utils/justification_parser.py index c0964ce5..ac94ed19 100644 --- a/src/vuln_analysis/utils/justification_parser.py +++ b/src/vuln_analysis/utils/justification_parser.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from textwrap import dedent from exploit_iq_commons.logging.loggers_factory import LoggingFactory diff --git a/src/vuln_analysis/utils/llm_engine_utils.py b/src/vuln_analysis/utils/llm_engine_utils.py index 351b4253..72783021 100644 --- a/src/vuln_analysis/utils/llm_engine_utils.py +++ b/src/vuln_analysis/utils/llm_engine_utils.py @@ -23,6 +23,7 @@ from exploit_iq_commons.data_models.input import AgentMorpheusInput from vuln_analysis.data_models.output import AgentMorpheusEngineOutput from vuln_analysis.data_models.output import AgentMorpheusOutput +from vuln_analysis.data_models.output import OutputPayload from vuln_analysis.data_models.output import ChecklistItemOutput from vuln_analysis.data_models.output import JustificationOutput from vuln_analysis.data_models.output import CVSSOutput @@ -265,7 +266,8 @@ def postprocess_engine_output(message: AgentMorpheusEngineInput, out.justification.label, out.cvss.score if out.cvss else "-") - return AgentMorpheusOutput(input=message.input, info=message.info, output=output) + payload = OutputPayload(analysis=output, vex=result.vex) + return AgentMorpheusOutput(input=message.input, info=message.info, output=payload) def finalize_preprocess_engine_input(message: AgentMorpheusEngineInput, engine_state: AgentMorpheusEngineState, builder: Builder) -> AgentMorpheusEngineState: config = builder.get_function_config("cve_calculate_intel_score") diff --git a/src/vuln_analysis/utils/vex/__init__.py b/src/vuln_analysis/utils/vex/__init__.py new file mode 100644 index 00000000..a1744724 --- /dev/null +++ b/src/vuln_analysis/utils/vex/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/vuln_analysis/utils/vex/implementations/__init__.py b/src/vuln_analysis/utils/vex/implementations/__init__.py new file mode 100644 index 00000000..a1744724 --- /dev/null +++ b/src/vuln_analysis/utils/vex/implementations/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py new file mode 100644 index 00000000..d0ebb7d7 --- /dev/null +++ b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py @@ -0,0 +1,243 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path +import re +import tempfile +from typing import Any, Dict + +from exploit_iq_commons.data_models.cve_intel import CveIntel +from vuln_analysis.data_models.state import AgentMorpheusEngineState + +from ..vex_generator_base import VexGenerator +from ..vex_utils import get_vex_validator, build_patch_recommendation +from csaf.generator import CSAFGenerator + +from exploit_iq_commons.logging.loggers_factory import LoggingFactory +logger = LoggingFactory.get_agent_logger(__name__) + +# Regex pattern for OCI digest format +OCI_DIGEST_PATTERN = r"^[a-z0-9]+:[a-f0-9]{32,}$" +OCI_DIGEST_RE = re.compile(OCI_DIGEST_PATTERN) + +# Note categories +NOTE_CATEGORY_DESCRIPTION = "description" +NOTE_CATEGORY_SUMMARY = "summary" +NOTE_CATEGORY_GENERAL = "general" +NOTE_CATEGORY_OTHER = "other" +NOTE_CATEGORY_LEGAL_DISCLAIMER = "legal_disclaimer" + +# Note titles +NOTE_TITLE_VULNERABILITY_DESCRIPTION = "Vulnerability description" +NOTE_TITLE_VULNERABILITY_SUMMARY = "Vulnerability summary" +NOTE_TITLE_RHSA_STATEMENT = "Red Hat Security Advisory Statement" +NOTE_TITLE_EXPLOITIQ_SUMMARY = "ExploitIQ Analysis Summary" +NOTE_TITLE_EXPLOITIQ_JUSTIFICATION_REASONING = "ExploitIQ Analysis Justification Reasoning" +NOTE_TITLE_EXPLOITIQ_JUSTIFICATION_LABEL = "ExploitIQ Analysis Justification Label" +NOTE_TITLE_UNOFFICIAL_CONTENT = "Unofficial Content Notice" + +# Disclaimer text +DISCLAIMER_TEXT = ( + "This CSAF document is generated as custom content for internal or experimental use. " + "It is not an official Red Hat security document, has not undergone formal validation or review, " + "and is not digitally signed. The information provided here should not be regarded as authoritative " + "or replace official Red Hat advisories or VEX statements." +) + +# Author information +AUTHOR_NAME = "Red Hat Product Security" +AUTHOR_URL = "https://access.redhat.com/security/" + +# Default values +DEFAULT_VENDOR = "unknown" +DEFAULT_IMPACT = "unknown" + +# Justification labels +JUSTIFICATION_LABEL_VULNERABLE = "vulnerable" + +# Vulnerability statuses +STATUS_KNOWN_AFFECTED = "known_affected" +STATUS_KNOWN_NOT_AFFECTED = "known_not_affected" + +# Remediation +REMEDIATION_TYPE_VENDOR_FIX = "vendor_fix" +REMEDIATION_MESSAGE_TEMPLATE = "Upgrade to the first patched version(s): {patch_recommendation}." +REMEDIATION_MESSAGE_NO_RECOMMENDATION = "No remediation recommendation available for this vulnerability." + +# File names +CSAF_OUTPUT_FILENAME = "csaf.json" + +# Error messages +ERROR_CSAF_VALIDATION_FAILED = "CSAF document does not conform to CSAF 2.0 schema." + +# CSAF VEX schema path +CSAF_SCHEMA_PATH = Path(__file__).resolve().parents[3] / "configs" / "vex" / "csaf" / "v2.0" / "csaf_json_schema.json" + + +def _enrich_vulnerabilities_with_notes( + csaf_json: Dict[str, Any], + intel_map: Dict[str, CveIntel], + final_summaries: Dict[str, str], + justifications: Dict[str, Dict[str, str]] +) -> None: + """ + Enrich each vulnerability in the CSAF document with informational notes + from intel sources and analysis results. + + Returns the enriched csaf_json document. + """ + for v in csaf_json.get("vulnerabilities", []): + vuln_id = v.get("cve") + notes = v.get("notes", []) + + ci = intel_map.get(vuln_id) + + # Update or remove GHSA description note + ghsa_description = ci.ghsa.description if (ci and ci.ghsa and ci.ghsa.description) else None + if ghsa_description: + for note in notes: + if note.get("category") == NOTE_CATEGORY_DESCRIPTION: + note["title"] = NOTE_TITLE_VULNERABILITY_DESCRIPTION + note["text"] = ghsa_description + break + else: + # Remove description note if no GHSA description available + notes[:] = [note for note in notes if note.get("category") != NOTE_CATEGORY_DESCRIPTION] + + # Add GHSA summary if available + ghsa_summary = ci.ghsa.summary if (ci and ci.ghsa and ci.ghsa.summary) else None + if ghsa_summary: + notes.append({ + "category": NOTE_CATEGORY_SUMMARY, + "text": ghsa_summary, + "title": NOTE_TITLE_VULNERABILITY_SUMMARY + }) + + # Add RHSA statement if available + rhsa_statement = ci.rhsa.statement if (ci and ci.rhsa and ci.rhsa.statement) else None + if rhsa_statement: + notes.append({ + "category": NOTE_CATEGORY_GENERAL, + "text": rhsa_statement, + "title": NOTE_TITLE_RHSA_STATEMENT + }) + + # Add ExploitIQ analysis summary + summary = final_summaries.get(vuln_id) + notes.append({ + "category": NOTE_CATEGORY_OTHER, + "title": NOTE_TITLE_EXPLOITIQ_SUMMARY, + "text": summary + }) + + # Add ExploitIQ justification details + justification = justifications.get(vuln_id) + notes.append({ + "category": NOTE_CATEGORY_OTHER, + "title": NOTE_TITLE_EXPLOITIQ_JUSTIFICATION_REASONING, + "text": justification.get("justification") + }) + notes.append({ + "category": NOTE_CATEGORY_OTHER, + "title": NOTE_TITLE_EXPLOITIQ_JUSTIFICATION_LABEL, + "text": justification.get("justification_label") + }) + + v["notes"] = notes + + +class CsafVexGenerator(VexGenerator): + """ + CSAF VEX generator. Builds a CSAF JSON document and validates it with the csaf-tool. + """ + + def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: + + message: AgentMorpheusEngineInput = state.original_input + + csaf_gen = CSAFGenerator() + + product_name = message.input.image.name + product_tag = message.input.image.tag + + csaf_gen.set_header_title(f"ExploitIQ VEX Document - {product_name}{"@" if OCI_DIGEST_RE.fullmatch(product_tag) else ":"}{product_tag}") + + csaf_gen.set_value("notes",[ + { + "category": NOTE_CATEGORY_LEGAL_DISCLAIMER, + "text": DISCLAIMER_TEXT, + "title": NOTE_TITLE_UNOFFICIAL_CONTENT + } + ]) + + csaf_gen.set_value("author", AUTHOR_NAME) + csaf_gen.set_value("author_url", AUTHOR_URL) + + vendor = (product_name.split("/")[-2] if "/" in product_name else "") or DEFAULT_VENDOR + csaf_gen.add_product(product_name=product_name, vendor=vendor, release=product_tag) + + intel_map: Dict[str, CveIntel] = {ci.vuln_id: ci for ci in message.info.intel} + sbom_names: set[str] | None = None + if message.input.image.sbom_info and message.input.image.sbom_info.packages: + sbom_names = {pkg.name for pkg in message.input.image.sbom_info.packages} + + for vuln_id, justification in state.justifications.items(): + + ci = intel_map.get(vuln_id) + impact = ci.rhsa.threat_severity if ci and ci.rhsa and ci.rhsa.threat_severity else DEFAULT_IMPACT + + is_vulnerable = justification.get("justification_label") == JUSTIFICATION_LABEL_VULNERABLE + + if is_vulnerable: + patch_recommendation = build_patch_recommendation(ci, sbom_names) + comment = ( + REMEDIATION_MESSAGE_TEMPLATE.format(patch_recommendation=patch_recommendation) + if patch_recommendation else REMEDIATION_MESSAGE_NO_RECOMMENDATION + ) + + csaf_gen.add_vulnerability( + product_name=product_name, + release=product_tag, + id=vuln_id, + status=STATUS_KNOWN_AFFECTED, + description="", + comment=impact, + remediation=REMEDIATION_TYPE_VENDOR_FIX, + action=comment + ) + + else: + csaf_gen.add_vulnerability( + product_name=product_name, + release=product_tag, + id=vuln_id, + status=STATUS_KNOWN_NOT_AFFECTED, + description="", + comment=impact, + ) + + csaf_gen.generate_csaf() + + with tempfile.TemporaryDirectory() as tmp_dir: + path = os.path.join(tmp_dir, CSAF_OUTPUT_FILENAME) + csaf_gen.publish_csaf(path) + + # Load CSAF JSON + with open(path, "r") as f: + csaf_json = json.load(f) + + # Enrich the CSAF in memory + _enrich_vulnerabilities_with_notes( + csaf_json, intel_map, state.final_summaries, state.justifications + ) + + # Validate the CSAF document against the JSON schema + errors = list(get_vex_validator(CSAF_SCHEMA_PATH).iter_errors(csaf_json)) + if errors: + logger.error("%s Found %d error(s):", ERROR_CSAF_VALIDATION_FAILED, len(errors)) + for e in errors: + logger.error(" %s: %s", e.json_path, e.message) + return {} + + return csaf_json \ No newline at end of file diff --git a/src/vuln_analysis/utils/vex/tests/__init__.py b/src/vuln_analysis/utils/vex/tests/__init__.py new file mode 100644 index 00000000..a1744724 --- /dev/null +++ b/src/vuln_analysis/utils/vex/tests/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py new file mode 100644 index 00000000..5bca102d --- /dev/null +++ b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py @@ -0,0 +1,385 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Integration tests for CSAF VEX generator. +""" + +from unittest.mock import patch + +import pytest + +from exploit_iq_commons.data_models.common import AnalysisType +from exploit_iq_commons.data_models.cve_intel import CveIntel, CveIntelGhsa, CveIntelRhsa +from exploit_iq_commons.data_models.info import AgentMorpheusInfo, SBOMPackage +from exploit_iq_commons.data_models.input import ( + AgentMorpheusEngineInput, + AgentMorpheusInput, + ImageInfoInput, + ManualSBOMInfoInput, + ScanInfoInput, + SourceDocumentsInfo, + VulnInfo, +) +from vuln_analysis.data_models.state import AgentMorpheusEngineState +from vuln_analysis.utils.vex.implementations.csaf_generator import CsafVexGenerator +from vuln_analysis.utils.vex.vex_generator_loader import load_vex_generator + + +_DEFAULT_SOURCE_INFO = [ + SourceDocumentsInfo( + type="code", + git_repo="https://github.com/example/repo", + ref="main", + include=["*.py"], + exclude=[], + ) +] +_DEFAULT_PRODUCT_NAME = "registry.example.com/myapp" +_DEFAULT_PRODUCT_TAG = "v1.0.0" +_DEFAULT_JUSTIFICATION = {"justification": "reason", "justification_label": "vulnerable"} +_DEFAULT_VULNS = ["CVE-2024-1234"] +_DEFAULT_SUMMARY = "Analysis summary for {v}" +_DEFAULT_SBOM_PACKAGES = [SBOMPackage(name="test-package", version="1.0.0", system="npm")] + + +@pytest.fixture(scope="module") +def mock_state() -> AgentMorpheusEngineState: + """Fixture providing a default mock AgentMorpheusEngineState for testing. + This state has one vulnerable CVE with a known affected status, with no GHSA or RHSA intel data. + """ + return create_mock_state() + + +def create_mock_state( + vulns: list[str] = _DEFAULT_VULNS, + justification: dict[str, dict[str, str]] | None = None, + intel: list[CveIntel] | None = None, + product_name: str = _DEFAULT_PRODUCT_NAME, + product_tag: str = _DEFAULT_PRODUCT_TAG, + sbom_packages: list[SBOMPackage] | None = _DEFAULT_SBOM_PACKAGES, +) -> AgentMorpheusEngineState: + """Create a mock AgentMorpheusEngineState for testing.""" + + intel = intel or [CveIntel(vuln_id=v) for v in vulns] + + sbom_info = None if sbom_packages is None else ManualSBOMInfoInput(packages=sbom_packages) + + image_info = ImageInfoInput( + name=product_name, + tag=product_tag, + analysis_type=AnalysisType.IMAGE, + source_info=_DEFAULT_SOURCE_INFO, + sbom_info=sbom_info, + ) + + engine_input = AgentMorpheusEngineInput( + input=AgentMorpheusInput( + scan=ScanInfoInput(vulns=[VulnInfo(vuln_id=v) for v in vulns]), + image=image_info, + ), + info=AgentMorpheusInfo(intel=intel), + ) + + return AgentMorpheusEngineState( + cve_intel=intel, + original_input=engine_input, + final_summaries={v: _DEFAULT_SUMMARY.format(v=v) for v in vulns}, + justifications={v: justification or _DEFAULT_JUSTIFICATION for v in vulns}, + ) + + +class TestCsafVexGeneratorIntegration: + """Integration tests for CsafVexGenerator.generate() method.""" + + def test_loader_returns_working_generator(self, mock_state): + """Test that load_vex_generator returns a functional generator.""" + generator = load_vex_generator("csaf") + result = generator.generate(mock_state) + + assert "document" in result + assert "product_tree" in result + assert "vulnerabilities" in result + + def test_document_has_correct_title(self, mock_state): + """Test that document title contains product name and tag.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + title = result["document"].get("title") + assert "ExploitIQ VEX Document - " + _DEFAULT_PRODUCT_NAME + ":" + _DEFAULT_PRODUCT_TAG in title + + def test_oci_digest_tag_uses_at_separator(self): + """Test that OCI digest tags use @ separator instead of : in title.""" + oci_digest = "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" + state = create_mock_state( + product_tag=oci_digest, + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + title = result["document"].get("title") + assert "ExploitIQ VEX Document - " + _DEFAULT_PRODUCT_NAME + "@" + oci_digest in title + + def test_document_has_disclaimer_note(self, mock_state): + """Test that document includes the disclaimer note.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + notes = result["document"].get("notes", []) + disclaimer_notes = [n for n in notes if n.get("category") == "legal_disclaimer"] + assert len(disclaimer_notes) == 1 + assert "Unofficial Content Notice" in disclaimer_notes[0]["title"] + + def test_document_has_author_note(self, mock_state): + """Test that document includes the author note.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + publisher = result["document"].get("publisher") + assert "Red Hat Product Security" in publisher.get("name") + assert "https://access.redhat.com/security/" in publisher.get("namespace") + + def test_product_tree_contains_product(self, mock_state): + """Test that product tree contains the analyzed product.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + product_tree = result["product_tree"] + assert _DEFAULT_PRODUCT_NAME in product_tree.get("branches")[0].get("branches")[0].get("name") + assert _DEFAULT_PRODUCT_TAG in product_tree.get("branches")[0].get("branches")[0].get("branches")[0].get("name") + + def test_vulnerable_cve_has_known_affected_status(self, mock_state): + """Test that vulnerable CVEs get 'known_affected' status.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + vuln = result["vulnerabilities"][0] + product_status = vuln.get("product_status", {}) + assert "known_affected" in product_status + + def test_not_vulnerable_cve_has_known_not_affected_status(self): + """Test that non-vulnerable CVEs get 'known_not_affected' status.""" + state = create_mock_state( + justification={"justification": "Code path not reachable", "justification_label": "not_vulnerable"}, + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + vuln = result["vulnerabilities"][0] + product_status = vuln.get("product_status", {}) + assert "known_not_affected" in product_status + + def test_vulnerable_cve_includes_remediation(self): + """Test that vulnerable CVEs include remediation information when patch is available.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + ] + ) + intel = [CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa)] + + state = create_mock_state( + intel=intel, + sbom_packages=[SBOMPackage(name="lodash", version="4.17.21", system="npm")], + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + vuln = result["vulnerabilities"][0] + remediations = vuln.get("remediations", []) + assert len(remediations) > 0 + assert "lodash:4.17.21" in remediations[0].get("details") + + def test_no_patch_available_shows_fallback_message(self, mock_state): + """Test 'No remediation recommendation available' message when no patch is available.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + vuln = result["vulnerabilities"][0] + remediations = vuln.get("remediations", []) + assert "No remediation recommendation available" in remediations[0].get("details") + + def test_multiple_vulnerabilities_all_included(self): + """Test that multiple vulnerabilities are all included in output.""" + state = create_mock_state( + vulns=["CVE-2024-1234", "CVE-2024-5678", "CVE-2024-9999"] + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + cve_ids = [v.get("cve") for v in result["vulnerabilities"]] + assert "CVE-2024-1234" in cve_ids + assert "CVE-2024-5678" in cve_ids + assert "CVE-2024-9999" in cve_ids + + def test_vulnerabilities_have_notes_enriched(self): + """Test that vulnerabilities have notes added from enrichment.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + summary="Test vulnerability summary", + description="Detailed description of the vulnerability", + ) + rhsa = CveIntelRhsa(threat_severity="Important", statement="RHSA statement.") + intel = [CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa, rhsa=rhsa)] + + state = create_mock_state( + intel=intel, + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + vuln = result["vulnerabilities"][0] + notes = vuln.get("notes", []) + + other_notes = [n for n in notes if n.get("category") == "other"] + assert len(other_notes) == 3 # analysis summary + justification reasoning + justification label + + description_note = [n for n in notes if n.get("category") == "description"] + assert len(description_note) == 1 # ghsa description + + summary_note = [n for n in notes if n.get("category") == "summary"] + assert len(summary_note) == 1 # ghsa summary + + general_notes = [n for n in notes if n.get("category") == "general"] + assert len(general_notes) == 1 # rhsa statement + + def test_description_note_removed_when_no_ghsa(self, mock_state): + """Test that description note is removed when no GHSA data is available.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + vuln = result["vulnerabilities"][0] + notes = vuln.get("notes", []) + description_notes = [n for n in notes if n.get("category") == "description"] + assert len(description_notes) == 0 # Description note should be removed + + def test_rhsa_threat_severity_used_as_impact(self): + """Test that RHSA threat severity is included as impact/comment.""" + rhsa = CveIntelRhsa(threat_severity="Important") + intel = [CveIntel(vuln_id="CVE-2024-1234", rhsa=rhsa)] + + state = create_mock_state( + intel=intel, + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + vuln = result["vulnerabilities"][0] + impact = vuln.get("threats")[0].get("details") + assert "Important" in impact + + +class TestCsafVexGeneratorEdgeCases: + """Integration tests for edge cases and error handling.""" + + @pytest.mark.parametrize("sbom_packages", [None, []], ids=["sbom_info_none", "empty_packages"]) + def test_includes_all_packages_when_no_sbom_filtering(self, sbom_packages): + """Test that all packages are included when SBOM is None or has no packages.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + {"package": {"name": "express"}, "first_patched_version": "4.18.0"}, + ] + ) + intel = [CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa)] + state = create_mock_state( + intel=intel, + sbom_packages=sbom_packages + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + assert "vulnerabilities" in result + vuln = result["vulnerabilities"][0] + remediations = vuln.get("remediations", []) + assert "lodash:4.17.21" in remediations[0].get("details") + assert "express:4.18.0" in remediations[0].get("details") + + def test_handles_sbom_packages_with_matching_and_non_matching_packages(self): + """Test handling when SBOM has a matching package and a non-matching package.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + {"package": {"name": "express"}, "first_patched_version": "4.18.0"}, + ] + ) + intel = [CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa)] + state = create_mock_state( + intel=intel, + sbom_packages=[SBOMPackage(name="lodash", version="4.17.21", system="npm")], + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + assert "vulnerabilities" in result + vuln = result["vulnerabilities"][0] + remediations = vuln.get("remediations", []) + assert "lodash:4.17.21" in remediations[0].get("details") + assert "express:4.18.0" not in remediations[0].get("details") + + def test_handles_missing_rhsa_severity(self, mock_state): + """Test handling when RHSA threat severity is missing.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + assert "vulnerabilities" in result + + vuln = result["vulnerabilities"][0] + impact = vuln.get("threats")[0].get("details") + assert "unknown" in impact + + def test_handles_product_name_without_slash(self): + """Test vendor extraction when product name has no slash.""" + state = create_mock_state( + product_name="simpleapp", + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + product_tree = result["product_tree"] + assert "unknown" == product_tree.get("branches")[0].get("name") + assert "simpleapp" == product_tree.get("branches")[0].get("branches")[0].get("name") + + def test_validation_failure_returns_empty_dict(self, mock_state): + """Test that validation failure returns empty dict.""" + generator = CsafVexGenerator() + + # Mock the enrichment function to inject invalid data (modifies csaf_json in-place) + def mock_enrich(csaf_json, intel_map, final_summaries, justifications): + # Inject invalid note category (violates schema) + for v in csaf_json.get("vulnerabilities", []): + v["notes"] = [{"category": "not_allowed_category", "title": "Test", "text": "test text"}] + + with patch( + "vuln_analysis.utils.vex.implementations.csaf_generator._enrich_vulnerabilities_with_notes", + side_effect=mock_enrich + ): + result = generator.generate(mock_state) + + assert result == {} + diff --git a/src/vuln_analysis/utils/vex/vex_generator_base.py b/src/vuln_analysis/utils/vex/vex_generator_base.py new file mode 100644 index 00000000..98e8bfcf --- /dev/null +++ b/src/vuln_analysis/utils/vex/vex_generator_base.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, Dict + +from vuln_analysis.data_models.state import AgentMorpheusEngineState + + +class VexGenerator(ABC): + """ + Abstract base for VEX document generators. + """ + + @abstractmethod + def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: + """ + Generate a VEX document as a JSON-serializable dict from the engine state. + """ + pass + + diff --git a/src/vuln_analysis/utils/vex/vex_generator_loader.py b/src/vuln_analysis/utils/vex/vex_generator_loader.py new file mode 100644 index 00000000..aeb47e3b --- /dev/null +++ b/src/vuln_analysis/utils/vex/vex_generator_loader.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from .vex_generator_base import VexGenerator +from .implementations.csaf_generator import CsafVexGenerator + +# Supported VEX formats +CSAF = "csaf" + +# Error messages +ERROR_UNSUPPORTED_VEX_FORMAT = "Unsupported VEX format: {fmt}" + + +def load_vex_generator(fmt: str) -> VexGenerator: + """ + Return a VEX generator implementation for the requested format. + """ + + fmt_lower = fmt.lower() + + if fmt_lower == CSAF: + return CsafVexGenerator() + raise ValueError(ERROR_UNSUPPORTED_VEX_FORMAT.format(fmt=fmt_lower)) + diff --git a/src/vuln_analysis/utils/vex/vex_utils.py b/src/vuln_analysis/utils/vex/vex_utils.py new file mode 100644 index 00000000..4a6668d0 --- /dev/null +++ b/src/vuln_analysis/utils/vex/vex_utils.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from functools import lru_cache +import json +from pathlib import Path + +from jsonschema import Draft202012Validator + +from exploit_iq_commons.data_models.cve_intel import CveIntel +from exploit_iq_commons.logging.loggers_factory import LoggingFactory + +logger = LoggingFactory.get_agent_logger(__name__) + + +@lru_cache(maxsize=None) +def get_vex_validator(schema_path: Path) -> Draft202012Validator: + """ + Load schema and create a cached VEX document validator instance. + + Args: + schema_path: Path to the JSON schema file. + + Returns: + A Draft202012Validator instance for the schema. + """ + with open(schema_path) as f: + schema = json.load(f) + logger.debug("VEX document schema loaded from %s", schema_path) + return Draft202012Validator(schema) + + +def get_patched_package(vuln: dict) -> tuple[str | None, str | None]: + """ + Extract package name and first patched version from a GHSA vulnerability dict. + + Args: + vuln: A GHSA vulnerability dictionary. + + Returns: + Tuple of (package_name, first_patched_version), either may be None. + """ + pkg = vuln.get("package") or {} + return pkg.get("name"), vuln.get("first_patched_version") + + +def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None) -> str: + """ + Build a patch recommendation string from GHSA data. + + Args: + ci: CveIntel object containing GHSA vulnerability data. + sbom_package_names: Optional set of package names from SBOM to filter by. + + Returns: + Comma-separated string of "name:version" pairs, or empty string if none found. + + Notes: + - If SBOM provided: returns only patches for packages in the SBOM. + - If no SBOM: returns all unique patches. + """ + if not ci or not ci.ghsa or not ci.ghsa.vulnerabilities: + return "" + + vulns = ci.ghsa.vulnerabilities + + name_to_version: dict[str, str] = {} + for v in vulns: + name, patch = get_patched_package(v) + if not name or not patch: + continue + if name in name_to_version: + continue + # If SBOM provided, only include packages that are in the SBOM + if sbom_package_names is not None and name not in sbom_package_names: + continue + name_to_version[name] = patch + + if not name_to_version: + return "" + return ", ".join(f"{name}:{patch}" for name, patch in name_to_version.items()) + diff --git a/src/vuln_analysis/utils/vulnerable_dependency_checker.py b/src/vuln_analysis/utils/vulnerable_dependency_checker.py index 62eb9bd4..3b459dd1 100644 --- a/src/vuln_analysis/utils/vulnerable_dependency_checker.py +++ b/src/vuln_analysis/utils/vulnerable_dependency_checker.py @@ -14,7 +14,6 @@ # limitations under the License. import asyncio -import logging import os import re import urllib.parse diff --git a/tests/test_vex_csaf_helpers.py b/tests/test_vex_csaf_helpers.py new file mode 100644 index 00000000..687e5247 --- /dev/null +++ b/tests/test_vex_csaf_helpers.py @@ -0,0 +1,216 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for CSAF VEX generator helper functions. +""" + +import pytest + +from exploit_iq_commons.data_models.cve_intel import CveIntel, CveIntelGhsa, CveIntelRhsa +from vuln_analysis.utils.vex.implementations.csaf_generator import ( + _enrich_vulnerabilities_with_notes, +) + + +# --- Fixtures for TestEnrichVulnerabilitiesWithNotes --- + +@pytest.fixture +def base_csaf_json(): + """Returns a fresh base CSAF JSON structure with one vulnerability.""" + return { + "vulnerabilities": [ + {"cve": "CVE-2024-1234", "notes": []} + ] + } + + +@pytest.fixture +def base_intel_map(): + """Returns a basic intel map with one CVE.""" + return {"CVE-2024-1234": CveIntel(vuln_id="CVE-2024-1234")} + + +@pytest.fixture +def base_final_summaries(): + """Returns basic final summaries dict.""" + return {"CVE-2024-1234": "Analysis summary"} + + +@pytest.fixture +def base_justifications(): + """Returns basic justifications dict.""" + return {"CVE-2024-1234": {"justification": "reasoning", "justification_label": "vulnerable"}} + + +class TestEnrichVulnerabilitiesWithNotes: + """Unit tests for _enrich_vulnerabilities_with_notes() function.""" + + def test_adds_ghsa_summary_note(self, base_csaf_json, base_final_summaries, base_justifications): + """Test that GHSA summary is added as a note.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + summary="Test vulnerability summary" + ) + intel_map = {"CVE-2024-1234": CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa)} + + _enrich_vulnerabilities_with_notes(base_csaf_json, intel_map, base_final_summaries, base_justifications) + + notes = base_csaf_json["vulnerabilities"][0]["notes"] + summary_notes = [n for n in notes if n.get("category") == "summary"] + assert len(summary_notes) == 1 + assert summary_notes[0]["text"] == "Test vulnerability summary" + assert summary_notes[0]["title"] == "Vulnerability summary" + + def test_adds_rhsa_statement_note(self, base_csaf_json, base_final_summaries, base_justifications): + """Test that RHSA statement is added as a note.""" + rhsa = CveIntelRhsa(statement="Red Hat security statement") + intel_map = {"CVE-2024-1234": CveIntel(vuln_id="CVE-2024-1234", rhsa=rhsa)} + + _enrich_vulnerabilities_with_notes(base_csaf_json, intel_map, base_final_summaries, base_justifications) + + notes = base_csaf_json["vulnerabilities"][0]["notes"] + general_notes = [n for n in notes if n.get("category") == "general"] + assert len(general_notes) == 1 + assert general_notes[0]["text"] == "Red Hat security statement" + assert general_notes[0]["title"] == "Red Hat Security Advisory Statement" + + def test_adds_analysis_summary_note(self, base_csaf_json, base_intel_map, base_justifications): + """Test that analysis summary is added as a note.""" + final_summaries = {"CVE-2024-1234": "This is the analysis summary"} + + _enrich_vulnerabilities_with_notes(base_csaf_json, base_intel_map, final_summaries, base_justifications) + + notes = base_csaf_json["vulnerabilities"][0]["notes"] + analysis_notes = [n for n in notes if n.get("title") == "ExploitIQ Analysis Summary"] + assert len(analysis_notes) == 1 + assert analysis_notes[0]["text"] == "This is the analysis summary" + assert analysis_notes[0]["category"] == "other" + + def test_adds_justification_notes(self, base_csaf_json, base_intel_map, base_final_summaries): + """Test that justification reasoning and label are added as notes.""" + justifications = { + "CVE-2024-1234": { + "justification": "The vulnerable code path is reachable", + "justification_label": "vulnerable" + } + } + + _enrich_vulnerabilities_with_notes(base_csaf_json, base_intel_map, base_final_summaries, justifications) + + notes = base_csaf_json["vulnerabilities"][0]["notes"] + + reasoning_notes = [n for n in notes if n.get("title") == "ExploitIQ Analysis Justification Reasoning"] + assert len(reasoning_notes) == 1 + assert reasoning_notes[0]["text"] == "The vulnerable code path is reachable" + + label_notes = [n for n in notes if n.get("title") == "ExploitIQ Analysis Justification Label"] + assert len(label_notes) == 1 + assert label_notes[0]["text"] == "vulnerable" + + def test_updates_existing_description_note(self, base_final_summaries, base_justifications): + """Test that existing description note is updated with GHSA description.""" + csaf_json = { + "vulnerabilities": [ + { + "cve": "CVE-2024-1234", + "notes": [ + {"category": "description", "text": "Original description", "title": ""} + ] + } + ] + } + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + description="GHSA detailed description" + ) + intel_map = {"CVE-2024-1234": CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa)} + + _enrich_vulnerabilities_with_notes(csaf_json, intel_map, base_final_summaries, base_justifications) + + notes = csaf_json["vulnerabilities"][0]["notes"] + desc_notes = [n for n in notes if n.get("category") == "description"] + assert len(desc_notes) == 1 + assert desc_notes[0]["text"] == "GHSA detailed description" + assert desc_notes[0]["title"] == "Vulnerability description" + + def test_removes_description_note_when_no_ghsa_description(self, base_intel_map, base_final_summaries, base_justifications): + """Test that description note is removed when no GHSA description is available.""" + csaf_json = { + "vulnerabilities": [ + { + "cve": "CVE-2024-1234", + "notes": [ + {"category": "description", "text": "Original description", "title": "Original"}, + {"category": "summary", "text": "Some summary", "title": "Summary"} + ] + } + ] + } + + _enrich_vulnerabilities_with_notes(csaf_json, base_intel_map, base_final_summaries, base_justifications) + + notes = csaf_json["vulnerabilities"][0]["notes"] + desc_notes = [n for n in notes if n.get("category") == "description"] + # Description note should be removed + assert len(desc_notes) == 0 + # Other notes should still be present + summary_notes = [n for n in notes if n.get("category") == "summary"] + assert len(summary_notes) >= 1 + + def test_handles_multiple_vulnerabilities(self): + """Test that multiple vulnerabilities are all enriched.""" + csaf_json = { + "vulnerabilities": [ + {"cve": "CVE-2024-1234", "notes": []}, + {"cve": "CVE-2024-5678", "notes": []}, + ] + } + intel_map = { + "CVE-2024-1234": CveIntel(vuln_id="CVE-2024-1234"), + "CVE-2024-5678": CveIntel(vuln_id="CVE-2024-5678"), + } + final_summaries = { + "CVE-2024-1234": "Summary 1", + "CVE-2024-5678": "Summary 2", + } + justifications = { + "CVE-2024-1234": {"justification": "reason1", "justification_label": "vulnerable"}, + "CVE-2024-5678": {"justification": "reason2", "justification_label": "not_vulnerable"}, + } + + _enrich_vulnerabilities_with_notes(csaf_json, intel_map, final_summaries, justifications) + + # Both vulnerabilities should have notes + for vuln in csaf_json["vulnerabilities"]: + assert len(vuln["notes"]) >= 3 # At least analysis summary + 2 justification notes + + def test_handles_missing_intel_for_vulnerability(self, base_csaf_json, base_final_summaries, base_justifications): + """Test that missing intel for a vulnerability is handled gracefully.""" + # Should not raise an exception + _enrich_vulnerabilities_with_notes(base_csaf_json, {}, base_final_summaries, base_justifications) + + notes = base_csaf_json["vulnerabilities"][0]["notes"] + # Should still have analysis notes + assert len(notes) >= 3 + + def test_handles_empty_vulnerabilities_list(self): + """Test that empty vulnerabilities list is handled.""" + csaf_json = {"vulnerabilities": []} + + # Should not raise an exception + _enrich_vulnerabilities_with_notes(csaf_json, {}, {}, {}) + + assert csaf_json["vulnerabilities"] == [] diff --git a/tests/test_vex_loader.py b/tests/test_vex_loader.py new file mode 100644 index 00000000..76987d78 --- /dev/null +++ b/tests/test_vex_loader.py @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for VEX generator loader/factory. + +Tests the load_vex_generator() function in loader.py. +""" + +import pytest + +from vuln_analysis.utils.vex.vex_generator_loader import load_vex_generator +from vuln_analysis.utils.vex.vex_generator_base import VexGenerator +from vuln_analysis.utils.vex.implementations.csaf_generator import CsafVexGenerator + + +class TestLoadVexGenerator: + """Unit tests for load_vex_generator() factory function.""" + + def test_csaf_format_returns_csaf_generator(self): + """Test that 'csaf' format returns CsafVexGenerator instance.""" + generator = load_vex_generator("csaf") + assert isinstance(generator, CsafVexGenerator) + + def test_csaf_format_returns_vex_generator_subclass(self): + """Test that returned generator is a VexGenerator subclass.""" + generator = load_vex_generator("csaf") + assert isinstance(generator, VexGenerator) + + def test_csaf_uppercase_is_case_insensitive(self): + """Test that format matching is case insensitive (uppercase).""" + generator = load_vex_generator("CSAF") + assert isinstance(generator, CsafVexGenerator) + + def test_invalid_format_raises_value_error(self): + """Test that unsupported format raises ValueError.""" + with pytest.raises(ValueError, match="Unsupported VEX format"): + load_vex_generator("WrongFormat") + + def test_empty_format_raises_value_error(self): + """Test that empty format string raises ValueError.""" + with pytest.raises(ValueError, match="Unsupported VEX format"): + load_vex_generator("") \ No newline at end of file diff --git a/tests/test_vex_utils.py b/tests/test_vex_utils.py new file mode 100644 index 00000000..1e7a28ba --- /dev/null +++ b/tests/test_vex_utils.py @@ -0,0 +1,232 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for VEX utility functions in vex_utils.py. +""" + +from pathlib import Path +import tempfile + +import pytest +from jsonschema import Draft202012Validator + +from exploit_iq_commons.data_models.cve_intel import CveIntel, CveIntelGhsa +from vuln_analysis.utils.vex.implementations.csaf_generator import ( + CSAF_SCHEMA_PATH as vex_schema_path_example, +) +from vuln_analysis.utils.vex.vex_utils import ( + get_vex_validator, + get_patched_package, + build_patch_recommendation, +) + + +class TestGetVexValidator: + """Unit tests for get_vex_validator() function.""" + + def test_returns_draft202012_validator(self): + """Test that get_vex_validator returns a Draft202012Validator instance.""" + validator = get_vex_validator(vex_schema_path_example) + assert isinstance(validator, Draft202012Validator) + + def test_caching_returns_same_instance(self): + """Test that calling with same path returns the same cached validator (same object in memory and not just equal in vlaue).""" + validator1 = get_vex_validator(vex_schema_path_example) + validator2 = get_vex_validator(vex_schema_path_example) + assert validator1 is validator2 + + def test_different_paths_return_different_validators(self): + """Test that different schema paths return different validator instances.""" + minimal_schema = '{"type": "object"}' + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=True) as f: + f.write(minimal_schema) + f.flush() # Ensure all contents are written before reading + temp_path = Path(f.name) + validator1 = get_vex_validator(vex_schema_path_example) + validator2 = get_vex_validator(temp_path) + assert validator1 is not validator2 + + def test_validator_can_validate_documents(self): + """Test that returned validator can actually validate documents.""" + validator = get_vex_validator(vex_schema_path_example) + + errors = list(validator.iter_errors({})) + assert len(errors) > 0 + + def test_invalid_path_raises_file_not_found(self): + """Test that invalid schema path raises FileNotFoundError.""" + invalid_path = Path("/nonexistent/path/schema.json") + + with pytest.raises(FileNotFoundError): + get_vex_validator(invalid_path) + + +class TestGetPatchedPackage: + """Unit tests for get_patched_package() function.""" + + def test_valid_package_returns_name_and_version(self): + """Test extraction of name and version from valid vulnerability dict.""" + vuln = { + "package": {"name": "lodash"}, + "first_patched_version": "4.17.21" + } + result = get_patched_package(vuln) + assert result == ("lodash", "4.17.21") + + def test_empty_dict_returns_none_tuple(self): + """Test that empty dict returns (None, None).""" + result = get_patched_package({}) + assert result == (None, None) + + def test_missing_package_key_returns_none_name(self): + """Test that missing 'package' key returns None for name.""" + vuln = {"first_patched_version": "1.0.0"} + result = get_patched_package(vuln) + assert result == (None, "1.0.0") + + def test_missing_version_returns_none_version(self): + """Test that missing version returns None for version.""" + vuln = {"package": {"name": "express"}} + result = get_patched_package(vuln) + assert result == ("express", None) + + def test_null_package_returns_none_name(self): + """Test that null package value returns None for name.""" + vuln = {"package": None, "first_patched_version": "2.0.0"} + result = get_patched_package(vuln) + assert result == (None, "2.0.0") + + def test_empty_package_dict_returns_none_name(self): + """Test that empty package dict returns None for name.""" + vuln = {"package": {}, "first_patched_version": "3.0.0"} + result = get_patched_package(vuln) + assert result == (None, "3.0.0") + + +class TestBuildPatchRecommendation: + """Unit tests for build_patch_recommendation() function.""" + + def test_returns_empty_when_intel_is_none(self): + """Test that None intel returns empty string.""" + result = build_patch_recommendation(None, None) + assert result == "" + + def test_returns_empty_when_ghsa_is_none(self): + """Test that intel without GHSA returns empty string.""" + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=None) + result = build_patch_recommendation(ci, None) + assert result == "" + + def test_returns_empty_when_vulnerabilities_is_none(self): + """Test that GHSA without vulnerabilities returns empty string.""" + ghsa = CveIntelGhsa(ghsa_id="GHSA-1234-5678-9012", vulnerabilities=None) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + result = build_patch_recommendation(ci, None) + assert result == "" + + def test_returns_empty_when_vulnerabilities_is_empty(self): + """Test that empty vulnerabilities list returns empty string.""" + ghsa = CveIntelGhsa(ghsa_id="GHSA-1234-5678-9012", vulnerabilities=[]) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + result = build_patch_recommendation(ci, None) + assert result == "" + + def test_with_sbom_returns_matching_package(self): + """Test that with SBOM, only matching package is returned.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + {"package": {"name": "express"}, "first_patched_version": "4.18.0"}, + ] + ) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + + result = build_patch_recommendation(ci, {"lodash", "express", "react"}) + assert result == "lodash:4.17.21, express:4.18.0" + + def test_with_sbom_no_match_returns_empty(self): + """Test that with SBOM but no match, empty string is returned.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + ] + ) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + + result = build_patch_recommendation(ci, {"react", "vue"}) + assert result == "" + + def test_without_sbom_returns_all_packages(self): + """Test that without SBOM, all packages are returned.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + {"package": {"name": "express"}, "first_patched_version": "4.18.0"}, + ] + ) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + + result = build_patch_recommendation(ci, None) + assert "lodash:4.17.21" in result + assert "express:4.18.0" in result + + def test_without_sbom_deduplicates_packages(self): + """Test that duplicate package names are deduplicated.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}, "first_patched_version": "4.17.21"}, + {"package": {"name": "lodash"}, "first_patched_version": "4.17.22"}, + ] + ) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + + result = build_patch_recommendation(ci, None) + # First version should win + assert result == "lodash:4.17.21" + + def test_skips_vulnerabilities_without_name(self): + """Test that vulnerabilities without package name are skipped.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {}, "first_patched_version": "1.0.0"}, + {"package": {"name": "express"}, "first_patched_version": "4.18.0"}, + ] + ) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + + result = build_patch_recommendation(ci, None) + assert result == "express:4.18.0" + + def test_skips_vulnerabilities_without_version(self): + """Test that vulnerabilities without patched version are skipped.""" + ghsa = CveIntelGhsa( + ghsa_id="GHSA-1234-5678-9012", + vulnerabilities=[ + {"package": {"name": "lodash"}}, + {"package": {"name": "express"}, "first_patched_version": "4.18.0"}, + ] + ) + ci = CveIntel(vuln_id="CVE-2024-1234", ghsa=ghsa) + + result = build_patch_recommendation(ci, None) + assert result == "express:4.18.0" + diff --git a/uv.lock b/uv.lock index 0b608419..152420d8 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11, <3.13" resolution-markers = [ "python_full_version >= '3.12' and sys_platform == 'darwin'", @@ -799,6 +799,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0a/76/cf8d69da8d0b5ecb0db406f24a63a3f69ba5e791a11b782aeeefef27ccbb/cryptography-45.0.6-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:629127cfdcdc6806dfe234734d7cb8ac54edaf572148274fa377a7d3405b0043", size = 3331874, upload-time = "2025-08-05T23:59:23.017Z" }, ] +[[package]] +name = "csaf-tool" +version = "0.3.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "packageurl-python" }, + { name = "rich" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/9f/24/b408082ef806581de3a2094caf0b9ed560a4fb8e8c14bb9c28462ecb023c/csaf_tool-0.3.2-py2.py3-none-any.whl", hash = "sha256:7e5559cb522eb76e3acad39a7bf9ba1b81e5a6224099d511a4c9c2dcf36caa16", size = 17629, upload-time = "2024-06-12T20:10:06.429Z" }, +] + [[package]] name = "cvss" version = "3.6" @@ -3377,6 +3389,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0d/73143ecb94ac4a5dcba223402139240a75dee0cc6ba8a543788a5646407a/ormsgpack-1.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:35fa9f81e5b9a0dab42e09a73f7339ecffdb978d6dbf9deb2ecf1e9fc7808722", size = 121401, upload-time = "2025-05-24T19:07:28.308Z" }, ] +[[package]] +name = "packageurl-python" +version = "0.17.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f5/d6/3b5a4e3cfaef7a53869a26ceb034d1ff5e5c27c814ce77260a96d50ab7bb/packageurl_python-0.17.6.tar.gz", hash = "sha256:1252ce3a102372ca6f86eb968e16f9014c4ba511c5c37d95a7f023e2ca6e5c25", size = 50618, upload-time = "2025-11-24T15:20:17.998Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b1/2f/c7277b7615a93f51b5fbc1eacfc1b75e8103370e786fd8ce2abf6e5c04ab/packageurl_python-0.17.6-py3-none-any.whl", hash = "sha256:31a85c2717bc41dd818f3c62908685ff9eebcb68588213745b14a6ee9e7df7c9", size = 36776, upload-time = "2025-11-24T15:20:16.962Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -5314,6 +5335,7 @@ dependencies = [ { name = "aiohttp-client-cache" }, { name = "aioresponses" }, { name = "brotli" }, + { name = "csaf-tool" }, { name = "cvss" }, { name = "esprima" }, { name = "exploit-iq-commons" }, @@ -5359,6 +5381,7 @@ requires-dist = [ { name = "aiohttp-client-cache", specifier = "==0.11" }, { name = "aioresponses", specifier = "==0.7.6" }, { name = "brotli" }, + { name = "csaf-tool", specifier = "==0.3.2" }, { name = "cvss", specifier = "==3.6" }, { name = "esprima" }, { name = "exploit-iq-commons", editable = "src/exploit_iq_commons" },