@@ -36,41 +36,35 @@ def parse_cyclonedx_data(data: dict) -> None:
3636
3737
3838def _create_cyclonedx_document(data: dict) -> VEX_Document:
39- # Use serial number as document ID, fallback to a generated one
4039 document_id = data.get("serialNumber")
4140 if not document_id:
42- bom_format = data.get("bomFormat", "CycloneDX")
43- spec_version = data.get("specVersion", "1.0")
44- document_id = f"{bom_format}-{spec_version}-{hash(str(data))}"
41+ raise ValidationError("serialNumber is missing")
4542
46- version = str(data.get("version", 1))
43+ version_value = data.get("version")
44+ if version_value is None:
45+ raise ValidationError("version is missing")
46+ version = str(version_value)
4747
48- # Extract metadata for author and timestamps
4948 metadata = data.get("metadata", {})
5049
51- # Get timestamp from metadata or use current time
5250 timestamp = metadata.get("timestamp")
5351 if not timestamp:
54- # If no timestamp, use a default ISO format string
55- from datetime import datetime
56- timestamp = datetime.now().isoformat() + "Z"
57-
58- # Extract author from tools or component
59- author = "Unknown"
60- tools = metadata.get("tools", [])
61- if tools:
62- if isinstance(tools, list) and tools:
63- author = tools[0].get("name", "Unknown")
64- elif isinstance(tools, dict):
65- components = tools.get("components", [])
66- if components:
67- author = components[0].get("name", "Unknown")
68-
69- # If no author from tools, try to get from component
70- if author == "Unknown":
71- component = metadata.get("component", {})
72- if component:
73- author = component.get("name", "Unknown")
52+ raise ValidationError("metadata/timestamp is missing")
53+
54+ author = None
55+ # Prefer authors list if available
56+ authors = metadata.get("authors")
57+ if authors and isinstance(authors, list) and len(authors) > 0:
58+ # Find the first author with a name set
59+ author = next((item.get("name") for item in authors
60+ if isinstance(item, dict) and item.get("name")), None)
61+
62+ # Fall back to manufacturer or supplier if no authors
63+ if not author:
64+ author = metadata.get("manufacturer", {}).get("name") or metadata.get("supplier", {}).get("name")
65+
66+ if not author:
67+ raise ValidationError("author is missing")
7468
7569 try:
7670 cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
@@ -98,9 +92,13 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
9892 if not isinstance(vulnerabilities, list):
9993 raise ValidationError("vulnerabilities is not a list")
10094
101- # Build components mapping
10295 components_map = _build_components_map(data)
10396
97+ product_purl = data.get("metadata", {}).get("component", {}).get("purl", "")
98+ if not product_purl:
99+ raise ValidationError("metadata/component/purl is missing")
100+ validate_purl(product_purl)
101+
104102 product_purls: set[str] = set()
105103 vex_statements: set[VEX_Statement] = set()
106104
@@ -115,13 +113,12 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
115113
116114 analysis = vulnerability.get("analysis", {})
117115 if not analysis:
118- # Skip vulnerabilities without analysis (not VEX statements)
116+ # Skip vulnerabilities without analysis
119117 vulnerability_counter += 1
120118 continue
121119
122120 cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
123121
124- # Map CycloneDX state to VEX status
125122 vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
126123 if not vex_status:
127124 raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}")
@@ -131,26 +128,26 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
131128 if detail:
132129 description += f"\n\n{detail}"
133130
134- # Build justification from CycloneDX justification
135- justification = _map_cyclonedx_justification_to_vex_justification(cyclonedx_analysis.justification)
136-
137- # Build remediation from response and recommendation
138131 remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
139132
140- # Process affected components
141133 affects = vulnerability.get("affects", [])
142134 if not affects:
143- # If no affects, this is a general statement - we'll need a product PURL
144- # Try to extract from metadata component
145- component = data.get("metadata", {}).get("component", {})
146- product_purl = component.get("purl", "")
147- if product_purl:
148- validate_purl(product_purl)
149- _create_vex_statement(
150- cyclonedx_document, vulnerability_id, description, vex_status,
151- justification, cyclonedx_analysis.detail, remediation,
152- product_purl, "", product_purls, vex_statements
153- )
135+ # General statement for the product
136+ _create_vex_statement(
137+ cyclonedx_document,
138+ vulnerability_id,
139+ description,
140+ vex_status,
141+ cyclonedx_analysis.justification,
142+ cyclonedx_analysis.detail,
143+ remediation,
144+ product_purl,
145+ "",
146+ product_purls,
147+ vex_statements,
148+ )
149+ elif not isinstance(affects, list):
150+ raise ValidationError(f"affects[{vulnerability_counter}] is not a list")
154151 else:
155152 _process_affected_components(
156153 cyclonedx_document=cyclonedx_document,
@@ -160,11 +157,12 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
160157 vulnerability_id=vulnerability_id,
161158 description=description,
162159 vex_status=vex_status,
163- justification=justification,
160+ justification=cyclonedx_analysis. justification,
164161 impact=cyclonedx_analysis.detail,
165162 remediation=remediation,
166163 affects=affects,
167164 components_map=components_map,
165+ product_purl=product_purl,
168166 )
169167
170168 vulnerability_counter += 1
@@ -173,7 +171,6 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
173171
174172
175173def _build_components_map(data: dict) -> dict[str, dict]:
176- """Build a mapping of bom-ref to component data for quick lookup."""
177174 components_map = {}
178175
179176 # Add root component from metadata
@@ -214,7 +211,6 @@ def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Ana
214211
215212
216213def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
217- """Map CycloneDX analysis state to VEX status."""
218214 mapping = {
219215 CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
220216 CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
@@ -226,15 +222,7 @@ def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
226222 return mapping.get(state)
227223
228224
229- def _map_cyclonedx_justification_to_vex_justification(justification: str) -> str:
230- """Map CycloneDX justification to VEX justification if possible."""
231- # CycloneDX doesn't have standardized justification values like OpenVEX
232- # We'll pass through the justification as is, or return empty string
233- return justification if justification else ""
234-
235-
236225def _build_remediation_text(response: list[str], recommendation: str) -> str:
237- """Build remediation text from response actions and recommendation."""
238226 remediation_parts = []
239227
240228 if response:
@@ -261,6 +249,7 @@ def _process_affected_components(
261249 remediation: str,
262250 affects: list,
263251 components_map: dict,
252+ product_purl: str,
264253) -> None:
265254 affected_counter = 0
266255 for affected in affects:
@@ -271,24 +260,32 @@ def _process_affected_components(
271260 if not ref:
272261 raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
273262
274- # Look up component by bom-ref
275263 component = components_map.get(ref)
276264 if not component:
277- # Skip if we can't find the component
278- affected_counter += 1
279- continue
265+ raise ValidationError(
266+ f"affects[{vulnerability_counter}][{ affected_counter}]/ref '{ref}' not found in components"
267+ )
280268
281- # Extract PURL from component
282269 component_purl = component.get("purl", "")
283- if component_purl:
284- validate_purl(component_purl)
285-
286- # For affected components, we'll use the component PURL as both product and component
287- _create_vex_statement(
288- cyclonedx_document, vulnerability_id, description, vex_status,
289- justification, impact, remediation,
290- component_purl, component_purl, product_purls, vex_statements
270+ if not component_purl:
271+ raise ValidationError(
272+ f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' component is missing purl"
291273 )
274+ validate_purl(component_purl)
275+
276+ _create_vex_statement(
277+ cyclonedx_document,
278+ vulnerability_id,
279+ description,
280+ vex_status,
281+ justification,
282+ impact,
283+ remediation,
284+ product_purl,
285+ component_purl,
286+ product_purls,
287+ vex_statements,
288+ )
292289
293290 affected_counter += 1
294291
@@ -306,7 +303,6 @@ def _create_vex_statement(
306303 product_purls: set,
307304 vex_statements: set,
308305) -> None:
309- """Create and save a VEX statement."""
310306 vex_statement = VEX_Statement(
311307 document=cyclonedx_document,
312308 vulnerability_id=vulnerability_id,
0 commit comments