From 88d8885690ceb739d4e4885a4d1ac3dcbe3f3b1d Mon Sep 17 00:00:00 2001 From: Jaekwon Bang Date: Thu, 11 Sep 2025 17:48:42 +0900 Subject: [PATCH] Fix .jar analysis via Syft & Grype --- install_tools.py | 143 +++++ requirements.txt | 1 - setup.py | 41 ++ src/fosslight_binary/_jar_analysis.py | 662 ++++++++++++++++-------- src/fosslight_binary/binary_analysis.py | 26 +- src/fosslight_binary/install_cli.py | 56 ++ 6 files changed, 703 insertions(+), 226 deletions(-) create mode 100644 install_tools.py create mode 100644 src/fosslight_binary/install_cli.py diff --git a/install_tools.py b/install_tools.py new file mode 100644 index 0000000..78a61cb --- /dev/null +++ b/install_tools.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (c) 2021 LG Electronics Inc. +# SPDX-License-Identifier: Apache-2.0 +""" +Standalone tool installer for syft and grype. +This script can be run independently without any package dependencies. +""" + +import platform +import tarfile +import zipfile +from pathlib import Path +from urllib.request import urlopen, urlretrieve + + +def get_platform_info(): + """Get platform and architecture information.""" + system = platform.system().lower() + machine = platform.machine().lower() + + # Normalize architecture names + if machine in ['x86_64', 'amd64']: + arch = 'amd64' + elif machine in ['aarch64', 'arm64']: + arch = 'arm64' + elif machine in ['i386', 'i686']: + arch = '386' + else: + arch = machine + + return system, arch + + +def get_latest_release_url(tool_name, system, arch): + """Get the latest release download URL for a tool.""" + import json + + # GitHub API to get latest release + api_url = f"https://api.github.com/repos/anchore/{tool_name}/releases/latest" + + try: + with urlopen(api_url) as response: + release_data = json.loads(response.read().decode()) + + # Find the right asset for our platform + for asset in release_data['assets']: + name = asset['name'].lower() + if system in name and arch in name: + if system == 'windows' and name.endswith('.zip'): + return asset['browser_download_url'] + elif system != 'windows' and name.endswith('.tar.gz'): + return asset['browser_download_url'] + + except Exception as e: + print(f"Failed to get release info for {tool_name}: {e}") + + # Fallback to direct URLs + base_urls = { + 'syft': f'https://github.com/anchore/syft/releases/latest/download/syft_{system}_{arch}', + 'grype': f'https://github.com/anchore/grype/releases/latest/download/grype_{system}_{arch}' + } + + if system == 'windows': + return f"{base_urls[tool_name]}.zip" + else: + return f"{base_urls[tool_name]}.tar.gz" + + +def install_tool(tool_name, install_dir): + """Install a tool (syft or grype) to the specified directory.""" + system, arch = get_platform_info() + + print(f"Installing {tool_name} for {system}/{arch}...") + + # Create install directory + install_path = Path(install_dir) + install_path.mkdir(parents=True, exist_ok=True) + + # Get download URL + download_url = get_latest_release_url(tool_name, system, arch) + + # Download file + if system == 'windows': + archive_name = f"{tool_name}.zip" + else: + archive_name = f"{tool_name}.tar.gz" + + archive_path = install_path / archive_name + + try: + print(f"Downloading {download_url}...") + urlretrieve(download_url, archive_path) + + # Extract archive + if system == 'windows': + with zipfile.ZipFile(archive_path, 'r') as zip_ref: + zip_ref.extractall(install_path) + else: + with tarfile.open(archive_path, 'r:gz') as tar_ref: + tar_ref.extractall(install_path) + + # Make executable (Unix systems) + if system != 'windows': + tool_binary = install_path / tool_name + if tool_binary.exists(): + tool_binary.chmod(0o755) + + # Clean up archive + archive_path.unlink() + + print(f"āœ… {tool_name} installed successfully!") + return True + + except Exception as e: + print(f"āŒ Failed to install {tool_name}: {e}") + return False + + +def install_syft_grype(): + """Install both syft and grype tools.""" + # Determine install directory + home_dir = Path.home() + install_dir = home_dir / '.local' / 'bin' + + print("Installing Syft and Grype tools...") + print(f"Install directory: {install_dir}") + + # Install both tools + syft_ok = install_tool('syft', install_dir) + grype_ok = install_tool('grype', install_dir) + + if syft_ok and grype_ok: + print("šŸŽ‰ Both tools installed successfully!") + print(f"Make sure {install_dir} is in your PATH") + return True + else: + print("āš ļø Some tools failed to install") + return False + + +if __name__ == '__main__': + install_syft_grype() diff --git a/requirements.txt b/requirements.txt index fe5a598..9cff919 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,3 @@ pytz XlsxWriter PyYAML fosslight_util>=2.1.13 -dependency-check diff --git a/setup.py b/setup.py index 5e206ea..3ec50cf 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,40 @@ from codecs import open import os import shutil +import subprocess +import sys from setuptools import setup, find_packages +from setuptools.command.install import install + + +class PostInstallCommand(install): + """Post-installation for installation mode.""" + def run(self): + install.run(self) + + # Skip auto-install if explicitly disabled + if os.environ.get('FOSSLIGHT_SKIP_AUTO_INSTALL', '').lower() in ('1', 'true', 'yes'): + print("Auto-install disabled by environment variable") + return + + # Install syft and grype using standalone installer + try: + print("Installing syft and grype...") + # Use standalone installer script - no package dependencies! + script_path = os.path.join(os.path.dirname(__file__), 'install_tools.py') + if os.path.exists(script_path): + result = subprocess.run([sys.executable, script_path], + capture_output=True, text=True) + if result.returncode == 0: + print("Syft and grype installation completed.") + else: + print(f"Warning: Tool installation failed: {result.stderr}") + else: + print("Warning: install_tools.py not found, skipping auto-install") + except Exception as e: + print(f"Warning: Failed to auto-install syft/grype: {e}") + print("You can install them manually or they will be installed on first use.") + with open('README.md', 'r', 'utf-8') as f: readme = f.read() @@ -63,11 +96,19 @@ }, package_data={_PACKAEG_NAME: [os.path.join(_LICENSE_DIR, '*')]}, include_package_data=True, + # Include install_tools.py in the package + data_files=[ + ('', ['install_tools.py']), + ], + cmdclass={ + 'install': PostInstallCommand, + }, entry_points={ "console_scripts": [ "binary_analysis = fosslight_binary.cli:main", "fosslight_bin = fosslight_binary.cli:main", "fosslight_binary = fosslight_binary.cli:main", + "fosslight_install_tools = fosslight_binary.install_cli:main", ] } ) diff --git a/src/fosslight_binary/_jar_analysis.py b/src/fosslight_binary/_jar_analysis.py index c4f2043..06a280e 100644 --- a/src/fosslight_binary/_jar_analysis.py +++ b/src/fosslight_binary/_jar_analysis.py @@ -6,55 +6,272 @@ import logging import json import os -import sys +import subprocess +import platform +import tempfile +import tarfile +import shutil import fosslight_util.constant as constant from ._binary import BinaryItem, VulnerabilityItem, is_package_dir from fosslight_util.oss_item import OssItem -from dependency_check import run as dependency_check_run - +import urllib.request logger = logging.getLogger(constant.LOGGER_NAME) -def run_analysis(params, func): +def install_syft(): + """Install syft binary""" try: - sys.argv = params - func() - except SystemExit: - pass + system = platform.system().lower() + arch = platform.machine().lower() + + # Map architecture names + if arch in ['x86_64', 'amd64']: + arch = 'amd64' + elif arch in ['aarch64', 'arm64']: + arch = 'arm64' + else: + logger.error(f"Unsupported architecture: {arch}") + return False + + # Get Syft version + version = "v1.29.0" # Updated to latest version + + # Download URL + filename = f"syft_{version[1:]}_{system}_{arch}.tar.gz" + url = f"https://github.com/anchore/syft/releases/download/{version}/{filename}" + + logger.info(f"Downloading syft from {url}") + + with tempfile.TemporaryDirectory() as temp_dir: + tar_path = os.path.join(temp_dir, filename) + urllib.request.urlretrieve(url, tar_path) + + # Extract tar.gz + with tarfile.open(tar_path, 'r:gz') as tar: + tar.extractall(temp_dir) + + # Find syft binary and copy to local bin + syft_bin = os.path.join(temp_dir, 'syft') + local_bin_dir = os.path.expanduser('~/.local/bin') + os.makedirs(local_bin_dir, exist_ok=True) + + shutil.copy2(syft_bin, os.path.join(local_bin_dir, 'syft')) + os.chmod(os.path.join(local_bin_dir, 'syft'), 0o755) + + logger.info("Syft installed successfully") + return True + except Exception as ex: - logger.error(f"Run Analysis : {ex}") - - -def get_oss_ver(version_info): - oss_version = "" - if version_info.get('source') == 'central': - if version_info.get('name') == 'version': - oss_version = version_info.get('value') - elif version_info.get('source') == 'pom': - if version_info.get('name') == 'version': - oss_version = version_info.get('value') - elif version_info.get('source', "").lower() == 'manifest': - if version_info.get('name') == 'Implementation-Version' or version_info.get('name') == 'Bundle-Version': - oss_version = version_info.get('value') - return oss_version - - -def get_oss_lic_in_jar(data): - license = "" - license_raw = str(data.get("license")) - split_lic = license_raw.split(':')[0] - - # Not NoneType but string 'None' - if license_raw == "None": - license = "" - else: - if not split_lic.startswith('http'): - license = split_lic.replace(',', '') + logger.error(f"Failed to install syft: {ex}") + return False + + +def install_grype(): + """Install grype binary""" + try: + system = platform.system().lower() + arch = platform.machine().lower() + + # Map architecture names + if arch in ['x86_64', 'amd64']: + arch = 'amd64' + elif arch in ['aarch64', 'arm64']: + arch = 'arm64' else: - license = license_raw + logger.error(f"Unsupported architecture: {arch}") + return False + + # Grype version + version = "v0.84.0" # Updated to latest version + + # Download URL + filename = f"grype_{version[1:]}_{system}_{arch}.tar.gz" + url = f"https://github.com/anchore/grype/releases/download/{version}/{filename}" + + logger.info(f"Downloading grype from {url}") + + with tempfile.TemporaryDirectory() as temp_dir: + tar_path = os.path.join(temp_dir, filename) + urllib.request.urlretrieve(url, tar_path) - return license + # Extract tar.gz + with tarfile.open(tar_path, 'r:gz') as tar: + tar.extractall(temp_dir) + + # Find grype binary and copy to local bin + grype_bin = os.path.join(temp_dir, 'grype') + local_bin_dir = os.path.expanduser('~/.local/bin') + os.makedirs(local_bin_dir, exist_ok=True) + + shutil.copy2(grype_bin, os.path.join(local_bin_dir, 'grype')) + os.chmod(os.path.join(local_bin_dir, 'grype'), 0o755) + + logger.info("Grype installed successfully") + return True + + except Exception as ex: + logger.error(f"Failed to install grype: {ex}") + return False + + +def ensure_grype(): + """Ensure grype is installed and available""" + try: + # Try grype in PATH first + result = subprocess.run(['grype', '--version'], capture_output=True, text=True) + if result.returncode == 0: + return True + except FileNotFoundError: + pass + + # Try local installation + local_grype = os.path.expanduser('~/.local/bin/grype') + if os.path.exists(local_grype): + try: + result = subprocess.run([local_grype, '--version'], capture_output=True, text=True) + if result.returncode == 0: + return True + except Exception: + pass + + # Install grype if not found + logger.info("Grype not found, installing...") + return install_grype() + + +def ensure_syft_grype(): + """Ensure syft and grype are installed and available""" + def check_command(cmd): + try: + # Check in PATH first + subprocess.run([cmd, '--version'], capture_output=True, check=True) + return True + except (subprocess.CalledProcessError, FileNotFoundError): + # Check in ~/.local/bin + local_bin = os.path.expanduser(f'~/.local/bin/{cmd}') + if os.path.exists(local_bin): + try: + subprocess.run([local_bin, '--version'], capture_output=True, check=True) + return True + except subprocess.CalledProcessError: + pass + return False + + # Check and install syft + if not check_command('syft'): + logger.info("Syft not found. Installing...") + if not install_syft(): + logger.error("Failed to install syft") + return False + + # Check and install grype + if not check_command('grype'): + logger.info("Grype not found. Installing...") + if not install_grype(): + logger.error("Failed to install grype") + return False + + return True + + +def run_syft_analysis(jar_files, output_dir): + """Run syft to generate SBOM for multiple jar files""" + output_file = os.path.join(output_dir, 'syft-report.json') + + # Try syft in PATH first, then local bin + syft_cmd = 'syft' + try: + subprocess.run([syft_cmd, '--version'], capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + syft_cmd = os.path.expanduser('~/.local/bin/syft') + + try: + # Create temp dir and copy jar files (symlinks don't work well with new syft version) + temp_dir = tempfile.mkdtemp() + try: + for jar_file in jar_files: + # Copy jar file + jar_basename = os.path.basename(jar_file) + dest_path = os.path.join(temp_dir, jar_basename) + shutil.copy2(jar_file, dest_path) + + # Use java-archive-cataloger specifically for jar files + cmd = [syft_cmd, f'dir:{temp_dir}', + '--override-default-catalogers', 'java-archive-cataloger', + '-o', f'json={output_file}'] + + logger.debug(f"Running syft command: {' '.join(cmd)}") + subprocess.run(cmd, capture_output=True, text=True, check=True) + + if os.path.exists(output_file): + logger.debug(f"Syft analysis completed: {output_file}") + return output_file + else: + logger.error("Syft analysis failed: output file not created") + return None + finally: + # Clean up temp directory + shutil.rmtree(temp_dir, ignore_errors=True) + + except subprocess.CalledProcessError as e: + logger.error(f"Syft analysis failed: {e.stderr}") + return None + except Exception as e: + logger.error(f"Error running syft analysis: {str(e)}") + return None + + +def run_grype_analysis(jar_files, output_dir, syft_report_file=None): + """Run grype to scan vulnerabilities for multiple jar files using syft SBOM if available""" + output_file = os.path.join(output_dir, 'grype-report.json') + + # Try grype in PATH first, then local bin + grype_cmd = 'grype' + try: + subprocess.run([grype_cmd, '--version'], capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + grype_cmd = os.path.expanduser('~/.local/bin/grype') + + try: + # Use syft SBOM if available, otherwise scan jar files directly + if syft_report_file and os.path.exists(syft_report_file): + input_source = f'sbom:{syft_report_file}' + logger.debug(f"Running grype with syft SBOM: {syft_report_file}") + cmd = [grype_cmd, input_source, '-o', f'json={output_file}'] + else: + logger.debug("Can't find syft report file. Scanning jar files directly.") + # Create temp dir and copy jar files for unified processing + temp_dir = tempfile.mkdtemp() + try: + for jar_file in jar_files: + # Copy jar file instead of symlinking + jar_basename = os.path.basename(jar_file) + dest_path = os.path.join(temp_dir, jar_basename) + shutil.copy2(jar_file, dest_path) + + # Scan jar files directly without deprecated flags + cmd = [grype_cmd, f'dir:{temp_dir}', + '-o', f'json={output_file}'] + + logger.debug(f"Running grype command: {' '.join(cmd)}") + subprocess.run(cmd, capture_output=True, text=True, check=True) + + logger.debug(f"Grype analysis completed for {len(jar_files)} jar files") + return output_file + finally: + # Clean up temp directory + shutil.rmtree(temp_dir, ignore_errors=True) + + # For SBOM case + logger.debug(f"Running grype command: {' '.join(cmd)}") + subprocess.run(cmd, capture_output=True, text=True, check=True) + + logger.debug(f"Grype analysis completed for {len(jar_files)} jar files") + return output_file + except subprocess.CalledProcessError as ex: + logger.error(f"Error running grype analysis: {ex.stderr}") + return None def merge_oss_and_vul_items(bin, key, oss_list, vulnerability_items): @@ -63,11 +280,11 @@ def merge_oss_and_vul_items(bin, key, oss_list, vulnerability_items): bin.vulnerability_items.extend(vulnerability_items.get(key, [])) -def merge_binary_list(owasp_items, vulnerability_items, bin_list): +def merge_binary_list(syft_grype_items, vulnerability_items, bin_list): not_found_bin = [] # key : file_path / value : {"oss_list": [oss], "sha1": sha1} for one binary - for key, value in owasp_items.items(): + for key, value in syft_grype_items.items(): found = False oss_list = value["oss_list"] sha1 = value.get("sha1", "") @@ -76,7 +293,7 @@ def merge_binary_list(owasp_items, vulnerability_items, bin_list): found = True for oss in oss_list: if oss.name and oss.license: - bin.found_in_owasp = True + bin.found_in_syft = True break merge_oss_and_vul_items(bin, key, oss_list, vulnerability_items) else: @@ -99,202 +316,209 @@ def merge_binary_list(owasp_items, vulnerability_items, bin_list): return bin_list -def get_vulnerability_info(file_with_path, vulnerability, vulnerability_items, remove_vulnerability_items): - if vulnerability: - try: - for vul_info in vulnerability: - vul_id = "" - nvd_url = "" - for key, val in vul_info.items(): - if key == 'id': - vul_id = val - elif key == 'url': - nvd_url = val - - vul_item = VulnerabilityItem(file_with_path, vul_id, nvd_url) - - remove_vulnerability_items = vulnerability_items.get(file_with_path) - if remove_vulnerability_items: - remove_vulnerability_items.append(vul_item) - else: - vulnerability_items[file_with_path] = [vul_item] - except Exception as ex: - logger.info(f"Error to get vul_id and nvd_url: {ex}") - - return vulnerability_items - +def parse_syft_report(syft_json_file): + """Parse syft SBOM report to extract OSS information""" + oss_items = {} -def get_oss_groupid(evidence_info): - oss_groupid = "" - # First, Get groupid from Central, else get it from pom - if evidence_info.get('source') == 'central': - if evidence_info.get('name') == 'groupid': - oss_groupid = evidence_info.get('value') - elif evidence_info.get('source') == 'pom': - if evidence_info.get('name') == 'groupid': - oss_groupid = evidence_info.get('value') - return oss_groupid - - -def get_oss_artifactid(evidence_info): - oss_artifactid = "" - # Get OSS Info from POM - if evidence_info.get('source') == 'pom': - if evidence_info.get('name') == 'artifactid': - oss_artifactid = evidence_info.get('value') - return oss_artifactid + try: + with open(syft_json_file, 'r') as f: + syft_data = json.load(f) + + artifacts = syft_data.get('artifacts', []) + + for artifact in artifacts: + name = artifact.get('name', '') + version = artifact.get('version', '') + purl = artifact.get('purl', '') + licenses = artifact.get('licenses', []) + + # Extract license information + license_str = '' + if licenses: + license_str = ', '.join([lic.get('value', '') for lic in licenses if lic.get('value')]) + + # Get download URL from PURL if available + download_url = '' + if purl: + download_url = purl + + # Use locations to map to file paths + locations = artifact.get('locations', []) + for location in locations: + file_path = location.get('path', '') + if file_path.endswith('.jar'): + file_path = os.path.basename(file_path) + + oss = OssItem(name, version, license_str, download_url) + oss.comment = "Syft result" + + if file_path in oss_items: + oss_items[file_path]["oss_list"].append(oss) + else: + oss_items[file_path] = { + "oss_list": [oss], + "sha1": "" + } + except Exception as ex: + logger.error(f"Error parsing syft report: {ex}") -def get_oss_dl_url(evidence_info): - oss_dl_url = "" - if evidence_info.get('name') == 'url': - oss_dl_url = evidence_info.get('value') - return oss_dl_url + return oss_items -def get_oss_info_from_pkg_info(pkg_info): - oss_name = "" - oss_version = "" +def parse_grype_report(grype_json_file): + """Parse grype vulnerability report""" + vulnerability_items = {} try: - if pkg_info.get('id') != "": - # Get OSS Name - if pkg_info.get('id').startswith('pkg:maven'): - # ex, pkg:maven/com.hankcs/aho-corasick-double-array-trie@1.2.3 - oss_name = pkg_info.get('id').split('@')[0] - oss_name = f"{oss_name.split('/')[-2]}:{oss_name.split('/')[-1]}" - elif pkg_info.get('id').startswith('pkg:npm'): - # ex, pkg:npm/cryptiles@0.2.2 - oss_name = pkg_info.get('id').split('@')[0] - oss_name = oss_name.replace('pkg:npm', 'npm') - oss_name = oss_name.replace('/', ':') - else: - oss_name = pkg_info.get('id').split('@')[0] - oss_name = oss_name.split('/')[-1] - # Get OSS Version - oss_version = pkg_info.get('id').split('@')[1] + with open(grype_json_file, 'r') as f: + grype_data = json.load(f) + + matches = grype_data.get('matches', []) + + for match in matches: + vulnerability = match.get('vulnerability', {}) + artifact = match.get('artifact', {}) + + vul_id = vulnerability.get('id', '') + + # Try to get NVD URL from related vulnerabilities first + nvd_url = '' + related_vulnerabilities = match.get('relatedVulnerabilities', []) + for related_vul in related_vulnerabilities: + related_id = related_vul.get('id', '') + related_datasource = related_vul.get('dataSource', '') + if 'cve' in related_id.lower() and 'nvd' in related_datasource.lower(): + nvd_url = related_datasource + vul_id = related_id # Use CVE ID instead of GHSA + break + + # Get artifact location + locations = artifact.get('locations', []) + for location in locations: + file_path = location.get('path', '') + if file_path.endswith('.jar'): + file_path = os.path.basename(file_path) + + vul_item = VulnerabilityItem(file_path, vul_id, nvd_url) + + if file_path in vulnerability_items: + vulnerability_items[file_path].append(vul_item) + else: + vulnerability_items[file_path] = [vul_item] + except Exception as ex: - logger.debug(f"Error to get value for oss name and version: {ex}") - return oss_name, oss_version + logger.error(f"Error parsing grype report: {ex}") + + return vulnerability_items def analyze_jar_file(path_to_find_bin, path_to_exclude): - owasp_items = {} - remove_vulnerability_items = [] + """Analyze jar files using syft and grype""" + syft_items = {} vulnerability_items = {} success = True - json_file = "" - command = ['dependency-check', '--scan', f'{path_to_find_bin}', '--out', f'{path_to_find_bin}', - '--disableArchive', '--disableAssembly', '--disableRetireJS', '--disableNodeJS', - '--disableNodeAudit', '--disableNugetconf', '--disableNuspec', '--disableOpenSSL', - '--disableOssIndex', '--disableBundleAudit', '--cveValidForHours', '24', '-f', 'JSON'] - try: - run_analysis(command, dependency_check_run) - except Exception as ex: - logger.info(f"Error to analyze .jar file - OSS information for .jar file isn't included in report.\n {ex}") - success = False - return owasp_items, vulnerability_items, success + # Check if syft and grype are installed + if not ensure_syft_grype(): + logger.error("Syft or Grype is not installed. Cannot proceed with jar analysis.") + return syft_items, vulnerability_items, False + + # Find all jar files in the directory + jar_files = [] + for root, dirs, files in os.walk(path_to_find_bin): + for file in files: + if file.endswith('.jar'): + jar_path = os.path.join(root, file) + + # Check if jar file should be excluded + should_exclude = False + for exclude_path in path_to_exclude: + # Convert both paths to absolute paths to avoid mixing absolute and relative paths + exclude_path_abs = os.path.abspath(exclude_path) + jar_path_abs = os.path.abspath(jar_path) + + try: + if os.path.commonpath([jar_path_abs, exclude_path_abs]) == exclude_path_abs: + should_exclude = True + break + except ValueError: + # If commonpath fails, try simple path comparison + if jar_path_abs == exclude_path_abs: + should_exclude = True + break + + if not should_exclude: + jar_files.append(jar_path) + + if not jar_files: + logger.info("No jar files found for analysis") + return syft_items, vulnerability_items, True + + # Create output directory for reports in current working directory + output_dir = os.path.join(os.getcwd(), 'syft_grype_reports') + os.makedirs(output_dir, exist_ok=True) try: - json_file = os.path.join(path_to_find_bin, 'dependency-check-report.json') - with open(json_file, 'r') as f: - jar_contents = json.load(f) - except Exception as ex: - logger.debug(f"Error to read dependency-check-report.json file : {ex}") - success = False - return owasp_items, vulnerability_items, success + # Try jar analysis first for better performance + logger.info(f"Starting jar analysis of {len(jar_files)} jar files") + + # Run syft analysis + syft_report = run_syft_analysis(jar_files, output_dir) + if syft_report: + jar_oss_items = parse_syft_report(syft_report) + # Merge OSS items for all jar files + for file_path, data in jar_oss_items.items(): + # Find matching jar file by basename + matching_jar = None + for jar_file in jar_files: + if os.path.basename(jar_file) == file_path: + matching_jar = jar_file + break - dependencies = jar_contents.get("dependencies", []) + if matching_jar: + relative_path = os.path.relpath(matching_jar, path_to_find_bin) + if relative_path in syft_items: + syft_items[relative_path]["oss_list"].extend(data["oss_list"]) + else: + syft_items[relative_path] = data + + # Run grype analysis using syft SBOM if available + grype_report = run_grype_analysis(jar_files, output_dir, syft_report) + if grype_report: + jar_vul_items = parse_grype_report(grype_report) + # Merge vulnerability items for all jar files + for file_path, vul_list in jar_vul_items.items(): + # Find matching jar file by basename + matching_jar = None + for jar_file in jar_files: + if os.path.basename(jar_file) == file_path: + matching_jar = jar_file + break - try: - for val in dependencies: - bin_with_path = "" - oss_name = "" - oss_ver = "" - oss_artifactid = "" - oss_groupid = "" - oss_dl_url = "" - oss_license = get_oss_lic_in_jar(val) - oss_name_found = False - - sha1 = val.get("sha1", "") - - all_evidence = val.get("evidenceCollected", {}) - vulnerability = val.get("vulnerabilityIds", []) - all_pkg_info = val.get("packages", []) - - vendor_evidences = all_evidence.get('vendorEvidence', []) - version_evidences = all_evidence.get('versionEvidence', []) - - # Check if the file is .jar file - # Even if the oss info is from pom.xml in jar file, the file name will be .jar file. - # But the oss info from pom.xml could be different from .jar file. - bin_with_path = val.get("filePath") - - if any(os.path.commonpath([bin_with_path, exclude_path]) == exclude_path - for exclude_path in path_to_exclude): - continue + if matching_jar: + relative_path = os.path.relpath(matching_jar, path_to_find_bin) + if relative_path in vulnerability_items: + vulnerability_items[relative_path].extend(vul_list) + else: + vulnerability_items[relative_path] = vul_list - if not bin_with_path.endswith('.jar'): - bin_with_path = bin_with_path.split('.jar')[0] + '.jar' - - file_with_path = os.path.relpath(bin_with_path, path_to_find_bin) - - # First, Get OSS Name and Version info from pkg_info - for pkg_info in all_pkg_info: - oss_name, oss_ver = get_oss_info_from_pkg_info(pkg_info) - - if oss_name == "" and oss_ver == "": - # If can't find name and version, Find thoes in vendorEvidence and versionEvidence . - # Get Version info from versionEvidence - for version_info in version_evidences: - oss_ver = get_oss_ver(version_info) - - # Get Artifact ID, Group ID, OSS Name from vendorEvidence - for vendor_info in vendor_evidences: - if oss_groupid == "": - oss_groupid = get_oss_groupid(vendor_info) - if oss_artifactid == "": - oss_artifactid = get_oss_artifactid(vendor_info) - if oss_dl_url == "": - oss_dl_url = get_oss_dl_url(vendor_info) - # Combine groupid and artifactid - if oss_artifactid != "" and oss_groupid != "": - oss_name = f"{oss_groupid}:{oss_artifactid}" - oss_name_found = True - # If oss_name is found, break - if oss_name_found: - break - else: - # Get only dl_url from vendorEvidence - for vendor_info in vendor_evidences: - if oss_dl_url == "": - oss_dl_url = get_oss_dl_url(vendor_info) - - # Get Vulnerability Info. - vulnerability_items = get_vulnerability_info(file_with_path, vulnerability, vulnerability_items, remove_vulnerability_items) - - if oss_name or oss_license or oss_dl_url: - oss = OssItem(oss_name, oss_ver, oss_license, oss_dl_url) - oss.comment = "OWASP result" - - if file_with_path in owasp_items: - owasp_items[file_with_path]["oss_list"].append(oss) - # Update sha1 if not already set or if current sha1 is empty - if not owasp_items[file_with_path]["sha1"] and sha1: - owasp_items[file_with_path]["sha1"] = sha1 - else: - owasp_items[file_with_path] = { - "oss_list": [oss], - "sha1": sha1 - } - except Exception as ex: - logger.debug(f"Error to get dependency Info in jar_contents: {ex}") + if not syft_report or not grype_report: + logger.info("Failed to analyze jar files") - try: - if os.path.isfile(json_file): - os.remove(json_file) except Exception as ex: - logger.debug(f"Error - There is no .json file : {ex}") - return owasp_items, vulnerability_items, success + logger.error(f"Error during jar analysis: {ex}") + success = False + + finally: + # Clean up syft_grype_reports directory in current working directory + try: + syft_grype_reports_dir = os.path.join(os.getcwd(), 'syft_grype_reports') + if os.path.exists(syft_grype_reports_dir): + shutil.rmtree(syft_grype_reports_dir) + logger.debug(f"Cleaned up report directory: {syft_grype_reports_dir}") + except Exception as ex: + logger.debug(f"Error cleaning up report files: {ex}") + + logger.info("Completed jar analysis.") + return syft_items, vulnerability_items, success diff --git a/src/fosslight_binary/binary_analysis.py b/src/fosslight_binary/binary_analysis.py index 9ef5f57..452bbb7 100755 --- a/src/fosslight_binary/binary_analysis.py +++ b/src/fosslight_binary/binary_analysis.py @@ -147,8 +147,23 @@ def get_file_list(path_to_find, abs_path_to_exclude): for file in files: file_path = os.path.join(root, file) file_abs_path = os.path.abspath(file_path) - if any(os.path.commonpath([file_abs_path, exclude_path]) == exclude_path - for exclude_path in abs_path_to_exclude): + + # Safe exclude check with better error handling + should_exclude = False + for exclude_path in abs_path_to_exclude: + try: + if os.path.commonpath([file_abs_path, exclude_path]) == exclude_path: + should_exclude = True + break + except ValueError as e: + # Handle "Can't mix absolute and relative paths" error + logger.debug(f"Path comparison error for {file_abs_path} vs {exclude_path}: {e}") + # Fallback to simple string comparison + if file_abs_path == exclude_path: + should_exclude = True + break + + if should_exclude: continue file_lower_case = file.lower() extension = os.path.splitext(file_lower_case)[1][1:].strip() @@ -247,12 +262,11 @@ def find_binaries(path_to_find_bin, output_dir, formats, dburl="", simple_mode=F scan_item = ScannerItem(PKG_NAME, start_time) scan_item.set_cover_pathinfo(path_to_find_bin, path_to_exclude) try: - # Run OWASP Dependency-check if found_jar: - logger.info("Run OWASP Dependency-check to analyze .jar file") - owasp_items, vulnerability_items, success = analyze_jar_file(path_to_find_bin, abs_path_to_exclude) + logger.info("Run Syft & Grype to analyze .jar file") + syft_grype_items, vulnerability_items, success = analyze_jar_file(path_to_find_bin, abs_path_to_exclude) if success: - return_list = merge_binary_list(owasp_items, vulnerability_items, return_list) + return_list = merge_binary_list(syft_grype_items, vulnerability_items, return_list) else: logger.warning("Could not find OSS information for some jar files.") diff --git a/src/fosslight_binary/install_cli.py b/src/fosslight_binary/install_cli.py new file mode 100644 index 0000000..6bde99e --- /dev/null +++ b/src/fosslight_binary/install_cli.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright (c) 2025 LG Electronics Inc. +# SPDX-License-Identifier: Apache-2.0 + +""" +CLI interface for installing syft and grype tools manually. +""" + +import os +import subprocess +import sys + + +def main(): + """Main entry point for fosslight_install_tools command.""" + print("=== FOSSLight Binary Scanner - Tool Installer ===") + print("Installing syft and grype tools...") + + try: + # Find install_tools.py script + # Get the directory where this module is located + module_dir = os.path.dirname(os.path.abspath(__file__)) + # Go up to the project root + project_root = os.path.dirname(os.path.dirname(module_dir)) + script_path = os.path.join(project_root, 'install_tools.py') + + if os.path.exists(script_path): + print(f"Found installer script: {script_path}") + print("Running installation...") + + # Execute the installer script + result = subprocess.run([sys.executable, script_path], + capture_output=False, text=True) + + if result.returncode == 0: + print("\nāœ… Installation completed successfully!") + print("You can now use 'syft' and 'grype' commands.") + else: + print(f"\nāŒ Installation failed with exit code: {result.returncode}") + return 1 + + else: + print(f"āŒ Installer script not found at: {script_path}") + print("Please ensure install_tools.py exists in the project root.") + return 1 + + except Exception as e: + print(f"āŒ Installation failed: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main())