|
4 | 4 | """
|
5 | 5 | VEX Validation Module
|
6 | 6 |
|
7 |
| -Provides functionality to validate VEX files for schema compliance using standard |
8 |
| -JSON schemas for CycloneDX, CSAF, and OpenVEX formats. |
| 7 | +Provides functionality to validate VEX files for schema compliance using |
| 8 | +lib4sbom for CycloneDX validation and standard JSON schemas for CSAF and OpenVEX formats. |
9 | 9 | """
|
10 | 10 |
|
11 | 11 | import json
|
|
16 | 16 |
|
17 | 17 | import jsonschema
|
18 | 18 | from jsonschema import ValidationError as JsonSchemaValidationError
|
| 19 | +from lib4sbom.validator import SBOMValidator |
| 20 | +from lib4vex.parser import VEXParser |
19 | 21 |
|
20 | 22 | from cve_bin_tool.log import LOGGER
|
21 | 23 |
|
@@ -161,33 +163,67 @@ def validate_file(
|
161 | 163 | self.logger.info(f"Detected VEX format: {vex_type} (version: {version})")
|
162 | 164 |
|
163 | 165 | # Validate against appropriate schema
|
164 |
| - try: |
165 |
| - schema = self._get_schema(vex_type, version) |
166 |
| - if schema: # Only validate if schema was successfully loaded |
167 |
| - jsonschema.validate(instance=raw_data, schema=schema) |
168 |
| - self.logger.info(f"VEX file passed {vex_type} schema validation") |
169 |
| - except JsonSchemaValidationError as e: |
170 |
| - # Convert jsonschema validation error to our format |
171 |
| - path = ".".join(str(p) for p in e.path) if e.path else None |
| 166 | + if vex_type == "cyclonedx": |
| 167 | + # Use lib4sbom validator for CycloneDX VEX files |
| 168 | + try: |
| 169 | + sbom_validator = SBOMValidator() |
| 170 | + validation_result = sbom_validator.validate_file(vex_file_path) |
| 171 | + |
| 172 | + # Check validation result |
| 173 | + cyclonedx_valid = validation_result.get("CycloneDX", False) |
| 174 | + if not cyclonedx_valid: |
| 175 | + self.errors.append( |
| 176 | + ValidationError( |
| 177 | + "SCHEMA_ERROR", |
| 178 | + "CycloneDX schema validation failed. File does not comply with CycloneDX specification.", |
| 179 | + ) |
| 180 | + ) |
| 181 | + else: |
| 182 | + self.logger.info( |
| 183 | + "VEX file passed CycloneDX schema validation using lib4sbom" |
| 184 | + ) |
172 | 185 |
|
173 |
| - self.errors.append( |
174 |
| - ValidationError( |
175 |
| - "SCHEMA_ERROR", |
176 |
| - f"Schema validation error: {e.message}", |
177 |
| - field=path, |
178 |
| - location=str(e.schema_path) if e.schema_path else None, |
| 186 | + except Exception as e: |
| 187 | + self.errors.append( |
| 188 | + ValidationError( |
| 189 | + "VALIDATION_ERROR", |
| 190 | + f"Error during CycloneDX validation: {str(e)}", |
| 191 | + ) |
179 | 192 | )
|
180 |
| - ) |
181 |
| - except Exception as e: |
182 |
| - self.errors.append( |
183 |
| - ValidationError( |
184 |
| - "VALIDATION_ERROR", f"Error during schema validation: {str(e)}" |
| 193 | + else: |
| 194 | + # Use traditional jsonschema validation for CSAF and OpenVEX |
| 195 | + try: |
| 196 | + schema = self._get_schema(vex_type, version) |
| 197 | + if schema: # Only validate if schema was successfully loaded |
| 198 | + jsonschema.validate(instance=raw_data, schema=schema) |
| 199 | + self.logger.info(f"VEX file passed {vex_type} schema validation") |
| 200 | + except JsonSchemaValidationError as e: |
| 201 | + # Convert jsonschema validation error to our format |
| 202 | + path = ".".join(str(p) for p in e.path) if e.path else None |
| 203 | + |
| 204 | + self.errors.append( |
| 205 | + ValidationError( |
| 206 | + "SCHEMA_ERROR", |
| 207 | + f"Schema validation error: {e.message}", |
| 208 | + field=path, |
| 209 | + location=str(e.schema_path) if e.schema_path else None, |
| 210 | + ) |
| 211 | + ) |
| 212 | + except Exception as e: |
| 213 | + self.errors.append( |
| 214 | + ValidationError( |
| 215 | + "VALIDATION_ERROR", f"Error during schema validation: {str(e)}" |
| 216 | + ) |
185 | 217 | )
|
186 |
| - ) |
187 | 218 |
|
188 | 219 | # Perform format-specific validations
|
189 | 220 | self._perform_format_specific_validation(raw_data, vex_type)
|
190 | 221 |
|
| 222 | + # Validate status transitions and provide actionable fixes using lib4vex |
| 223 | + self._validate_status_transitions_and_suggest_fixes( |
| 224 | + raw_data, vex_type, vex_file_path |
| 225 | + ) |
| 226 | + |
191 | 227 | # Add warnings for empty sections
|
192 | 228 | self._check_for_empty_sections(raw_data, vex_type)
|
193 | 229 |
|
@@ -463,6 +499,164 @@ def _validate_csaf_vulnerability(self, vulnerability: Dict, index: int) -> None:
|
463 | 499 | )
|
464 | 500 | )
|
465 | 501 |
|
| 502 | + def _validate_status_transitions_and_suggest_fixes( |
| 503 | + self, raw_data: Dict, vex_type: str, vex_file_path: str |
| 504 | + ) -> None: |
| 505 | + """ |
| 506 | + Validate status transitions and provide actionable fixes using lib4vex. |
| 507 | +
|
| 508 | + Checks for invalid status transitions (e.g., marking a resolved CVE as not_affected |
| 509 | + without justification) and uses lib4vex to suggest improvements. |
| 510 | + """ |
| 511 | + # For CycloneDX, use basic validation since lib4sbom handles schema validation |
| 512 | + if vex_type == "cyclonedx": |
| 513 | + self._validate_status_transitions_basic(raw_data, vex_type) |
| 514 | + self._suggest_actionable_fixes(raw_data, vex_type) |
| 515 | + return |
| 516 | + |
| 517 | + # For other formats, try lib4vex first, then fall back to basic validation |
| 518 | + try: |
| 519 | + # Use lib4vex parser to analyze the VEX file |
| 520 | + vex_parser = VEXParser() |
| 521 | + vex_parser.parse(vex_file_path) |
| 522 | + |
| 523 | + # Get vulnerabilities from lib4vex parser |
| 524 | + vulnerabilities = vex_parser.get_vulnerabilities() |
| 525 | + |
| 526 | + for vuln in vulnerabilities: |
| 527 | + self._validate_individual_vulnerability_status(vuln, vex_type) |
| 528 | + |
| 529 | + # Check for missing required fields and suggest fixes |
| 530 | + self._suggest_actionable_fixes(raw_data, vex_type) |
| 531 | + |
| 532 | + except Exception as e: |
| 533 | + self.logger.debug(f"lib4vex validation failed: {str(e)}") |
| 534 | + # Fall back to basic validation if lib4vex fails |
| 535 | + self._validate_status_transitions_basic(raw_data, vex_type) |
| 536 | + self._suggest_actionable_fixes(raw_data, vex_type) |
| 537 | + |
| 538 | + def _validate_individual_vulnerability_status( |
| 539 | + self, vuln: Dict, vex_type: str |
| 540 | + ) -> None: |
| 541 | + """Validate individual vulnerability status transitions.""" |
| 542 | + status = vuln.get("status", "").lower() |
| 543 | + |
| 544 | + # Check for invalid status transitions |
| 545 | + if status == "not_affected": |
| 546 | + # If status is not_affected, should have justification |
| 547 | + justification = ( |
| 548 | + vuln.get("justification") |
| 549 | + or vuln.get("detail") |
| 550 | + or vuln.get("action_statement") |
| 551 | + ) |
| 552 | + if not justification: |
| 553 | + self.errors.append( |
| 554 | + ValidationError( |
| 555 | + "INVALID_STATUS_TRANSITION", |
| 556 | + "CVE marked as 'not_affected' without justification. Add justification field explaining why this vulnerability does not affect the product.", |
| 557 | + field="justification", |
| 558 | + location=f"vulnerability {vuln.get('id', 'unknown')}", |
| 559 | + ) |
| 560 | + ) |
| 561 | + |
| 562 | + elif status == "resolved" or status == "fixed": |
| 563 | + # If status is resolved/fixed, should have timestamp and details |
| 564 | + timestamp = vuln.get("updated") or vuln.get("timestamp") |
| 565 | + if not timestamp: |
| 566 | + self.warnings.append( |
| 567 | + ValidationError( |
| 568 | + "MISSING_TIMESTAMP", |
| 569 | + "Fixed/resolved vulnerability should have a timestamp indicating when it was resolved. Add timestamp field.", |
| 570 | + field="timestamp", |
| 571 | + location=f"vulnerability {vuln.get('id', 'unknown')}", |
| 572 | + severity="warning", |
| 573 | + ) |
| 574 | + ) |
| 575 | + |
| 576 | + def _validate_status_transitions_basic(self, raw_data: Dict, vex_type: str) -> None: |
| 577 | + """Basic status transition validation when lib4vex is not available.""" |
| 578 | + if vex_type == "cyclonedx": |
| 579 | + vulnerabilities = raw_data.get("vulnerabilities", []) |
| 580 | + for i, vuln in enumerate(vulnerabilities): |
| 581 | + analysis = vuln.get("analysis", {}) |
| 582 | + state = analysis.get("state", "").lower() |
| 583 | + |
| 584 | + if state == "not_affected" and not analysis.get("detail"): |
| 585 | + self.errors.append( |
| 586 | + ValidationError( |
| 587 | + "INVALID_STATUS_TRANSITION", |
| 588 | + "CVE marked as 'not_affected' without justification in analysis.detail field.", |
| 589 | + field="analysis.detail", |
| 590 | + location=f"vulnerabilities[{i}]", |
| 591 | + ) |
| 592 | + ) |
| 593 | + |
| 594 | + elif vex_type == "openvex": |
| 595 | + statements = raw_data.get("statements", []) |
| 596 | + for i, stmt in enumerate(statements): |
| 597 | + status = stmt.get("status", "").lower() |
| 598 | + |
| 599 | + if status == "not_affected" and not stmt.get("action_statement"): |
| 600 | + self.errors.append( |
| 601 | + ValidationError( |
| 602 | + "INVALID_STATUS_TRANSITION", |
| 603 | + "Statement marked as 'not_affected' without action_statement explaining why.", |
| 604 | + field="action_statement", |
| 605 | + location=f"statements[{i}]", |
| 606 | + ) |
| 607 | + ) |
| 608 | + |
| 609 | + def _suggest_actionable_fixes(self, raw_data: Dict, vex_type: str) -> None: |
| 610 | + """Suggest actionable fixes for common VEX file issues.""" |
| 611 | + |
| 612 | + # Check for missing timestamp in document metadata |
| 613 | + if vex_type == "cyclonedx": |
| 614 | + metadata = raw_data.get("metadata", {}) |
| 615 | + if not metadata.get("timestamp"): |
| 616 | + self.warnings.append( |
| 617 | + ValidationError( |
| 618 | + "MISSING_FIELD", |
| 619 | + "Add missing timestamp field to metadata for better traceability: 'timestamp': '2024-01-01T00:00:00Z'", |
| 620 | + field="metadata.timestamp", |
| 621 | + severity="warning", |
| 622 | + ) |
| 623 | + ) |
| 624 | + |
| 625 | + elif vex_type == "openvex": |
| 626 | + if not raw_data.get("timestamp"): |
| 627 | + self.warnings.append( |
| 628 | + ValidationError( |
| 629 | + "MISSING_FIELD", |
| 630 | + "Add missing timestamp field: 'timestamp': '2024-01-01T00:00:00'", |
| 631 | + field="timestamp", |
| 632 | + severity="warning", |
| 633 | + ) |
| 634 | + ) |
| 635 | + |
| 636 | + # Check for missing author information |
| 637 | + if vex_type == "cyclonedx": |
| 638 | + metadata = raw_data.get("metadata", {}) |
| 639 | + if not metadata.get("authors") and not metadata.get("supplier"): |
| 640 | + self.warnings.append( |
| 641 | + ValidationError( |
| 642 | + "MISSING_FIELD", |
| 643 | + "Add author information in metadata.authors or metadata.supplier for accountability", |
| 644 | + field="metadata.authors", |
| 645 | + severity="warning", |
| 646 | + ) |
| 647 | + ) |
| 648 | + |
| 649 | + elif vex_type == "openvex": |
| 650 | + if not raw_data.get("author"): |
| 651 | + self.warnings.append( |
| 652 | + ValidationError( |
| 653 | + "MISSING_FIELD", |
| 654 | + "Add missing author field: 'author': 'Your Organization'", |
| 655 | + field="author", |
| 656 | + severity="warning", |
| 657 | + ) |
| 658 | + ) |
| 659 | + |
466 | 660 | def _check_for_empty_sections(self, raw_data: Dict, vex_type: str) -> None:
|
467 | 661 | """Check for empty vulnerability/statement sections and add warnings."""
|
468 | 662 | if vex_type == "cyclonedx":
|
|
0 commit comments