Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cve_bin_tool/vex_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
# Copyright (C) 2024 Intel Corporation
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

from cve_bin_tool.vex_manager.generate import VEXGenerate
from cve_bin_tool.vex_manager.handler import VexHandler
from cve_bin_tool.vex_manager.parse import VEXParse

__all__ = ["VexHandler", "VEXParse", "VEXGenerate"]
297 changes: 297 additions & 0 deletions cve_bin_tool/vex_manager/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

import os
from collections import defaultdict
from typing import Any, DefaultDict, Dict, Set, Union

from lib4vex.generator import VEXGenerator
from lib4vex.parser import VEXParser

from cve_bin_tool.log import LOGGER
from cve_bin_tool.util import ProductInfo, Remarks, decode_bom_ref, decode_purl
from lib4vex import Validator as VEXValidator

TriageData = Dict[str, Union[Dict[str, Any], Set[str]]]


class VexHandler:
"""
A centralized handler class for all VEX format operations.
Supports CSAF, CycloneDX, and OpenVEX formats.

This class uses lib4vex for parsing, validation, and generation of VEX documents.

Attributes:
logger: Logger for logging information.
analysis_state: Mapping between VEX format states and internal Remarks.
"""

# Mapping between different VEX format states and internal Remarks
analysis_state = {
"cyclonedx": {
"in_triage": Remarks.NewFound,
"exploitable": Remarks.Confirmed,
"resolved": Remarks.Mitigated,
"false_positive": Remarks.FalsePositive,
"not_affected": Remarks.NotAffected,
},
"csaf": {
"first_affected": Remarks.NewFound,
"first_fixed": Remarks.Mitigated,
"fixed": Remarks.Mitigated,
"known_affected": Remarks.Confirmed,
"known_not_affected": Remarks.NotAffected,
"last_affected": Remarks.Confirmed,
"recommended": Remarks.Mitigated,
"under_investigation": Remarks.NewFound,
},
"openvex": {
"not_affected": Remarks.NotAffected,
"affected": Remarks.Confirmed,
"fixed": Remarks.Mitigated,
"under_investigation": Remarks.NewFound,
},
}

def __init__(self, logger=None):
"""
Initialize the VexHandler.

Args:
logger: Optional logger to use. Defaults to a new child logger.
"""
self.logger = logger or LOGGER.getChild(self.__class__.__name__)

def parse(
self, filename: str, vextype: str = "auto"
) -> DefaultDict[ProductInfo, TriageData]:
"""
Parse a VEX file and extract the necessary information.

Args:
filename: Path to the VEX file.
vextype: Type of VEX file. Can be 'cyclonedx', 'csaf', 'openvex', or 'auto' for automatic detection.

Returns:
Dictionary mapping ProductInfo to vulnerability data.
"""
if not os.path.isfile(filename):
self.logger.error(f"VEX file not found: {filename}")
return defaultdict(dict)

try:
vexparser = VEXParser(vex_type=vextype)
vexparser.parse(filename)

# Get the detected type if auto was specified
if vextype == "auto":
vextype = vexparser.get_type()

self.logger.info(f"Parsed VEX file: {filename} of type: {vextype}")

return self._process_parsed_data(vexparser, vextype)

except Exception as e:
self.logger.error(f"Error parsing VEX file {filename}: {str(e)}")
return defaultdict(dict)

def validate(self, filename: str, vextype: str = "auto") -> bool:
"""
Validate a VEX file against its schema.

Args:
filename: Path to the VEX file.
vextype: Type of VEX file. Can be 'cyclonedx', 'csaf', 'openvex', or 'auto' for automatic detection.

Returns:
True if the file is valid, False otherwise.
"""
if not os.path.isfile(filename):
self.logger.error(f"VEX file not found: {filename}")
return False

try:
validator = VEXValidator(vex_type=vextype)
is_valid = validator.validate(filename)

if is_valid:
self.logger.info(f"VEX file {filename} is valid")
else:
self.logger.error(f"VEX file {filename} is invalid")

return is_valid

except Exception as e:
self.logger.error(f"Error validating VEX file {filename}: {str(e)}")
return False

def generate(self, data: Dict, output_file: str, vextype: str) -> bool:
"""
Generate a VEX document from data.

Args:
data: Data to include in the VEX document.
output_file: Path where to save the generated VEX document.
vextype: Type of VEX document to generate ('cyclonedx', 'csaf', or 'openvex').

Returns:
True if the file was successfully generated, False otherwise.
"""
try:
generator = VEXGenerator(vex_type=vextype)
generator.generate(data, output_file)
self.logger.info(f"Generated {vextype} VEX file: {output_file}")
return True

except Exception as e:
self.logger.error(f"Error generating VEX file {output_file}: {str(e)}")
return False

def convert(
self,
input_file: str,
output_file: str,
from_type: str = "auto",
to_type: str = "cyclonedx",
) -> bool:
"""
Convert a VEX file from one format to another.

Args:
input_file: Path to the input VEX file.
output_file: Path where to save the converted VEX document.
from_type: Type of input VEX file. Can be 'cyclonedx', 'csaf', 'openvex', or 'auto'.
to_type: Type of output VEX file. Can be 'cyclonedx', 'csaf', or 'openvex'.

Returns:
True if conversion was successful, False otherwise.
"""
try:
# Parse the input file
parsed_data = self.parse(input_file, from_type)
if not parsed_data:
return False

# Convert to the target format and generate the output file
return self.generate(parsed_data, output_file, to_type)

except Exception as e:
self.logger.error(
f"Error converting VEX file {input_file} to {to_type}: {str(e)}"
)
return False

def _process_parsed_data(
self, vexparser, vextype: str
) -> DefaultDict[ProductInfo, TriageData]:
"""
Process the parsed VEX data and extract the necessary information.

This method performs the following steps:
1. Extracts metadata, product information, and vulnerabilities from the parser
2. Iterates through each vulnerability to extract key details like ID, status, justification
3. Maps VEX format-specific status values to internal Remarks
4. Decodes product identifiers (bom_ref or purl) to obtain consistent ProductInfo objects
5. Collects all vulnerability data per product

Args:
vexparser: The VEXParser object with parsed data.
vextype: The type of VEX document ('cyclonedx', 'csaf', or 'openvex').

Returns:
DefaultDict mapping ProductInfo objects to their vulnerability data.
"""
parsed_data = defaultdict(dict)
serialNumbers = set()
vulnerabilities = vexparser.get_vulnerabilities()
metadata = vexparser.get_metadata()
product = vexparser.get_product()

# Extract product info based on VEX type but not used directly in this method
# Just stored for future extensions or reference
_ = self._extract_product_info(vextype, metadata, product)

# Process vulnerabilities
for vuln in vulnerabilities:
# Extract necessary fields from the vulnerability
cve_id = vuln.get("id")
remarks = self.analysis_state[vextype][vuln.get("status")]
justification = vuln.get("justification")
response = vuln.get("remediation")
comments = vuln.get("comment")

# If the comment doesn't already have the justification prepended, add it
if comments and justification and not comments.startswith(justification):
comments = f"{justification}: {comments}"

severity = vuln.get("severity")

# Decode the bom reference or purl based on VEX type
product_info = None
serialNumber = ""
if vextype == "cyclonedx":
decoded_ref = decode_bom_ref(vuln.get("bom_link"))
if isinstance(decoded_ref, tuple) and not isinstance(
decoded_ref, ProductInfo
):
product_info, serialNumber = decoded_ref
serialNumbers.add(serialNumber)
else:
product_info = decoded_ref
elif vextype in ["openvex", "csaf"]:
product_info = decode_purl(vuln.get("purl"))

if product_info:
cve_data = {
"remarks": remarks,
"comments": comments if comments else "",
"response": response if response else [],
}
if justification:
cve_data["justification"] = justification.strip()

if severity:
cve_data["severity"] = severity.strip()

parsed_data[product_info][cve_id.strip()] = cve_data

if "paths" not in parsed_data[product_info]:
parsed_data[product_info]["paths"] = {}

self.logger.debug(f"Parsed VEX data: {parsed_data}")
return parsed_data

def _extract_product_info(
self, vextype: str, metadata: Dict, product: Dict
) -> Dict[str, str]:
"""
Extract product information from the parsed VEX file.

Args:
vextype: Type of VEX document.
metadata: Metadata from the VEX document.
product: Product information from the VEX document.

Returns:
Dictionary with product information.
"""
product_info = {}
if vextype == "cyclonedx":
# release and vendor is not available in cyclonedx
product_info["product"] = metadata.get("name")
product_info["release"] = ""
product_info["vendor"] = ""
elif vextype == "csaf":
csaf_product = product.get("CSAFPID_0001", {})
if csaf_product:
product_info["product"] = csaf_product.get("product")
product_info["release"] = csaf_product.get("version")
product_info["vendor"] = csaf_product.get("vendor")
elif vextype == "openvex":
# product and release is not available in openvex
product_info["product"] = ""
product_info["release"] = ""
product_info["vendor"] = metadata.get("author")

return product_info
Loading