From 9a1ffa08225d1d0f6bfce1595593b51a76f5e669 Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Fri, 1 Aug 2025 14:19:33 +0530 Subject: [PATCH 01/10] Add compliance_thresholds.py and related support files Signed-off-by: NucleonGodX --- ...se_clarity.py => compliance_thresholds.py} | 137 ++++---- scanpipe/pipes/scancode.py | 2 +- .../clarity_sample_thresholds.yml} | 0 .../scorecard_sample_thresholds.yml | 4 + .../tests/pipes/test_compliance_thresholds.py | 299 ++++++++++++++++++ scanpipe/tests/pipes/test_license_clarity.py | 162 ---------- 6 files changed, 375 insertions(+), 229 deletions(-) rename scanpipe/pipes/{license_clarity.py => compliance_thresholds.py} (56%) rename scanpipe/tests/data/{license_clarity/sample_thresholds.yml => compliance-thresholds/clarity_sample_thresholds.yml} (100%) create mode 100644 scanpipe/tests/data/compliance-thresholds/scorecard_sample_thresholds.yml create mode 100644 scanpipe/tests/pipes/test_compliance_thresholds.py delete mode 100644 scanpipe/tests/pipes/test_license_clarity.py diff --git a/scanpipe/pipes/license_clarity.py b/scanpipe/pipes/compliance_thresholds.py similarity index 56% rename from scanpipe/pipes/license_clarity.py rename to scanpipe/pipes/compliance_thresholds.py index fa835fc0eb..2f0bc4d90a 100644 --- a/scanpipe/pipes/license_clarity.py +++ b/scanpipe/pipes/compliance_thresholds.py @@ -21,22 +21,40 @@ # Visit https://github.com/nexB/scancode.io for support and download. """ -License Clarity Thresholds Management +Thresholds Management for License Clarity and Scorecard Compliance This module provides an independent mechanism to read, validate, and evaluate -license clarity score thresholds from policy files. Unlike license policies -which are applied during scan processing, clarity thresholds are evaluated -post-scan during summary generation. +both license clarity and OpenSSF Scorecard score thresholds from policy files. +Unlike license and security policies which are applied during scan processing, +these thresholds are evaluated post-scan during summary generation and compliance +assessment. -The clarity thresholds system uses a simple key-value mapping where: -- Keys are integer threshold values (minimum scores) +The thresholds system uses simple key-value mappings where: +- Keys are numeric threshold values (minimum scores) - Values are compliance alert levels ('ok', 'warning', 'error') +License Clarity Thresholds: +- Keys: integer threshold values (minimum clarity scores, 0-100 scale) +- Represents license information completeness percentage + +Scorecard Compliance Thresholds: +- Keys: numeric threshold values (minimum scorecard scores, 0-10.0 scale) +- Represents OpenSSF security assessment (higher score = better security) + Example policies.yml structure: license_clarity_thresholds: - 80: ok # Scores >= 80 get 'ok' alert + 80: ok # Scores >= 80 get 'ok' alert 50: warning # Scores 50-79 get 'warning' alert + 0: error # Scores below 50 get 'error' alert + +scorecard_score_thresholds: + 9.0: ok # Scores >= 9.0 get 'ok' alert + 7.0: warning # Scores 7.0-8.9 get 'warning' alert + 0: error # Scores below 7.0 get 'error' alert + +Both threshold types follow the same evaluation logic but are tailored to their +specific scoring systems and use cases. """ from pathlib import Path @@ -54,58 +72,50 @@ def load_yaml_content(yaml_content): raise ValidationError(f"Policies file format error: {e}") -class ClarityThresholdsPolicy: - """ - Manages clarity score thresholds and compliance evaluation. +class BaseThresholdsPolicy: + """Base class for managing score thresholds and compliance evaluation.""" - This class reads clarity thresholds from a dictionary, validates them - against threshold configurations and determines compliance alerts based on - clarity scores. - """ + YAML_KEY = None + THRESHOLD_TYPE = float + POLICY_NAME = "thresholds" def __init__(self, threshold_dict): - """Initialize with validated threshold dictionary.""" self.thresholds = self.validate_thresholds(threshold_dict) - @staticmethod - def validate_thresholds(threshold_dict): + def validate_thresholds(self, threshold_dict): if not isinstance(threshold_dict, dict): - raise ValidationError( - "The `license_clarity_thresholds` must be a dictionary" - ) + raise ValidationError(f"The `{self.YAML_KEY}` must be a dictionary") + validated = {} seen = set() for key, value in threshold_dict.items(): try: - threshold = int(key) + threshold = self.THRESHOLD_TYPE(key) except (ValueError, TypeError): - raise ValidationError(f"Threshold keys must be integers, got: {key}") + type_name = "integers" if self.THRESHOLD_TYPE == int else "numbers" + raise ValidationError(f"Threshold keys must be {type_name}, got: {key}") + if threshold in seen: raise ValidationError(f"Duplicate threshold key: {threshold}") seen.add(threshold) + if value not in ["ok", "warning", "error"]: raise ValidationError( - f"Compliance alert must be one of 'ok', 'warning', 'error', " - f"got: {value}" + f"Compliance alert must be one of 'ok', 'warning', 'error', got: {value}" ) validated[threshold] = value + sorted_keys = sorted(validated.keys(), reverse=True) if list(validated.keys()) != sorted_keys: raise ValidationError("Thresholds must be strictly descending") + return validated def get_alert_for_score(self, score): - """ - Determine compliance alert level for a given clarity score - - Returns: - str: Compliance alert level ('ok', 'warning', 'error') - - """ + """Determine compliance alert level for a given score.""" if score is None: return "error" - # Find the highest threshold that the score meets or exceeds applicable_thresholds = [t for t in self.thresholds if score >= t] if not applicable_thresholds: return "error" @@ -113,66 +123,49 @@ def get_alert_for_score(self, score): max_threshold = max(applicable_thresholds) return self.thresholds[max_threshold] - def get_thresholds_summary(self): - """ - Get a summary of configured thresholds for reporting - - Returns: - dict: Summary of thresholds and their alert levels - """ - return dict(sorted(self.thresholds.items(), reverse=True)) +# Specific implementations +class ClarityThresholdsPolicy(BaseThresholdsPolicy): + YAML_KEY = "license_clarity_thresholds" + THRESHOLD_TYPE = int + POLICY_NAME = "license clarity thresholds" -def load_clarity_thresholds_from_yaml(yaml_content): - """ - Load clarity thresholds from YAML content. +class ScorecardThresholdsPolicy(BaseThresholdsPolicy): + YAML_KEY = "scorecard_score_thresholds" + THRESHOLD_TYPE = float + POLICY_NAME = "scorecard score thresholds" - Returns: - ClarityThresholdsPolicy: Configured policy object - """ +def load_thresholds_from_yaml(yaml_content, policy_class): + """Generic function to load thresholds from YAML.""" data = load_yaml_content(yaml_content) if not isinstance(data, dict): raise ValidationError("YAML content must be a dictionary.") - if "license_clarity_thresholds" not in data: + if policy_class.YAML_KEY not in data: raise ValidationError( - "Missing 'license_clarity_thresholds' key in policies file." + f"Missing '{policy_class.YAML_KEY}' key in policies file." ) - return ClarityThresholdsPolicy(data["license_clarity_thresholds"]) - + return policy_class(data[policy_class.YAML_KEY]) -def load_clarity_thresholds_from_file(file_path): - """ - Load clarity thresholds from a YAML file. - Returns: - ClarityThresholdsPolicy: Configured policy object or None if file not found - - """ +def load_thresholds_from_file(file_path, policy_class): + """Generic function to load thresholds from file.""" file_path = Path(file_path) - if not file_path.exists(): return try: yaml_content = file_path.read_text(encoding="utf-8") - return load_clarity_thresholds_from_yaml(yaml_content) + return load_thresholds_from_yaml(yaml_content, policy_class) except (OSError, UnicodeDecodeError) as e: raise ValidationError(f"Error reading file {file_path}: {e}") def get_project_clarity_thresholds(project): - """ - Get clarity thresholds for a project using the unified policy loading logic. - - Returns: - ClarityThresholdsPolicy or None: Policy object if thresholds are configured - - """ policies_dict = project.get_policies_dict() if not policies_dict: return @@ -182,3 +175,15 @@ def get_project_clarity_thresholds(project): return return ClarityThresholdsPolicy(clarity_thresholds) + + +def get_project_scorecard_thresholds(project): + policies_dict = project.get_policies_dict() + if not policies_dict: + return + + scorecard_thresholds = policies_dict.get("scorecard_score_thresholds") + if not scorecard_thresholds: + return + + return ScorecardThresholdsPolicy(scorecard_thresholds) diff --git a/scanpipe/pipes/scancode.py b/scanpipe/pipes/scancode.py index 93b20e7710..81ce29c13a 100644 --- a/scanpipe/pipes/scancode.py +++ b/scanpipe/pipes/scancode.py @@ -60,7 +60,7 @@ from scanpipe.models import DiscoveredDependency from scanpipe.models import DiscoveredPackage from scanpipe.pipes import flag -from scanpipe.pipes.license_clarity import get_project_clarity_thresholds +from scanpipe.pipes.compliance_thresholds import get_project_clarity_thresholds logger = logging.getLogger("scanpipe.pipes") diff --git a/scanpipe/tests/data/license_clarity/sample_thresholds.yml b/scanpipe/tests/data/compliance-thresholds/clarity_sample_thresholds.yml similarity index 100% rename from scanpipe/tests/data/license_clarity/sample_thresholds.yml rename to scanpipe/tests/data/compliance-thresholds/clarity_sample_thresholds.yml diff --git a/scanpipe/tests/data/compliance-thresholds/scorecard_sample_thresholds.yml b/scanpipe/tests/data/compliance-thresholds/scorecard_sample_thresholds.yml new file mode 100644 index 0000000000..2afb5153e6 --- /dev/null +++ b/scanpipe/tests/data/compliance-thresholds/scorecard_sample_thresholds.yml @@ -0,0 +1,4 @@ +scorecard_score_thresholds: + 9: ok + 7: warning + 0: error \ No newline at end of file diff --git a/scanpipe/tests/pipes/test_compliance_thresholds.py b/scanpipe/tests/pipes/test_compliance_thresholds.py new file mode 100644 index 0000000000..5ab851dd26 --- /dev/null +++ b/scanpipe/tests/pipes/test_compliance_thresholds.py @@ -0,0 +1,299 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/nexB/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/nexB/scancode.io for support and download. + +from pathlib import Path + +from django.core.exceptions import ValidationError +from django.test import TestCase + +from scanpipe.pipes.compliance_thresholds import ClarityThresholdsPolicy +from scanpipe.pipes.compliance_thresholds import ScorecardThresholdsPolicy +from scanpipe.pipes.compliance_thresholds import load_thresholds_from_file +from scanpipe.pipes.compliance_thresholds import load_thresholds_from_yaml + + +class ClarityThresholdsPolicyTest(TestCase): + """Test ClarityThresholdsPolicy class functionality.""" + + data = Path(__file__).parent.parent / "data" + + def test_valid_thresholds_initialization(self): + thresholds = {80: "ok", 50: "warning", 20: "error"} + policy = ClarityThresholdsPolicy(thresholds) + self.assertEqual(policy.thresholds, thresholds) + + def test_string_keys_converted_to_integers(self): + thresholds = {"80": "ok", "50": "warning"} + policy = ClarityThresholdsPolicy(thresholds) + expected = {80: "ok", 50: "warning"} + self.assertEqual(policy.thresholds, expected) + + def test_invalid_threshold_key_raises_error(self): + with self.assertRaises(ValidationError) as cm: + ClarityThresholdsPolicy({"invalid": "ok"}) + self.assertIn("must be integers", str(cm.exception)) + + def test_invalid_alert_value_raises_error(self): + with self.assertRaises(ValidationError) as cm: + ClarityThresholdsPolicy({80: "invalid"}) + self.assertIn("must be one of 'ok', 'warning', 'error'", str(cm.exception)) + + def test_non_dict_input_raises_error(self): + with self.assertRaises(ValidationError) as cm: + ClarityThresholdsPolicy([80, 50]) + self.assertIn("must be a dictionary", str(cm.exception)) + + def test_duplicate_threshold_keys_raise_error(self): + with self.assertRaises(ValidationError) as cm: + ClarityThresholdsPolicy({80: "ok", "80": "warning"}) + self.assertIn("Duplicate threshold key", str(cm.exception)) + + def test_overlapping_thresholds_wrong_order(self): + with self.assertRaises(ValidationError) as cm: + ClarityThresholdsPolicy({70: "ok", 80: "warning"}) + self.assertIn("Thresholds must be strictly descending", str(cm.exception)) + + def test_float_threshold_keys(self): + thresholds = {80.5: "ok", 50.9: "warning"} + policy = ClarityThresholdsPolicy(thresholds) + expected = {80: "ok", 50: "warning"} + self.assertEqual(policy.thresholds, expected) + + def test_negative_threshold_values(self): + thresholds = {50: "ok", 0: "warning", -10: "error"} + policy = ClarityThresholdsPolicy(thresholds) + self.assertEqual(policy.get_alert_for_score(60), "ok") + self.assertEqual(policy.get_alert_for_score(25), "warning") + self.assertEqual(policy.get_alert_for_score(-5), "error") + self.assertEqual(policy.get_alert_for_score(-20), "error") + + def test_empty_thresholds_dict(self): + policy = ClarityThresholdsPolicy({}) + self.assertEqual(policy.get_alert_for_score(100), "error") + self.assertEqual(policy.get_alert_for_score(50), "error") + self.assertEqual(policy.get_alert_for_score(0), "error") + self.assertEqual(policy.get_alert_for_score(None), "error") + + def test_very_high_threshold_values(self): + thresholds = {150: "ok", 100: "warning"} + policy = ClarityThresholdsPolicy(thresholds) + self.assertEqual(policy.get_alert_for_score(100), "warning") + self.assertEqual(policy.get_alert_for_score(90), "error") + self.assertEqual(policy.get_alert_for_score(50), "error") + self.assertEqual(policy.get_alert_for_score(99), "error") + + # Policy logic via YAML string (mock policies.yml content) + def test_yaml_string_ok_and_warning(self): + yaml_content = """ +license_clarity_thresholds: + 90: ok + 30: warning +""" + policy = load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) + self.assertEqual(policy.get_alert_for_score(95), "ok") + self.assertEqual(policy.get_alert_for_score(60), "warning") + self.assertEqual(policy.get_alert_for_score(20), "error") + + def test_yaml_string_single_threshold(self): + yaml_content = """ +license_clarity_thresholds: + 80: ok +""" + policy = load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) + self.assertEqual(policy.get_alert_for_score(90), "ok") + self.assertEqual(policy.get_alert_for_score(79), "error") + + def test_yaml_string_invalid_alert(self): + yaml_content = """ +license_clarity_thresholds: + 80: great +""" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) + + def test_yaml_string_invalid_key(self): + yaml_content = """ +license_clarity_thresholds: + eighty: ok +""" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) + + def test_yaml_string_missing_key(self): + yaml_content = """ +license_policies: + - license_key: mit +""" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) + + def test_yaml_string_invalid_yaml(self): + yaml_content = "license_clarity_thresholds: [80, 50" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) + + def test_load_from_existing_file(self): + test_file = self.data / "compliance-thresholds" / "clarity_sample_thresholds.yml" + policy = load_thresholds_from_file(test_file, ClarityThresholdsPolicy) + self.assertIsNotNone(policy) + self.assertEqual(policy.get_alert_for_score(95), "ok") + self.assertEqual(policy.get_alert_for_score(75), "warning") + self.assertEqual(policy.get_alert_for_score(50), "error") + + def test_load_from_nonexistent_file(self): + policy = load_thresholds_from_file("/nonexistent/file.yml", ClarityThresholdsPolicy) + self.assertIsNone(policy) + + +class ScorecardThresholdsPolicyTest(TestCase): + """Test ScorecardThresholdsPolicy class functionality.""" + + data = Path(__file__).parent.parent / "data" + + def test_valid_thresholds_initialization(self): + thresholds = {9.0: "ok", 7.0: "warning", 0: "error"} + policy = ScorecardThresholdsPolicy(thresholds) + self.assertEqual(policy.thresholds, thresholds) + + def test_string_keys_converted_to_floats(self): + thresholds = {"9.0": "ok", "7.0": "warning"} + policy = ScorecardThresholdsPolicy(thresholds) + expected = {9.0: "ok", 7.0: "warning"} + self.assertEqual(policy.thresholds, expected) + + def test_invalid_threshold_key_raises_error(self): + with self.assertRaises(ValidationError) as cm: + ScorecardThresholdsPolicy({"invalid": "ok"}) + self.assertIn("must be numbers", str(cm.exception)) + + def test_invalid_alert_value_raises_error(self): + with self.assertRaises(ValidationError) as cm: + ScorecardThresholdsPolicy({9.0: "invalid"}) + self.assertIn("must be one of 'ok', 'warning', 'error'", str(cm.exception)) + + def test_non_dict_input_raises_error(self): + with self.assertRaises(ValidationError) as cm: + ScorecardThresholdsPolicy([9.0, 7.0]) + self.assertIn("must be a dictionary", str(cm.exception)) + + def test_duplicate_threshold_keys_raise_error(self): + with self.assertRaises(ValidationError) as cm: + ScorecardThresholdsPolicy({9.0: "ok", "9.0": "warning"}) + self.assertIn("Duplicate threshold key", str(cm.exception)) + + def test_overlapping_thresholds_wrong_order(self): + with self.assertRaises(ValidationError) as cm: + ScorecardThresholdsPolicy({7.0: "ok", 9.0: "warning"}) + self.assertIn("Thresholds must be strictly descending", str(cm.exception)) + + def test_float_threshold_keys(self): + thresholds = {9.5: "ok", 7.9: "warning"} + policy = ScorecardThresholdsPolicy(thresholds) + expected = {9.5: "ok", 7.9: "warning"} + self.assertEqual(policy.thresholds, expected) + + def test_negative_threshold_values(self): + thresholds = {5.0: "ok", 0: "warning", -1.0: "error"} + policy = ScorecardThresholdsPolicy(thresholds) + self.assertEqual(policy.get_alert_for_score(6.0), "ok") + self.assertEqual(policy.get_alert_for_score(2.5), "warning") + self.assertEqual(policy.get_alert_for_score(-0.5), "error") + self.assertEqual(policy.get_alert_for_score(-2.0), "error") + + def test_empty_thresholds_dict(self): + policy = ScorecardThresholdsPolicy({}) + self.assertEqual(policy.get_alert_for_score(10.0), "error") + self.assertEqual(policy.get_alert_for_score(5.0), "error") + self.assertEqual(policy.get_alert_for_score(0), "error") + self.assertEqual(policy.get_alert_for_score(None), "error") + + def test_very_high_threshold_values(self): + thresholds = {15.0: "ok", 10.0: "warning"} + policy = ScorecardThresholdsPolicy(thresholds) + self.assertEqual(policy.get_alert_for_score(10.0), "warning") + self.assertEqual(policy.get_alert_for_score(9.0), "error") + self.assertEqual(policy.get_alert_for_score(5.0), "error") + self.assertEqual(policy.get_alert_for_score(9.9), "error") + + # Policy logic via YAML string (mock policies.yml content) + def test_yaml_string_ok_and_warning(self): + yaml_content = """ +scorecard_score_thresholds: + 9.0: ok + 7.0: warning + 0: error +""" + policy = load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) + self.assertEqual(policy.get_alert_for_score(9.5), "ok") + self.assertEqual(policy.get_alert_for_score(8.0), "warning") + self.assertEqual(policy.get_alert_for_score(2.0), "error") + + def test_yaml_string_single_threshold(self): + yaml_content = """ +scorecard_score_thresholds: + 8.0: ok +""" + policy = load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) + self.assertEqual(policy.get_alert_for_score(9.0), "ok") + self.assertEqual(policy.get_alert_for_score(7.9), "error") + + def test_yaml_string_invalid_alert(self): + yaml_content = """ +scorecard_score_thresholds: + 8.0: great +""" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) + + def test_yaml_string_invalid_key(self): + yaml_content = """ +scorecard_score_thresholds: + nine: ok +""" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) + + def test_yaml_string_missing_key(self): + yaml_content = """ +license_policies: + - license_key: mit +""" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) + + def test_yaml_string_invalid_yaml(self): + yaml_content = "scorecard_score_thresholds: [9.0, 7.0" + with self.assertRaises(ValidationError): + load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) + + def test_load_from_existing_file(self): + test_file = self.data / "compliance-thresholds" / "scorecard_sample_thresholds.yml" + policy = load_thresholds_from_file(test_file, ScorecardThresholdsPolicy) + self.assertIsNotNone(policy) + self.assertEqual(policy.get_alert_for_score(9.5), "ok") + self.assertEqual(policy.get_alert_for_score(8.0), "warning") + self.assertEqual(policy.get_alert_for_score(5.0), "error") + + def test_load_from_nonexistent_file(self): + policy = load_thresholds_from_file("/nonexistent/file.yml", ScorecardThresholdsPolicy) + self.assertIsNone(policy) + diff --git a/scanpipe/tests/pipes/test_license_clarity.py b/scanpipe/tests/pipes/test_license_clarity.py deleted file mode 100644 index f00723007b..0000000000 --- a/scanpipe/tests/pipes/test_license_clarity.py +++ /dev/null @@ -1,162 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# http://nexb.com and https://github.com/nexB/scancode.io -# The ScanCode.io software is licensed under the Apache License version 2.0. -# Data generated with ScanCode.io is provided as-is without warranties. -# ScanCode is a trademark of nexB Inc. -# -# You may not use this software except in compliance with the License. -# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# -# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND, either express or implied. No content created from -# ScanCode.io should be considered or used as legal advice. Consult an Attorney -# for any legal advice. -# -# ScanCode.io is a free software code scanning tool from nexB Inc. and others. -# Visit https://github.com/nexB/scancode.io for support and download. - -from pathlib import Path - -from django.core.exceptions import ValidationError -from django.test import TestCase - -from scanpipe.pipes.license_clarity import ClarityThresholdsPolicy -from scanpipe.pipes.license_clarity import load_clarity_thresholds_from_file -from scanpipe.pipes.license_clarity import load_clarity_thresholds_from_yaml - - -class ClarityThresholdsPolicyTest(TestCase): - data = Path(__file__).parent.parent / "data" - """Test ClarityThresholdsPolicy class functionality.""" - - def test_valid_thresholds_initialization(self): - thresholds = {80: "ok", 50: "warning", 20: "error"} - policy = ClarityThresholdsPolicy(thresholds) - self.assertEqual(policy.thresholds, thresholds) - - def test_string_keys_converted_to_integers(self): - thresholds = {"80": "ok", "50": "warning"} - policy = ClarityThresholdsPolicy(thresholds) - expected = {80: "ok", 50: "warning"} - self.assertEqual(policy.thresholds, expected) - - def test_invalid_threshold_key_raises_error(self): - with self.assertRaises(ValidationError) as cm: - ClarityThresholdsPolicy({"invalid": "ok"}) - self.assertIn("must be integers", str(cm.exception)) - - def test_invalid_alert_value_raises_error(self): - with self.assertRaises(ValidationError) as cm: - ClarityThresholdsPolicy({80: "invalid"}) - self.assertIn("must be one of 'ok', 'warning', 'error'", str(cm.exception)) - - def test_non_dict_input_raises_error(self): - with self.assertRaises(ValidationError) as cm: - ClarityThresholdsPolicy([80, 50]) - self.assertIn("must be a dictionary", str(cm.exception)) - - def test_duplicate_threshold_keys_raise_error(self): - with self.assertRaises(ValidationError) as cm: - ClarityThresholdsPolicy({80: "ok", "80": "warning"}) - self.assertIn("Duplicate threshold key", str(cm.exception)) - - def test_overlapping_thresholds_wrong_order(self): - with self.assertRaises(ValidationError) as cm: - ClarityThresholdsPolicy({70: "ok", 80: "warning"}) - self.assertIn("Thresholds must be strictly descending", str(cm.exception)) - - def test_float_threshold_keys(self): - thresholds = {80.5: "ok", 50.9: "warning"} - policy = ClarityThresholdsPolicy(thresholds) - expected = {80: "ok", 50: "warning"} - self.assertEqual(policy.thresholds, expected) - - def test_negative_threshold_values(self): - thresholds = {50: "ok", 0: "warning", -10: "error"} - policy = ClarityThresholdsPolicy(thresholds) - self.assertEqual(policy.get_alert_for_score(60), "ok") - self.assertEqual(policy.get_alert_for_score(25), "warning") - self.assertEqual(policy.get_alert_for_score(-5), "error") - self.assertEqual(policy.get_alert_for_score(-20), "error") - - def test_empty_thresholds_dict(self): - policy = ClarityThresholdsPolicy({}) - self.assertEqual(policy.get_alert_for_score(100), "error") - self.assertEqual(policy.get_alert_for_score(50), "error") - self.assertEqual(policy.get_alert_for_score(0), "error") - self.assertEqual(policy.get_alert_for_score(None), "error") - - def test_very_high_threshold_values(self): - thresholds = {150: "ok", 100: "warning"} - policy = ClarityThresholdsPolicy(thresholds) - self.assertEqual(policy.get_alert_for_score(100), "warning") - self.assertEqual(policy.get_alert_for_score(90), "error") - self.assertEqual(policy.get_alert_for_score(50), "error") - self.assertEqual(policy.get_alert_for_score(99), "error") - - # Policy logic via YAML string (mock policies.yml content) - def test_yaml_string_ok_and_warning(self): - yaml_content = """ -license_clarity_thresholds: - 90: ok - 30: warning -""" - policy = load_clarity_thresholds_from_yaml(yaml_content) - self.assertEqual(policy.get_alert_for_score(95), "ok") - self.assertEqual(policy.get_alert_for_score(60), "warning") - self.assertEqual(policy.get_alert_for_score(20), "error") - - def test_yaml_string_single_threshold(self): - yaml_content = """ -license_clarity_thresholds: - 80: ok -""" - policy = load_clarity_thresholds_from_yaml(yaml_content) - self.assertEqual(policy.get_alert_for_score(90), "ok") - self.assertEqual(policy.get_alert_for_score(79), "error") - - def test_yaml_string_invalid_alert(self): - yaml_content = """ -license_clarity_thresholds: - 80: great -""" - with self.assertRaises(ValidationError): - load_clarity_thresholds_from_yaml(yaml_content) - - def test_yaml_string_invalid_key(self): - yaml_content = """ -license_clarity_thresholds: - eighty: ok -""" - with self.assertRaises(ValidationError): - load_clarity_thresholds_from_yaml(yaml_content) - - def test_yaml_string_missing_key(self): - yaml_content = """ -license_policies: - - license_key: mit -""" - with self.assertRaises(ValidationError): - load_clarity_thresholds_from_yaml(yaml_content) - - def test_yaml_string_invalid_yaml(self): - yaml_content = "license_clarity_thresholds: [80, 50" - with self.assertRaises(ValidationError): - load_clarity_thresholds_from_yaml(yaml_content) - - def test_load_from_existing_file(self): - test_file = self.data / "license_clarity" / "sample_thresholds.yml" - policy = load_clarity_thresholds_from_file(test_file) - self.assertIsNotNone(policy) - self.assertEqual(policy.get_alert_for_score(95), "ok") - self.assertEqual(policy.get_alert_for_score(75), "warning") - self.assertEqual(policy.get_alert_for_score(50), "error") - - def test_load_from_nonexistent_file(self): - policy = load_clarity_thresholds_from_file("/nonexistent/file.yml") - self.assertIsNone(policy) From f88a57d4b5e3eb7825e34184e76825f999d7b320 Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Fri, 1 Aug 2025 14:30:48 +0530 Subject: [PATCH 02/10] fix tests Signed-off-by: NucleonGodX --- scanpipe/pipes/compliance_thresholds.py | 11 ++++++---- .../tests/pipes/test_compliance_thresholds.py | 21 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/scanpipe/pipes/compliance_thresholds.py b/scanpipe/pipes/compliance_thresholds.py index 2f0bc4d90a..7c33066a7a 100644 --- a/scanpipe/pipes/compliance_thresholds.py +++ b/scanpipe/pipes/compliance_thresholds.py @@ -92,7 +92,9 @@ def validate_thresholds(self, threshold_dict): try: threshold = self.THRESHOLD_TYPE(key) except (ValueError, TypeError): - type_name = "integers" if self.THRESHOLD_TYPE == int else "numbers" + type_name = ( + "integers" if issubclass(self.THRESHOLD_TYPE, int) else "numbers" + ) raise ValidationError(f"Threshold keys must be {type_name}, got: {key}") if threshold in seen: @@ -101,7 +103,8 @@ def validate_thresholds(self, threshold_dict): if value not in ["ok", "warning", "error"]: raise ValidationError( - f"Compliance alert must be one of 'ok', 'warning', 'error', got: {value}" + f"Compliance alert must be one of 'ok', 'warning', 'error', " + f"got: {value}" ) validated[threshold] = value @@ -138,7 +141,7 @@ class ScorecardThresholdsPolicy(BaseThresholdsPolicy): def load_thresholds_from_yaml(yaml_content, policy_class): - """Generic function to load thresholds from YAML.""" + """Load thresholds from YAML.""" data = load_yaml_content(yaml_content) if not isinstance(data, dict): @@ -153,7 +156,7 @@ def load_thresholds_from_yaml(yaml_content, policy_class): def load_thresholds_from_file(file_path, policy_class): - """Generic function to load thresholds from file.""" + """Load thresholds from file.""" file_path = Path(file_path) if not file_path.exists(): return diff --git a/scanpipe/tests/pipes/test_compliance_thresholds.py b/scanpipe/tests/pipes/test_compliance_thresholds.py index 5ab851dd26..38d942019b 100644 --- a/scanpipe/tests/pipes/test_compliance_thresholds.py +++ b/scanpipe/tests/pipes/test_compliance_thresholds.py @@ -33,7 +33,7 @@ class ClarityThresholdsPolicyTest(TestCase): """Test ClarityThresholdsPolicy class functionality.""" - + data = Path(__file__).parent.parent / "data" def test_valid_thresholds_initialization(self): @@ -152,7 +152,9 @@ def test_yaml_string_invalid_yaml(self): load_thresholds_from_yaml(yaml_content, ClarityThresholdsPolicy) def test_load_from_existing_file(self): - test_file = self.data / "compliance-thresholds" / "clarity_sample_thresholds.yml" + test_file = ( + self.data / "compliance-thresholds" / "clarity_sample_thresholds.yml" + ) policy = load_thresholds_from_file(test_file, ClarityThresholdsPolicy) self.assertIsNotNone(policy) self.assertEqual(policy.get_alert_for_score(95), "ok") @@ -160,13 +162,15 @@ def test_load_from_existing_file(self): self.assertEqual(policy.get_alert_for_score(50), "error") def test_load_from_nonexistent_file(self): - policy = load_thresholds_from_file("/nonexistent/file.yml", ClarityThresholdsPolicy) + policy = load_thresholds_from_file( + "/nonexistent/file.yml", ClarityThresholdsPolicy + ) self.assertIsNone(policy) class ScorecardThresholdsPolicyTest(TestCase): """Test ScorecardThresholdsPolicy class functionality.""" - + data = Path(__file__).parent.parent / "data" def test_valid_thresholds_initialization(self): @@ -286,7 +290,9 @@ def test_yaml_string_invalid_yaml(self): load_thresholds_from_yaml(yaml_content, ScorecardThresholdsPolicy) def test_load_from_existing_file(self): - test_file = self.data / "compliance-thresholds" / "scorecard_sample_thresholds.yml" + test_file = ( + self.data / "compliance-thresholds" / "scorecard_sample_thresholds.yml" + ) policy = load_thresholds_from_file(test_file, ScorecardThresholdsPolicy) self.assertIsNotNone(policy) self.assertEqual(policy.get_alert_for_score(9.5), "ok") @@ -294,6 +300,7 @@ def test_load_from_existing_file(self): self.assertEqual(policy.get_alert_for_score(5.0), "error") def test_load_from_nonexistent_file(self): - policy = load_thresholds_from_file("/nonexistent/file.yml", ScorecardThresholdsPolicy) + policy = load_thresholds_from_file( + "/nonexistent/file.yml", ScorecardThresholdsPolicy + ) self.assertIsNone(policy) - From e63a9ab18dd9a9ee1a449bc2d7c552085216330f Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Fri, 1 Aug 2025 14:44:13 +0530 Subject: [PATCH 03/10] integrate scorecard compliance Signed-off-by: NucleonGodX --- scanpipe/api/views.py | 15 +++++++ .../management/commands/check-compliance.py | 13 +++++- scanpipe/models.py | 9 +++- scanpipe/pipelines/fetch_scores.py | 20 ++++++++- .../scanpipe/panels/project_compliance.html | 16 ++++++- scanpipe/tests/test_api.py | 18 ++++++++ scanpipe/tests/test_commands.py | 43 +++++++++++++++++++ scanpipe/views.py | 2 + 8 files changed, 130 insertions(+), 6 deletions(-) diff --git a/scanpipe/api/views.py b/scanpipe/api/views.py index 0c2baa6f8d..215ec8258b 100644 --- a/scanpipe/api/views.py +++ b/scanpipe/api/views.py @@ -497,6 +497,21 @@ def license_clarity_compliance(self, request, *args, **kwargs): clarity_alert = project.get_license_clarity_compliance_alert() return Response({"license_clarity_compliance_alert": clarity_alert}) + @action(detail=True, methods=["get"]) + def scorecard_compliance(self, request, *args, **kwargs): + """ + Retrieve the scorecard compliance alert for a project. + + This endpoint returns the scorecard compliance alert stored in the + project's extra_data. + + Example: + GET /api/projects/{project_id}/scorecard_compliance/ + + """ + project = self.get_object() + scorecard_alert = project.get_scorecard_compliance_alert() + return Response({"scorecard_compliance_alert": scorecard_alert}) class RunViewSet(mixins.RetrieveModelMixin, viewsets.GenericViewSet): """Add actions to the Run viewset.""" diff --git a/scanpipe/management/commands/check-compliance.py b/scanpipe/management/commands/check-compliance.py index f651c04f8a..3dd82b785c 100644 --- a/scanpipe/management/commands/check-compliance.py +++ b/scanpipe/management/commands/check-compliance.py @@ -77,7 +77,12 @@ def check_compliance(self, fail_level): clarity_alert = self.project.get_license_clarity_compliance_alert() has_clarity_issue = clarity_alert not in (None, "ok") - total_issues = count + (1 if has_clarity_issue else 0) + scorecard_alert = self.project.get_scorecard_compliance_alert() + has_scorecard_issue = scorecard_alert not in (None, "ok") + + total_issues = ( + count + (1 if has_clarity_issue else 0) + (1 if has_scorecard_issue else 0) + ) if total_issues and self.verbosity > 0: self.stderr.write(f"{total_issues} compliance issues detected.") @@ -92,6 +97,10 @@ def check_compliance(self, fail_level): self.stderr.write("[license clarity]") self.stderr.write(f" > {clarity_alert.upper()}") + if has_scorecard_issue: + self.stderr.write("[scorecard compliance]") + self.stderr.write(f" > {scorecard_alert.upper()}") + return total_issues > 0 def check_vulnerabilities(self): @@ -114,4 +123,4 @@ def check_vulnerabilities(self): else: self.stdout.write("No vulnerabilities found") - return count > 0 + return count > 0 \ No newline at end of file diff --git a/scanpipe/models.py b/scanpipe/models.py index 0ecd8e6392..f4b2a66dfb 100644 --- a/scanpipe/models.py +++ b/scanpipe/models.py @@ -1552,7 +1552,14 @@ def get_license_clarity_compliance_alert(self): or None if not set. """ return self.extra_data.get("license_clarity_compliance_alert") - + + def get_scorecard_compliance_alert(self): + """ + Return the scorecard compliance alert value for the project, + or None if not set. + """ + return self.extra_data.get("scorecard_compliance_alert") + def get_license_policy_index(self): """Return the policy license index for this project instance.""" if policies_dict := self.get_policies_dict(): diff --git a/scanpipe/pipelines/fetch_scores.py b/scanpipe/pipelines/fetch_scores.py index 24b1eaf997..b3a360a06f 100644 --- a/scanpipe/pipelines/fetch_scores.py +++ b/scanpipe/pipelines/fetch_scores.py @@ -25,7 +25,7 @@ from scanpipe.models import DiscoveredPackageScore from scanpipe.pipelines import Pipeline - +from scanpipe.pipes.compliance_thresholds import get_project_scorecard_thresholds class FetchScores(Pipeline): """ @@ -57,10 +57,26 @@ def check_scorecode_service_availability(self): raise Exception("ScoreCode service is not available.") def fetch_packages_scorecode_info(self): - """Fetch ScoreCode information for each of the project's discovered packages.""" + scorecard_policy = get_project_scorecard_thresholds(self.project) + worst_alert = None + for package in self.project.discoveredpackages.all(): if scorecard_data := ossf_scorecard.fetch_scorecard_info(package=package): DiscoveredPackageScore.create_from_package_and_scorecard( scorecard_data=scorecard_data, package=package, ) + + if scorecard_policy and scorecard_data.score is not None: + try: + score = float(scorecard_data.score) + alert = scorecard_policy.get_alert_for_score(score) + except Exception: + alert = "error" + + order = {"ok": 0, "warning": 1, "error": 2} + if worst_alert is None or order[alert] > order.get(worst_alert, -1): + worst_alert = alert + + if worst_alert is not None: + self.project.update_extra_data({"scorecard_compliance_alert": worst_alert}) diff --git a/scanpipe/templates/scanpipe/panels/project_compliance.html b/scanpipe/templates/scanpipe/panels/project_compliance.html index c1f0967b2a..194e3b0f5a 100644 --- a/scanpipe/templates/scanpipe/panels/project_compliance.html +++ b/scanpipe/templates/scanpipe/panels/project_compliance.html @@ -1,5 +1,5 @@ {% load humanize %} -{% if compliance_alerts or license_clarity_compliance_alert %} +{% if compliance_alerts or license_clarity_compliance_alert or scorecard_compliance_alert %}
{% endif %} + {% if scorecard_compliance_alert %} +
+ + Scorecard compliance + + + {{ scorecard_compliance_alert|title }} + +
+ {% endif %} {% endif %} diff --git a/scanpipe/tests/test_api.py b/scanpipe/tests/test_api.py index 61c1d5891b..a021882485 100644 --- a/scanpipe/tests/test_api.py +++ b/scanpipe/tests/test_api.py @@ -1270,6 +1270,24 @@ def test_scanpipe_api_project_action_license_clarity_compliance(self): expected = {"license_clarity_compliance_alert": "error"} self.assertEqual(expected, response.data) + def test_scanpipe_api_project_action_scorecard_compliance(self): + project = make_project() + url = reverse("project-scorecard-compliance", args=[project.uuid]) + + response = self.csrf_client.get(url) + expected = {"scorecard_compliance_alert": None} + self.assertEqual(expected, response.data) + + project.update_extra_data({"scorecard_compliance_alert": "ok"}) + response = self.csrf_client.get(url) + expected = {"scorecard_compliance_alert": "ok"} + self.assertEqual(expected, response.data) + + project.update_extra_data({"scorecard_compliance_alert": "error"}) + response = self.csrf_client.get(url) + expected = {"scorecard_compliance_alert": "error"} + self.assertEqual(expected, response.data) + def test_scanpipe_api_serializer_get_model_serializer(self): self.assertEqual( DiscoveredPackageSerializer, get_model_serializer(DiscoveredPackage) diff --git a/scanpipe/tests/test_commands.py b/scanpipe/tests/test_commands.py index 6d6ec8a000..00ac848898 100644 --- a/scanpipe/tests/test_commands.py +++ b/scanpipe/tests/test_commands.py @@ -1252,6 +1252,49 @@ def test_scanpipe_management_command_check_both_compliance_and_clarity(self): ) self.assertEqual(expected, out_value) + def test_scanpipe_management_command_check_scorecard_compliance_only(self): + project = make_project(name="my_project_scorecard") + + project.extra_data = {"scorecard_compliance_alert": "error"} + project.save(update_fields=["extra_data"]) + + out = StringIO() + options = ["--project", project.name] + with self.assertRaises(SystemExit) as cm: + call_command("check-compliance", options, stderr=out) + self.assertEqual(cm.exception.code, 1) + out_value = out.getvalue().strip() + expected = "1 compliance issues detected.\n[scorecard compliance]\n > ERROR" + self.assertEqual(expected, out_value) + + def test_scanpipe_management_command_check_all_compliance_types(self): + project = make_project(name="my_project_all") + + make_package( + project, + package_url="pkg:generic/name@1.0", + compliance_alert=CodebaseResource.Compliance.ERROR, + ) + project.extra_data = { + "license_clarity_compliance_alert": "warning", + "scorecard_compliance_alert": "error", + } + project.save(update_fields=["extra_data"]) + + out = StringIO() + options = ["--project", project.name, "--fail-level", "WARNING"] + with self.assertRaises(SystemExit) as cm: + call_command("check-compliance",options, stderr=out) + self.assertEqual(cm.exception.code, 1) + out_value = out.getvalue().strip() + expected = ( + "3 compliance issues detected." + "\n[packages]\n > ERROR: 1" + "\n[license clarity]\n > WARNING" + "\n[scorecard compliance]\n > ERROR" + ) + self.assertEqual(expected, out_value) + def test_scanpipe_management_command_check_compliance_vulnerabilities(self): project = make_project(name="my_project") package1 = make_package(project, package_url="pkg:generic/name@1.0") diff --git a/scanpipe/views.py b/scanpipe/views.py index d52eafcaa2..a0a9f29aeb 100644 --- a/scanpipe/views.py +++ b/scanpipe/views.py @@ -1286,6 +1286,8 @@ def get_context_data(self, **kwargs): project.get_license_clarity_compliance_alert() ) + context["scorecard_compliance_alert"] = project.get_scorecard_compliance_alert() + return context From c6b591bc63d879ee51972941bd194c93c25286ae Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Fri, 1 Aug 2025 14:55:29 +0530 Subject: [PATCH 04/10] fix tests Signed-off-by: NucleonGodX --- scanpipe/api/views.py | 1 + scanpipe/management/commands/check-compliance.py | 2 +- scanpipe/models.py | 4 ++-- scanpipe/pipelines/fetch_scores.py | 1 + scanpipe/tests/test_api.py | 2 +- scanpipe/tests/test_commands.py | 4 ++-- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/scanpipe/api/views.py b/scanpipe/api/views.py index 215ec8258b..d5d91189cb 100644 --- a/scanpipe/api/views.py +++ b/scanpipe/api/views.py @@ -513,6 +513,7 @@ def scorecard_compliance(self, request, *args, **kwargs): scorecard_alert = project.get_scorecard_compliance_alert() return Response({"scorecard_compliance_alert": scorecard_alert}) + class RunViewSet(mixins.RetrieveModelMixin, viewsets.GenericViewSet): """Add actions to the Run viewset.""" diff --git a/scanpipe/management/commands/check-compliance.py b/scanpipe/management/commands/check-compliance.py index 3dd82b785c..216a849e67 100644 --- a/scanpipe/management/commands/check-compliance.py +++ b/scanpipe/management/commands/check-compliance.py @@ -123,4 +123,4 @@ def check_vulnerabilities(self): else: self.stdout.write("No vulnerabilities found") - return count > 0 \ No newline at end of file + return count > 0 diff --git a/scanpipe/models.py b/scanpipe/models.py index f4b2a66dfb..c9aa0637ac 100644 --- a/scanpipe/models.py +++ b/scanpipe/models.py @@ -1552,14 +1552,14 @@ def get_license_clarity_compliance_alert(self): or None if not set. """ return self.extra_data.get("license_clarity_compliance_alert") - + def get_scorecard_compliance_alert(self): """ Return the scorecard compliance alert value for the project, or None if not set. """ return self.extra_data.get("scorecard_compliance_alert") - + def get_license_policy_index(self): """Return the policy license index for this project instance.""" if policies_dict := self.get_policies_dict(): diff --git a/scanpipe/pipelines/fetch_scores.py b/scanpipe/pipelines/fetch_scores.py index b3a360a06f..ea536c9839 100644 --- a/scanpipe/pipelines/fetch_scores.py +++ b/scanpipe/pipelines/fetch_scores.py @@ -27,6 +27,7 @@ from scanpipe.pipelines import Pipeline from scanpipe.pipes.compliance_thresholds import get_project_scorecard_thresholds + class FetchScores(Pipeline): """ Fetch ScoreCode information for packages. diff --git a/scanpipe/tests/test_api.py b/scanpipe/tests/test_api.py index a021882485..41ab650aaf 100644 --- a/scanpipe/tests/test_api.py +++ b/scanpipe/tests/test_api.py @@ -1287,7 +1287,7 @@ def test_scanpipe_api_project_action_scorecard_compliance(self): response = self.csrf_client.get(url) expected = {"scorecard_compliance_alert": "error"} self.assertEqual(expected, response.data) - + def test_scanpipe_api_serializer_get_model_serializer(self): self.assertEqual( DiscoveredPackageSerializer, get_model_serializer(DiscoveredPackage) diff --git a/scanpipe/tests/test_commands.py b/scanpipe/tests/test_commands.py index 00ac848898..1b2d6f688e 100644 --- a/scanpipe/tests/test_commands.py +++ b/scanpipe/tests/test_commands.py @@ -1284,7 +1284,7 @@ def test_scanpipe_management_command_check_all_compliance_types(self): out = StringIO() options = ["--project", project.name, "--fail-level", "WARNING"] with self.assertRaises(SystemExit) as cm: - call_command("check-compliance",options, stderr=out) + call_command("check-compliance", options, stderr=out) self.assertEqual(cm.exception.code, 1) out_value = out.getvalue().strip() expected = ( @@ -1294,7 +1294,7 @@ def test_scanpipe_management_command_check_all_compliance_types(self): "\n[scorecard compliance]\n > ERROR" ) self.assertEqual(expected, out_value) - + def test_scanpipe_management_command_check_compliance_vulnerabilities(self): project = make_project(name="my_project") package1 = make_package(project, package_url="pkg:generic/name@1.0") From efa0f4b9a2d48ab096a3913bbeed02ed97508560 Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Fri, 1 Aug 2025 14:57:45 +0530 Subject: [PATCH 05/10] minor fix Signed-off-by: NucleonGodX --- scanpipe/pipelines/fetch_scores.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scanpipe/pipelines/fetch_scores.py b/scanpipe/pipelines/fetch_scores.py index ea536c9839..a2288838d3 100644 --- a/scanpipe/pipelines/fetch_scores.py +++ b/scanpipe/pipelines/fetch_scores.py @@ -58,6 +58,7 @@ def check_scorecode_service_availability(self): raise Exception("ScoreCode service is not available.") def fetch_packages_scorecode_info(self): + """Fetch ScoreCode information for each of the project's discovered packages.""" scorecard_policy = get_project_scorecard_thresholds(self.project) worst_alert = None From 8c9dc8f73794225c03460844a8c945d633b761a3 Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Thu, 7 Aug 2025 19:46:57 +0530 Subject: [PATCH 06/10] create a new pipe for compliancing Signed-off-by: NucleonGodX --- scanpipe/pipelines/fetch_scores.py | 22 +++------ scanpipe/pipes/scorecard_compliance.py | 62 ++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 17 deletions(-) create mode 100644 scanpipe/pipes/scorecard_compliance.py diff --git a/scanpipe/pipelines/fetch_scores.py b/scanpipe/pipelines/fetch_scores.py index a2288838d3..63186282a7 100644 --- a/scanpipe/pipelines/fetch_scores.py +++ b/scanpipe/pipelines/fetch_scores.py @@ -25,7 +25,7 @@ from scanpipe.models import DiscoveredPackageScore from scanpipe.pipelines import Pipeline -from scanpipe.pipes.compliance_thresholds import get_project_scorecard_thresholds +from scanpipe.pipes import scorecard_compliance class FetchScores(Pipeline): @@ -50,6 +50,7 @@ def steps(cls): return ( cls.check_scorecode_service_availability, cls.fetch_packages_scorecode_info, + cls.evaluate_compliance_alerts, ) def check_scorecode_service_availability(self): @@ -59,9 +60,6 @@ def check_scorecode_service_availability(self): def fetch_packages_scorecode_info(self): """Fetch ScoreCode information for each of the project's discovered packages.""" - scorecard_policy = get_project_scorecard_thresholds(self.project) - worst_alert = None - for package in self.project.discoveredpackages.all(): if scorecard_data := ossf_scorecard.fetch_scorecard_info(package=package): DiscoveredPackageScore.create_from_package_and_scorecard( @@ -69,16 +67,6 @@ def fetch_packages_scorecode_info(self): package=package, ) - if scorecard_policy and scorecard_data.score is not None: - try: - score = float(scorecard_data.score) - alert = scorecard_policy.get_alert_for_score(score) - except Exception: - alert = "error" - - order = {"ok": 0, "warning": 1, "error": 2} - if worst_alert is None or order[alert] > order.get(worst_alert, -1): - worst_alert = alert - - if worst_alert is not None: - self.project.update_extra_data({"scorecard_compliance_alert": worst_alert}) + def evaluate_compliance_alerts(self): + """Evaluate scorecard compliance alerts for the project.""" + scorecard_compliance.evaluate_scorecard_compliance(self.project) diff --git a/scanpipe/pipes/scorecard_compliance.py b/scanpipe/pipes/scorecard_compliance.py new file mode 100644 index 0000000000..3b045816a6 --- /dev/null +++ b/scanpipe/pipes/scorecard_compliance.py @@ -0,0 +1,62 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/aboutcode-org/scancode.io for support and download. + +from scanpipe.pipes.compliance_thresholds import get_project_scorecard_thresholds + + +def evaluate_scorecard_compliance(project): + """ + Evaluate scorecard compliance for all discovered packages in the project. + + This function checks OpenSSF Scorecard scores against project-defined + thresholds and determines the worst compliance alert level across all packages. + Updates the project's extra_data with the overall compliance status. + """ + scorecard_policy = get_project_scorecard_thresholds(project) + if not scorecard_policy: + return + + worst_alert = None + packages_with_scores = project.discoveredpackages.filter( + scores__scoring_tool="ossf-scorecard" + ).distinct() + + for package in packages_with_scores: + latest_score = package.scores.filter( + scoring_tool="ossf-scorecard" + ).order_by("-score_date").first() + + if not latest_score or latest_score.score is None: + continue + + try: + score = float(latest_score.score) + alert = scorecard_policy.get_alert_for_score(score) + except Exception: + alert = "error" + + order = {"ok": 0, "warning": 1, "error": 2} + if worst_alert is None or order[alert] > order.get(worst_alert, -1): + worst_alert = alert + + if worst_alert is not None: + project.update_extra_data({"scorecard_compliance_alert": worst_alert}) From dd1d7d54f3dddd9bb639ff9a473106d6f76afb2b Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Thu, 7 Aug 2025 19:50:21 +0530 Subject: [PATCH 07/10] fix code format Signed-off-by: NucleonGodX --- scanpipe/pipes/scorecard_compliance.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/scanpipe/pipes/scorecard_compliance.py b/scanpipe/pipes/scorecard_compliance.py index 3b045816a6..d108000391 100644 --- a/scanpipe/pipes/scorecard_compliance.py +++ b/scanpipe/pipes/scorecard_compliance.py @@ -26,14 +26,14 @@ def evaluate_scorecard_compliance(project): """ Evaluate scorecard compliance for all discovered packages in the project. - + This function checks OpenSSF Scorecard scores against project-defined thresholds and determines the worst compliance alert level across all packages. Updates the project's extra_data with the overall compliance status. """ scorecard_policy = get_project_scorecard_thresholds(project) if not scorecard_policy: - return + return worst_alert = None packages_with_scores = project.discoveredpackages.filter( @@ -41,9 +41,11 @@ def evaluate_scorecard_compliance(project): ).distinct() for package in packages_with_scores: - latest_score = package.scores.filter( - scoring_tool="ossf-scorecard" - ).order_by("-score_date").first() + latest_score = ( + package.scores.filter(scoring_tool="ossf-scorecard") + .order_by("-score_date") + .first() + ) if not latest_score or latest_score.score is None: continue From 8b83b306d65ae0e204473cc701c502c3f85527fc Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Thu, 7 Aug 2025 21:01:10 +0530 Subject: [PATCH 08/10] add documentation and changelog Signed-off-by: NucleonGodX --- docs/policies.rst | 37 ++++++++++++++++++++++++ docs/rest-api.rst | 25 +++++++++++++++++ docs/tutorial_license_policies.rst | 45 ++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/docs/policies.rst b/docs/policies.rst index 31d36a553c..c53a369153 100644 --- a/docs/policies.rst +++ b/docs/policies.rst @@ -91,6 +91,43 @@ Accepted values for the alert level: - ``warning`` - ``error`` +Creating Scorecard Thresholds Files +----------------------------------- + +A valid scorecard thresholds file is required to **enable OpenSSF Scorecard compliance features**. + +The scorecard thresholds file, by default named ``policies.yml``, is a **YAML file** with a +structure similar to the following: + +.. code-block:: yaml + + scorecard_score_thresholds: + 9.0: ok + 7.0: warning + 0: error + +- In the example above, the keys ``9.0``, ``7.0``, and ``0`` are numeric threshold values + representing **minimum scorecard scores**. +- The values ``error``, ``warning``, and ``ok`` are the **compliance alert levels** that + will be triggered if the project's scorecard score meets or exceeds the + corresponding threshold. +- The thresholds must be listed in **strictly descending order**. + +How it works: + +- If the scorecard score is **9.0 or above**, the alert is **``ok``**. +- If the scorecard score is **7.0 to 8.9**, the alert is **``warning``**. +- If the scorecard score is **below 7.0**, the alert is **``error``**. + +You can adjust the threshold values and alert levels to match your organization's +security compliance requirements. + +Accepted values for the alert level: + +- ``ok`` +- ``warning`` +- ``error`` + App Policies ------------ diff --git a/docs/rest-api.rst b/docs/rest-api.rst index f83c6e6451..84e9068053 100644 --- a/docs/rest-api.rst +++ b/docs/rest-api.rst @@ -518,6 +518,31 @@ Data: "license_clarity_compliance_alert": "warning" } +.. _rest_api_scorecard_compliance: + +Scorecard Compliance +^^^^^^^^^^^^^^^^^^^^ + +This action returns the **scorecard compliance alert** for a project. + +The scorecard compliance alert is a single value (``ok``, ``warning``, or ``error``) +that summarizes the project's **OpenSSF Scorecard security compliance status**, +based on the thresholds defined in the ``policies.yml`` file. + +``GET /api/projects/6461408c-726c-4b70-aa7a-c9cc9d1c9685/scorecard_compliance/`` + +Data: + - ``scorecard_compliance_alert``: The overall scorecard compliance alert + for the project. + + Possible values: ``ok``, ``warning``, ``error``. + +.. code-block:: json + + { + "scorecard_compliance_alert": "warning" + } + Reset ^^^^^ diff --git a/docs/tutorial_license_policies.rst b/docs/tutorial_license_policies.rst index f53da66232..ceec7e3fef 100644 --- a/docs/tutorial_license_policies.rst +++ b/docs/tutorial_license_policies.rst @@ -128,6 +128,51 @@ The ``license_clarity_compliance_alert`` value (e.g., ``"error"``, ``"warning"`` is computed automatically based on the thresholds you configured and reflects the overall license clarity status of the scanned codebase. +Scorecard Compliance Thresholds and Alerts +------------------------------------------ + +ScanCode.io also supports **OpenSSF Scorecard compliance thresholds**, allowing you to enforce +minimum security standards for open source packages in your codebase. This is managed +through the ``scorecard_score_thresholds`` section in your ``policies.yml`` file. + +Defining Scorecard Thresholds +----------------------------- + +Add a ``scorecard_score_thresholds`` section to your ``policies.yml`` file, for example: + +.. code-block:: yaml + + scorecard_score_thresholds: + 9.0: ok + 7.0: warning + 0: error + +Scorecard Compliance in Results +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When you run a the addon pipeline fetch_scores with scorecard thresholds defined in your +``policies.yml``, the computed scorecard compliance alert is included in the project's +``extra_data`` field. + +For example: + +.. code-block:: json + + "extra_data": { + "md5": "d23df4a4", + "sha1": "3e9b61cc98c", + "size": 3095, + "sha256": "abacfc8bcee59067", + "sha512": "208f6a83c83a4c770b3c0", + "filename": "cuckoo_filter-1.0.6.tar.gz", + "sha1_git": "3fdb0f82ad59", + "scorecard_compliance_alert": "warning" + } + +The ``scorecard_compliance_alert`` value (e.g., ``"error"``, ``"warning"``, or ``"ok"``) +is computed automatically based on the thresholds you configured and reflects the +overall security compliance status of the OpenSSF Scorecard scores for packages in the scanned codebase. + Run the ``check-compliance`` command ------------------------------------ From 7d7323f4e97eda53dbe2d48d89f5cc0bb0d82991 Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Thu, 7 Aug 2025 21:02:09 +0530 Subject: [PATCH 09/10] add changelog Signed-off-by: NucleonGodX --- CHANGELOG.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 574d596bfa..cfc5d3d2d6 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,13 @@ Changelog v35.2.0 (2025-08-01) -------------------- +- Enhanced scorecard compliance support with: + * New ``scorecard_compliance_alert`` in project ``extra_data``. + * ``/api/projects/{id}/scorecard_compliance/`` API endpoint. + * Scorecard compliance integration in ``check-compliance`` management command. + * UI template support for scorecard compliance alert. + * ``evaluate_scorecard_compliance()`` pipe function for compliance evaluation. + https://github.com/aboutcode-org/scancode.io/pull/1800 - Refactor policies implementation to support more than licenses. The entire ``policies`` data is now stored on the ``ScanPipeConfig`` in place of the From 7d657fecacef63a46676c6e370fffb5b83fb054d Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Fri, 8 Aug 2025 17:43:55 +0530 Subject: [PATCH 10/10] add test for integration of scorecard-compliance Signed-off-by: NucleonGodX --- .../tests/pipes/test_scorecard_compliance.py | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 scanpipe/tests/pipes/test_scorecard_compliance.py diff --git a/scanpipe/tests/pipes/test_scorecard_compliance.py b/scanpipe/tests/pipes/test_scorecard_compliance.py new file mode 100644 index 0000000000..48cd72843c --- /dev/null +++ b/scanpipe/tests/pipes/test_scorecard_compliance.py @@ -0,0 +1,102 @@ +from unittest.mock import patch + +from django.test import TestCase +from django.utils import timezone + +from scanpipe.models import DiscoveredPackage +from scanpipe.models import DiscoveredPackageScore +from scanpipe.models import Project +from scanpipe.pipes.compliance_thresholds import ScorecardThresholdsPolicy +from scanpipe.pipes.scorecard_compliance import evaluate_scorecard_compliance + + +class EvaluateScorecardComplianceTest(TestCase): + """Test evaluate_scorecard_compliance pipe function.""" + + def setUp(self): + self.project = Project.objects.create(name="test_project") + + def create_package_with_score(self, package_name, score_value): + """Create a package with a scorecard score.""" + package = DiscoveredPackage.objects.create( + project=self.project, + name=package_name, + type="pypi", + vcs_url="https://github.com/numpy/numpy", + ) + + DiscoveredPackageScore.objects.create( + discovered_package=package, + scoring_tool="ossf-scorecard", + score=str(score_value), + scoring_tool_version="v4.10.2", + score_date=timezone.now(), + ) + + return package + + def test_no_scorecard_policy_configured(self): + """Test that function returns early when no scorecard policy is configured.""" + with patch( + "scanpipe.pipes.scorecard_compliance.get_project_scorecard_thresholds" + ) as mock_get_policy: + mock_get_policy.return_value = None + + evaluate_scorecard_compliance(self.project) + + self.project.refresh_from_db() + self.assertNotIn("scorecard_compliance_alert", self.project.extra_data) + + def test_sets_compliance_alert_correctly(self): + """Test that compliance alert is set correctly based on scores.""" + thresholds = {9.0: "ok", 7.0: "warning", 0: "error"} + policy = ScorecardThresholdsPolicy(thresholds) + + self.create_package_with_score("good_package", 9.5) + + with patch( + "scanpipe.pipes.scorecard_compliance.get_project_scorecard_thresholds" + ) as mock_get_policy: + mock_get_policy.return_value = policy + + evaluate_scorecard_compliance(self.project) + + self.project.refresh_from_db() + self.assertEqual( + self.project.extra_data["scorecard_compliance_alert"], "ok" + ) + + def test_worst_alert_selected_for_multiple_packages(self): + """Test that worst alert level is selected across multiple packages.""" + thresholds = {9.0: "ok", 7.0: "warning", 0: "error"} + policy = ScorecardThresholdsPolicy(thresholds) + + self.create_package_with_score("good_package", 9.5) + self.create_package_with_score("bad_package", 5.0) + + with patch( + "scanpipe.pipes.scorecard_compliance.get_project_scorecard_thresholds" + ) as mock_get_policy: + mock_get_policy.return_value = policy + + evaluate_scorecard_compliance(self.project) + + self.project.refresh_from_db() + self.assertEqual( + self.project.extra_data["scorecard_compliance_alert"], "error" + ) + + def test_no_packages_with_scores(self): + """Test behavior when no packages have scorecard scores.""" + thresholds = {9.0: "ok", 7.0: "warning", 0: "error"} + policy = ScorecardThresholdsPolicy(thresholds) + + with patch( + "scanpipe.pipes.scorecard_compliance.get_project_scorecard_thresholds" + ) as mock_get_policy: + mock_get_policy.return_value = policy + + evaluate_scorecard_compliance(self.project) + + self.project.refresh_from_db() + self.assertNotIn("scorecard_compliance_alert", self.project.extra_data)