1 change: 1 addition & 0 deletions pyproject.toml
@@ -161,6 +161,7 @@ resolve_dependencies = "scanpipe.pipelines.resolve_dependencies:ResolveDependenc
scan_codebase = "scanpipe.pipelines.scan_codebase:ScanCodebase"
scan_for_virus = "scanpipe.pipelines.scan_for_virus:ScanForVirus"
scan_single_package = "scanpipe.pipelines.scan_single_package:ScanSinglePackage"
scan_maven_package = "scanpipe.pipelines.scan_maven_package:ScanMavenPackage"

[tool.setuptools.packages.find]
where = ["."]
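The new pipeline is registered alongside the existing ones as a setuptools entry point. A minimal sketch of looking up the registered class at runtime, assuming the entry-point group these lines live under is named "scancodeio_pipelines" (the group name is not shown in the hunk):

from importlib.metadata import entry_points

# List the registered ScanCode.io pipelines and load the new one by name.
for ep in entry_points(group="scancodeio_pipelines"):  # assumed group name
    if ep.name == "scan_maven_package":
        pipeline_class = ep.load()
        print(pipeline_class)  # scanpipe.pipelines.scan_maven_package.ScanMavenPackage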
73 changes: 73 additions & 0 deletions scanpipe/pipelines/scan_maven_package.py
@@ -0,0 +1,73 @@
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/aboutcode-org/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

import json

from scanpipe.pipelines.scan_single_package import ScanSinglePackage
from scanpipe.pipes.resolve import download_pom_files
from scanpipe.pipes.resolve import get_pom_url_list
from scanpipe.pipes.resolve import scan_pom_files


class ScanMavenPackage(ScanSinglePackage):
    """
    Scan a single Maven package archive (or package manifest file).

    This pipeline scans a single package for package metadata,
    declared dependencies, licenses, license clarity score and copyrights.
    When the extracted codebase does not contain a pom.xml, it additionally
    fetches and scans the package's POM from Maven Central.

    The output is a summary of the scan results in JSON format.
    """

    @classmethod
    def steps(cls):
        return (
            cls.get_package_input,
            cls.collect_input_information,
            cls.extract_input_to_codebase_directory,
            cls.extract_archives,
            cls.run_scan,
            cls.fetch_and_scan_remote_pom,
            cls.load_inventory_from_toolkit_scan,
            cls.make_summary_from_scan_results,
        )

    def fetch_and_scan_remote_pom(self):
        """Fetch the .pom file from maven.org if it is not present in the codebase."""
        with open(self.scan_output_location) as file:
            data = json.load(file)

        # Do nothing if the scanned codebase already contains a pom.xml
        for file_entry in data["files"]:
            if "pom.xml" in file_entry["path"]:
                return
        packages = data.get("packages", [])

        pom_url_list = get_pom_url_list(self.project.input_sources[0], packages)
        pom_file_list = download_pom_files(pom_url_list)
        scanned_pom_packages, scanned_dependencies = scan_pom_files(pom_file_list)

        # Replace/update the packages and dependencies sections
        updated_packages = packages + scanned_pom_packages
        data["packages"] = updated_packages
        data["dependencies"] = scanned_dependencies
        with open(self.scan_output_location, "w") as file:
            json.dump(data, file, indent=2)
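A minimal sketch of the JSON rewrite this step performs, with hypothetical file, package, and dependency entries standing in for real toolkit scan output:

import json

# Hypothetical scan output for a jar that ships without a pom.xml.
data = {
    "files": [{"path": "commons-io-2.11.0.jar"}],
    "packages": [{"name": "commons-io", "version": "2.11.0"}],
    "dependencies": [],
}

# Hypothetical results from scanning the remotely fetched POM.
scanned_pom_packages = [
    {
        "name": "commons-io",
        "datafile_paths": [
            "https://repo1.maven.org/maven2/commons-io/commons-io/2.11.0/commons-io-2.11.0.pom"
        ],
    }
]
scanned_dependencies = [{"purl": "pkg:maven/junit/junit@4.13.2", "datafile_path": ""}]

# POM-derived packages are appended; the dependencies section is replaced wholesale.
data["packages"] = data["packages"] + scanned_pom_packages
data["dependencies"] = scanned_dependencies
print(json.dumps(data, indent=2))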
202 changes: 202 additions & 0 deletions scanpipe/pipes/resolve.py
@@ -22,6 +22,7 @@

import json
import logging
import re
import sys
import uuid
from pathlib import Path
@@ -30,6 +31,7 @@
from django.core.exceptions import ObjectDoesNotExist

import python_inspector.api as python_inspector
import requests
import saneyaml
from attributecode.model import About
from packagedcode import APPLICATION_PACKAGE_DATAFILE_HANDLERS
@@ -39,7 +41,9 @@
from scanpipe.models import DiscoveredDependency
from scanpipe.models import DiscoveredPackage
from scanpipe.pipes import cyclonedx
from scanpipe.pipes import fetch
from scanpipe.pipes import flag
from scanpipe.pipes import scancode
from scanpipe.pipes import spdx
from scanpipe.pipes import update_or_create_dependency
from scanpipe.pipes import update_or_create_package
@@ -521,3 +525,201 @@
        return extracted_headers

    return {}


def parse_maven_filename(filename):
    """Parse a Maven jar filename to extract the artifactId and version."""
    # Remove the .jar extension
    base = filename.rsplit(".", 1)[0]

    # Common classifiers pattern
    common_classifiers = {
        "sources",
        "javadoc",
        "tests",
        "test",
        "test-sources",
        "src",
        "bin",
        "docs",
        "javadocs",
        "client",
        "server",
        "linux",
        "windows",
        "macos",
        "linux-x86_64",
        "windows-x86_64",
    }

    # Remove a known classifier if present
    for classifier in common_classifiers:
        if base.endswith(f"-{classifier}"):
            base = base[: -(len(classifier) + 1)]
            break

    # Match the artifactId and version
    match = re.match(r"^(.*?)-(\d[\w.\-]*)$", base)

    if match:
        artifact_id = match.group(1)
        version = match.group(2)
        return artifact_id, version
    return None, None
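# Illustrative examples of the parsing above (hypothetical filenames):
#   parse_maven_filename("commons-io-2.11.0.jar") -> ("commons-io", "2.11.0")
#   parse_maven_filename("guava-33.0.0-jre-sources.jar") -> ("guava", "33.0.0-jre")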


def get_pom_url_list(input_source, packages):
    """Generate Maven POM URLs from package metadata or the input source."""
    pom_url_list = []
    if packages:
        for package in packages:
            package_ns = package.get("namespace", "")
            package_name = package.get("name", "")
            package_version = package.get("version", "")
            pom_url = (
                f"https://repo1.maven.org/maven2/{package_ns.replace('.', '/')}/"
                f"{package_name}/{package_version}/"
                f"{package_name}-{package_version}.pom"
            ).lower()
            pom_url_list.append(pom_url)
    else:
        from urllib.parse import urlparse

        # Determine what the input source is
        input_source_url = input_source.get("download_url", "")

        parsed_url = urlparse(input_source_url)
        if input_source_url and parsed_url.netloc.endswith("maven.org"):

Check failure: Code scanning / CodeQL

Incomplete URL substring sanitization (High)

The string "maven.org" may be at an arbitrary position in the sanitized URL.

Copilot Autofix (AI, 3 days ago)

To fix the problem, validate the URL host strictly: parse the host and check it against a whitelist of allowed Maven repository hosts. The safest approach is to define a set of permitted hostnames (such as repo1.maven.org or search.maven.org) and compare the parsed hostname against that whitelist. This change should be made in the relevant block of the get_pom_url_list function (lines 594-601). Make the comparison case-insensitive and act only on the hostname portion (already parsed as parsed_url.netloc or parsed_url.hostname), replacing .endswith("maven.org") with an exact match against the known hosts (e.g., using in with a whitelist).

In terms of implementation, you may need to:

  • Define an allowed hostnames list (e.g., {"repo1.maven.org", "search.maven.org"}).
  • Use parsed_url.hostname for comparison.
  • Do a strict compare (in allowed_hosts) rather than a substring or suffix check.
  • No additional imports are needed.

Edit only the code shown in file scanpipe/pipes/resolve.py, lines 575-613.

Suggested changeset 1: scanpipe/pipes/resolve.py

Autofix patch. Run the following command in your local git repository to apply it:
cat << 'EOF' | git apply
diff --git a/scanpipe/pipes/resolve.py b/scanpipe/pipes/resolve.py
--- a/scanpipe/pipes/resolve.py
+++ b/scanpipe/pipes/resolve.py
@@ -592,7 +592,8 @@
         input_source_url = input_source.get("download_url", "")
 
         parsed_url = urlparse(input_source_url)
-        if input_source_url and parsed_url.netloc.endswith("maven.org"):
+        allowed_hosts = {"repo1.maven.org", "search.maven.org"}
+        if input_source_url and parsed_url.hostname and parsed_url.hostname.lower() in allowed_hosts:
             base_url = input_source_url.rsplit("/", 1)[0]
             pom_url = (
                 base_url + "/" + "-".join(base_url.rstrip("/").split("/")[-2:]) + ".pom"
EOF
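For illustration, a quick check of why the suffix test is unsafe while the strict whitelist comparison is not (the hostile URL is hypothetical):

from urllib.parse import urlparse

parsed = urlparse("https://evil-maven.org/artifact.pom")  # hypothetical attacker-controlled host
print(parsed.netloc.endswith("maven.org"))  # True: the suffix check passes

allowed_hosts = {"repo1.maven.org", "search.maven.org"}
print(parsed.hostname in allowed_hosts)  # False: the strict check rejects it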
            base_url = input_source_url.rsplit("/", 1)[0]
            pom_url = (
                base_url + "/" + "-".join(base_url.rstrip("/").split("/")[-2:]) + ".pom"
            )
            pom_url_list.append(pom_url)
        else:
            # Construct a pom_url from the filename
            input_filename = input_source.get("filename", "")
            if input_filename.endswith(".jar"):
                artifact_id, version = parse_maven_filename(input_filename)
                if not artifact_id or not version:
                    return []
                pom_url_list = construct_pom_url_from_filename(artifact_id, version)
            else:
                # Only .jar file inputs are supported
                return []

    return pom_url_list
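# Illustrative trace of the maven.org branch above, for a hypothetical download_url:
#   input:    https://repo1.maven.org/maven2/commons-io/commons-io/2.11.0/commons-io-2.11.0.jar
#   base_url: https://repo1.maven.org/maven2/commons-io/commons-io/2.11.0
#   "-".join of the last two path segments: commons-io-2.11.0
#   pom_url:  https://repo1.maven.org/maven2/commons-io/commons-io/2.11.0/commons-io-2.11.0.pom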


def construct_pom_url_from_filename(artifact_id, version):
    """Construct pom.xml URLs for the given Maven artifactId and version."""
    # Search Maven Central for the artifact to get its groupId
    url = f"https://search.maven.org/solrsearch/select?q=a:{artifact_id}&wt=json"
    pom_url_list = []
    group_ids = []
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
        # Extract all 'g' fields (the groupIds) from the docs array
        group_ids = [doc["g"] for doc in data["response"]["docs"]]
    except requests.RequestException as e:
        print(f"Error fetching data: {e}")
        return []
    except KeyError as e:
        print(f"Error parsing JSON: {e}")
        return []

    for group_id in group_ids:
        pom_url = (
            f"https://repo1.maven.org/maven2/{group_id.replace('.', '/')}/"
            f"{artifact_id}/{version}/{artifact_id}-{version}.pom"
        ).lower()
        if is_maven_pom_url(pom_url):
            pom_url_list.append(pom_url)
        if len(pom_url_list) > 1:
            # If multiple valid POM URLs are found, the same artifactId and
            # version exist under different groupIds. Since we cannot
            # confidently determine the correct groupId, return an empty
            # list to avoid fetching the wrong POM.
            return []

    return pom_url_list


def is_maven_pom_url(url):
    """Return True if the URL serves an accessible POM, False otherwise."""
    # Maven Central has a fallback mechanism that serves a generic/error
    # page instead of returning a proper 404.
    try:
        response = requests.get(url, timeout=5)
        if response.status_code != 200:
            return False
        # Check the content type
        content_type = response.headers.get("content-type", "").lower()
        is_xml = "xml" in content_type

        # Check the content
        content = response.text.strip()
        is_pom = content.startswith("<?xml") and "<project" in content

        if is_xml and is_pom:
            return True
        # Otherwise it is probably the Maven Central error page
        return False
    except requests.RequestException:
        return False


def download_pom_files(pom_url_list):
    """Fetch the POM files from the input pom_url_list."""
    pom_file_list = []
    for pom_url in pom_url_list:
        pom_file_dict = {}
        downloaded_pom = fetch.fetch_http(pom_url)
        pom_file_dict["pom_file_path"] = str(downloaded_pom.path)
        pom_file_dict["output_path"] = str(downloaded_pom.path) + "-output.json"
        pom_file_dict["pom_url"] = pom_url
        pom_file_list.append(pom_file_dict)
    return pom_file_list


def scan_pom_files(pom_file_list):
    """Scan the fetched POM files from the input pom_file_list."""
    scanned_pom_packages = []
    scanned_pom_deps = []
    for pom_file_dict in pom_file_list:
        pom_file_path = pom_file_dict.get("pom_file_path", "")
        scanned_pom_output_path = pom_file_dict.get("output_path", "")
        pom_url = pom_file_dict.get("pom_url", "")

        # Run a package scan on the fetched pom.xml
        _scanning_errors = scancode.run_scan(
            location=pom_file_path,
            output_file=scanned_pom_output_path,
            run_scan_args={
                "package": True,
            },
        )

        with open(scanned_pom_output_path) as scanned_pom_file:
            scanned_pom_data = json.load(scanned_pom_file)
        scanned_packages = scanned_pom_data.get("packages", [])
        scanned_dependencies = scanned_pom_data.get("dependencies", [])
        for scanned_package in scanned_packages:
            # Replace the 'datafile_paths' with the pom_url
            scanned_package["datafile_paths"] = [pom_url]
            scanned_pom_packages.append(scanned_package)
        for scanned_dep in scanned_dependencies:
            # Replace the 'datafile_path' with an empty string
            # See https://github.com/aboutcode-org/scancode.io/issues/1763#issuecomment-3525165830
            scanned_dep["datafile_path"] = ""
            scanned_pom_deps.append(scanned_dep)
    return scanned_pom_packages, scanned_pom_deps
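A minimal end-to-end sketch of how these helpers compose, assuming network access to Maven Central; the input_source dict mirrors a project's input_sources entry and the filename is hypothetical:

input_source = {"filename": "commons-io-2.11.0.jar", "download_url": ""}

pom_urls = get_pom_url_list(input_source, packages=[])
pom_files = download_pom_files(pom_urls)
packages, dependencies = scan_pom_files(pom_files)
print(len(packages), "packages,", len(dependencies), "dependencies")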