diff --git a/scanpipe/pipes/output.py b/scanpipe/pipes/output.py index ccf5fc70e7..1b02ab82ee 100644 --- a/scanpipe/pipes/output.py +++ b/scanpipe/pipes/output.py @@ -746,9 +746,17 @@ def to_spdx(project, version=spdx.SPDX_SPEC_VERSION_2_3, include_files=False): # for detailed context. if len(project_inputs_as_spdx_packages) == 1: describe_spdx_id = project_inputs_as_spdx_packages[0].spdx_id + describe_spdx_ids = [describe_spdx_id] + + # When multiple inputs exist, all inputs should be in documentDescribes. + # This allows proper representation of multiple archives, PURLs, or Docker images + # being analyzed together. See https://github.com/aboutcode-org/scancode.io/issues/1878 + elif len(project_inputs_as_spdx_packages) > 1: + describe_spdx_ids = [pkg.spdx_id for pkg in project_inputs_as_spdx_packages] + describe_spdx_id = describe_spdx_ids[0] # Fallback to the Project as the SPDX root element for the "documentDescribes", - # if more than one input, or if no inputs, are available. + # if no inputs are available. else: project_as_root_package = spdx.Package( spdx_id=f"SPDXRef-scancodeio-project-{project.uuid}", @@ -757,6 +765,31 @@ def to_spdx(project, version=spdx.SPDX_SPEC_VERSION_2_3, include_files=False): ) packages_as_spdx.append(project_as_root_package) describe_spdx_id = project_as_root_package.spdx_id + describe_spdx_ids = [describe_spdx_id] + + # Build a mapping of packages to their input sources + # This is done by checking which input path the package resources belong to + package_to_input_map = {} + if len(project_inputs_as_spdx_packages) > 1: + input_filenames = { + inp.get("filename"): inp_pkg.spdx_id + for inp, inp_pkg in zip( + project.get_inputs_with_source(), project_inputs_as_spdx_packages + ) + } + + for package in discoveredpackage_qs: + # Try to determine which input this package came from + # by checking the package's codebase resources + resources = package.codebase_resources.all()[:1] + if resources: + resource_path = resources[0].path + for filename, input_spdx_id in input_filenames.items(): + if resource_path.startswith(filename) or resource_path.startswith( + filename.replace(".tar.gz", "").replace(".tgz", "").replace(".zip", "") + ): + package_to_input_map[package.uuid] = input_spdx_id + break for package in discoveredpackage_qs: spdx_package = package.as_spdx() @@ -765,12 +798,36 @@ def to_spdx(project, version=spdx.SPDX_SPEC_VERSION_2_3, include_files=False): if license_expression := package.declared_license_expression: license_expressions.append(license_expression) - spdx_relationship = spdx.Relationship( - spdx_id=describe_spdx_id, - related_spdx_id=spdx_package.spdx_id, - relationship="DEPENDS_ON", - ) - relationships.append(spdx_relationship) + # When multiple inputs exist, use CONTAINS relationship from the input + # to show that the package was found within that input + if len(project_inputs_as_spdx_packages) > 1: + # If we can determine which input this package came from, create a CONTAINS + # relationship from that input to this package + input_spdx_id = package_to_input_map.get(package.uuid) + if input_spdx_id: + spdx_relationship = spdx.Relationship( + spdx_id=input_spdx_id, + related_spdx_id=spdx_package.spdx_id, + relationship="CONTAINS", + ) + relationships.append(spdx_relationship) + else: + # If we can't determine the input, create a DEPENDS_ON relationship + # from the first input (or project) for backward compatibility + spdx_relationship = spdx.Relationship( + spdx_id=describe_spdx_id, + related_spdx_id=spdx_package.spdx_id, + relationship="DEPENDS_ON", + ) + relationships.append(spdx_relationship) + else: + # Single input case: use DEPENDS_ON as before + spdx_relationship = spdx.Relationship( + spdx_id=describe_spdx_id, + related_spdx_id=spdx_package.spdx_id, + relationship="DEPENDS_ON", + ) + relationships.append(spdx_relationship) for dependency in discovereddependency_qs: spdx_relationship = get_dependency_as_spdx_relationship( @@ -792,7 +849,7 @@ def to_spdx(project, version=spdx.SPDX_SPEC_VERSION_2_3, include_files=False): spdx_id=document_spdx_id, name=f"scancodeio_{project.name}", namespace=f"https://scancode.io/spdxdocs/{project.uuid}", - describes=[describe_spdx_id], + describes=describe_spdx_ids, creation_info=spdx.CreationInfo(tool=f"ScanCode.io-{scancodeio_version}"), packages=packages_as_spdx, files=files_as_spdx, @@ -881,6 +938,60 @@ def get_cyclonedx_bom(project): bom_ref=str(project.uuid), ) + # Get input sources to potentially create components for them + input_sources = project.get_inputs_with_source() + input_components = [] + + # When multiple inputs exist, create components for each input + # and add them as nested components under the project root component + if len(input_sources) > 1: + for input_source in input_sources: + input_uuid = input_source.get("uuid") or str(uuid.uuid4()) + input_component = cdx_component.Component( + name=input_source.get("filename", "unknown"), + bom_ref=f"input-{input_uuid}", + type=cdx_component.ComponentType.LIBRARY, + ) + input_components.append(input_component) + + # Build properties list + properties = [ + cdx_bom.Property( + name="notice", + value=SCAN_NOTICE, + ), + ] + + # Add properties for project name and input sources only when multiple inputs exist + if len(input_sources) > 1: + properties.append( + cdx_bom.Property( + name="scancode-io:project-name", + value=project.name, + ) + ) + + # Add properties for each input source + for input_source in input_sources: + filename = input_source.get("filename") + download_url = input_source.get("download_url") + + if filename: + properties.append( + cdx_bom.Property( + name="scancode-io:input-file", + value=filename, + ) + ) + + if download_url: + properties.append( + cdx_bom.Property( + name="scancode-io:input-source", + value=download_url, + ) + ) + bom = cdx_bom.Bom() bom.metadata = cdx_bom.BomMetaData( component=project_as_root_component, @@ -890,12 +1001,7 @@ def get_cyclonedx_bom(project): version=scancodeio_version, ) ], - properties=[ - cdx_bom.Property( - name="notice", - value=SCAN_NOTICE, - ) - ], + properties=properties, ) vulnerabilities = [] @@ -904,10 +1010,39 @@ def get_cyclonedx_bom(project): package_qs = get_queryset(project, "discoveredpackage") package_qs = package_qs.prefetch_related("children_packages") + # Build a mapping of packages to their input sources when multiple inputs exist + package_to_input_map = {} + if len(input_sources) > 1: + input_filenames = { + inp.get("filename"): idx + for idx, inp in enumerate(input_sources) + if inp.get("filename") + } + + for package in package_qs: + # Try to determine which input this package came from + resources = package.codebase_resources.all()[:1] + if resources: + resource_path = resources[0].path + for filename, input_idx in input_filenames.items(): + if resource_path.startswith(filename) or resource_path.startswith( + filename.replace(".tar.gz", "").replace(".tgz", "").replace(".zip", "") + ): + package_to_input_map[package.uuid] = input_idx + break + for package in package_qs: component = package.as_cyclonedx() bom.components.add(component) - bom.register_dependency(project_as_root_component, [component]) + + if len(input_sources) > 1: + input_idx = package_to_input_map.get(package.uuid) + if input_idx is not None and input_idx < len(input_components): + bom.register_dependency(input_components[input_idx], [component]) + else: + bom.register_dependency(project_as_root_component, [component]) + else: + bom.register_dependency(project_as_root_component, [component]) # Store the component dependencies to be added later since all components need # to be added on the BOM first. @@ -920,6 +1055,12 @@ def get_cyclonedx_bom(project): vulnerability_as_cyclonedx(vulnerability_data, component.bom_ref) ) + # Add input components to the BOM if we have multiple inputs + if input_components: + for input_component in input_components: + bom.components.add(input_component) + bom.register_dependency(project_as_root_component, [input_component]) + for component, depends_on_bom_refs in dependencies.items(): if not depends_on_bom_refs: continue diff --git a/scanpipe/tests/pipes/test_output.py b/scanpipe/tests/pipes/test_output.py index fd8b897e46..ef09c0065d 100644 --- a/scanpipe/tests/pipes/test_output.py +++ b/scanpipe/tests/pipes/test_output.py @@ -388,7 +388,7 @@ def test_scanpipe_pipes_outputs_get_cyclonedx_bom_dependency_tree(self): make_dependency(project, for_package=a, resolved_to_package=b) make_dependency(project, for_package=b, resolved_to_package=c) - with self.assertNumQueries(2): + with self.assertNumQueries(3): output_file = output.to_cyclonedx(project=project) results_json = json.loads(output_file.read_text()) @@ -633,6 +633,121 @@ def test_scanpipe_pipes_outputs_to_to_ort_package_list_yml(self): expected_file = self.data / "asgiref" / "asgiref-3.3.0.package-list.yml" self.assertResultsEqual(expected_file, output_file.read_text()) + def test_scanpipe_pipes_outputs_to_spdx_multiple_inputs(self): + """Test SPDX generation with multiple input sources.""" + project = make_project(name="MultiInputProject") + + # Add two input sources + input1 = project.add_input_source( + download_url="pkg:npm/package1@1.0.0", + filename="package1-1.0.0.tgz", + ) + input2 = project.add_input_source( + download_url="pkg:npm/package2@2.0.0", + filename="package2-2.0.0.tgz", + ) + + # Create resources for each input + resource1 = CodebaseResource.objects.create( + project=project, + path="package1-1.0.0/package.json", + ) + resource2 = CodebaseResource.objects.create( + project=project, + path="package2-2.0.0/package.json", + ) + + # Create packages associated with each input + package1 = make_package(project, "pkg:npm/dependency1@1.0.0") + package1.codebase_resources.add(resource1) + + package2 = make_package(project, "pkg:npm/dependency2@2.0.0") + package2.codebase_resources.add(resource2) + + # Generate SPDX output + output_file = output.to_spdx(project=project) + results_json = json.loads(output_file.read_text()) + + # Verify documentDescribes contains both inputs + self.assertEqual(2, len(results_json["documentDescribes"])) + self.assertIn(f"SPDXRef-scancodeio-input-{input1.uuid}", results_json["documentDescribes"]) + self.assertIn(f"SPDXRef-scancodeio-input-{input2.uuid}", results_json["documentDescribes"]) + + # Verify packages include both inputs and discovered packages + package_spdx_ids = [pkg["SPDXID"] for pkg in results_json["packages"]] + self.assertIn(f"SPDXRef-scancodeio-input-{input1.uuid}", package_spdx_ids) + self.assertIn(f"SPDXRef-scancodeio-input-{input2.uuid}", package_spdx_ids) + + # Verify CONTAINS relationships exist from inputs to their packages + contains_relationships = [ + rel for rel in results_json["relationships"] + if rel["relationshipType"] == "CONTAINS" + ] + self.assertGreater(len(contains_relationships), 0) + + def test_scanpipe_pipes_outputs_to_cyclonedx_multiple_inputs(self): + """Test CycloneDX generation with multiple input sources.""" + project = make_project(name="MultiInputProject") + + # Add two input sources + input1 = project.add_input_source( + download_url="pkg:npm/package1@1.0.0", + filename="package1-1.0.0.tgz", + ) + input2 = project.add_input_source( + download_url="pkg:npm/package2@2.0.0", + filename="package2-2.0.0.tgz", + ) + + # Create resources for each input + resource1 = CodebaseResource.objects.create( + project=project, + path="package1-1.0.0/package.json", + ) + resource2 = CodebaseResource.objects.create( + project=project, + path="package2-2.0.0/package.json", + ) + + # Create packages associated with each input + package1 = make_package(project, "pkg:npm/dependency1@1.0.0") + package1.codebase_resources.add(resource1) + + package2 = make_package(project, "pkg:npm/dependency2@2.0.0") + package2.codebase_resources.add(resource2) + + # Generate CycloneDX output + output_file = output.to_cyclonedx(project=project) + results_json = json.loads(output_file.read_text()) + + # Verify metadata properties include input source information + properties = results_json["metadata"]["properties"] + property_names = [prop["name"] for prop in properties] + + self.assertIn("scancode-io:project-name", property_names) + self.assertIn("scancode-io:input-file", property_names) + self.assertIn("scancode-io:input-source", property_names) + + # Count input file properties (should be 2, one for each input) + input_file_properties = [ + prop for prop in properties + if prop["name"] == "scancode-io:input-file" + ] + self.assertEqual(2, len(input_file_properties)) + + # Verify input filenames are in properties + input_file_values = [prop["value"] for prop in input_file_properties] + self.assertIn("package1-1.0.0.tgz", input_file_values) + self.assertIn("package2-2.0.0.tgz", input_file_values) + + # Verify components include input components + components = results_json.get("components", []) + component_refs = [comp["bom-ref"] for comp in components] + + # Check that input components are present + self.assertIn(f"input-{input1.uuid}", component_refs) + self.assertIn(f"input-{input2.uuid}", component_refs) + def test_scanpipe_pipes_outputs_make_unknown_license_object(self): licensing = get_licensing() parsed_expression = licensing.parse("some-unknown-license")