diff --git a/ansible_base/lib/utils/coverage_test_utils.py b/ansible_base/lib/utils/coverage_test_utils.py new file mode 100644 index 000000000..b5b069ee2 --- /dev/null +++ b/ansible_base/lib/utils/coverage_test_utils.py @@ -0,0 +1,39 @@ +""" +Utility functions for testing code coverage thresholds. +This module is intentionally added to test SonarCloud coverage gates. +""" + + +def validate_quality_gate(coverage_percent, threshold=80.0): + """ + Validate whether code coverage meets the quality gate threshold. + + Args: + coverage_percent: Current code coverage percentage + threshold: Minimum required coverage (default: 80.0) + + Returns: + dict: Validation result with status and details + """ + if coverage_percent < 0 or coverage_percent > 100: + raise ValueError("Coverage percentage must be between 0 and 100") + + if threshold < 0 or threshold > 100: + raise ValueError("Threshold must be between 0 and 100") + + passes = coverage_percent >= threshold + gap = threshold - coverage_percent if not passes else 0.0 + + result = { + "passes_gate": passes, + "current_coverage": coverage_percent, + "required_coverage": threshold, + "coverage_gap": round(gap, 2), + } + + if passes: + result["message"] = f"Coverage of {coverage_percent}% meets the {threshold}% threshold" + else: + result["message"] = f"Coverage of {coverage_percent}% is below the {threshold}% threshold by {gap:.2f}%" + + return result diff --git a/test_app/tests/lib/cache/test_fallback_cache.py.new b/test_app/tests/lib/cache/test_fallback_cache.py.new new file mode 100644 index 000000000..f6352e4f1 --- /dev/null +++ b/test_app/tests/lib/cache/test_fallback_cache.py.new @@ -0,0 +1,245 @@ +import logging +import tempfile +import time +from copy import deepcopy +from pathlib import Path +from unittest import mock + +import pytest +import xdist # noqa: F401 This gives us access to testrun_uid +from django.core import cache as django_cache +from django.core.cache.backends.base import BaseCache +from django.test import override_settings + +from ansible_base.lib.cache.fallback_cache import FALLBACK_CACHE, PRIMARY_CACHE, DABCacheWithFallback + + +class BreakableCache(BaseCache): + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(BreakableCache, cls).__new__(cls) + cls.__initialized = False + return cls._instance + + def __init__(self, location, params): + if self.__initialized: + return + self.cache = {} + options = params.get("OPTIONS", {}) + self.working = options.get("working", True) + self.__initialized = True + + def add(self, key, value, timeout=300, version=None): + self.cache[key] = value + + def get(self, key, default=None, version=None): + if self.working: + return self.cache.get(key, default) + else: + raise RuntimeError(f"Sorry, cache no worky {self}") + + def set(self, key, value, timeout=300, version=None, client=None): + self.cache[key] = value + + def delete(self, key, version=None): + self.cache.pop(key, None) + + def clear(self): + self.cache = {} + + def breakit(self): + self.working = False + + def fixit(self): + self.working = True + + +class UnBreakableCache(BreakableCache): + def get(self, key, default=None, version=None): + return self.cache.get(key, default) + + +breakable_cache_settings = { + 'default': { + 'BACKEND': 'ansible_base.lib.cache.fallback_cache.DABCacheWithFallback', + }, + 'primary': { + 'BACKEND': 'test_app.tests.lib.cache.test_fallback_cache.BreakableCache', + 'LOCATION': 'primary', + }, + 'fallback': { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + 'LOCATION': 'fallback', + }, +} + +unbreakable_cache_settings = deepcopy(breakable_cache_settings) +unbreakable_cache_settings['primary']['BACKEND'] = 'test_app.tests.lib.cache.test_fallback_cache.UnBreakableCache' + + +@pytest.fixture +def mocked_fallback_cache_temp_file(testrun_uid): + tempfile.NamedTemporaryFile(prefix=testrun_uid) + _temp_file = Path(tempfile.NamedTemporaryFile().name) + # Make sure the file does not exist right before we yield it + _temp_file.unlink(missing_ok=True) + with mock.patch('ansible_base.lib.cache.fallback_cache._temp_file', _temp_file): + yield _temp_file + # Make sure the file is cleaned up + _temp_file.unlink(missing_ok=True) + + +@override_settings(CACHES=breakable_cache_settings) +def test_fallback_cache(): + cache = django_cache.caches.create_connection('default') + + primary = cache._primary_cache + # Make sure the primary is in a state we want + primary.fixit() + fallback = cache._fallback_cache + cache.set('key', 'val1') + assert primary.get('key') == 'val1' + assert fallback.get('key') is None + + primary.set('tobecleared', True) + primary.breakit() + + # Breaks primary + cache.get('key') + + # Sets in fallback + cache.set('key', 'val2') + + assert cache.get('key', 'val2') + + assert cache.get_active_cache() == FALLBACK_CACHE + + primary.fixit() + + # Check until primary is back + timeout = time.time() + 30 + while True: + if cache.get_active_cache() == PRIMARY_CACHE: + break + if time.time() > timeout: + assert False + time.sleep(1) + + # Ensure caches were cleared + assert cache.get('key') is None + assert fallback.get('key') is None + assert cache.get('tobecleared') is None + + cache.set('key2', 'val3') + + assert cache.get('key2') == 'val3' + + +@override_settings(CACHES=breakable_cache_settings) +def test_dead_primary(): + primary_cache = django_cache.caches.create_connection('primary') + # Make sure the primary is in a state we want + primary_cache.breakit() + + # Kill post-shutdown logging from unfinished recovery checker + logging.getLogger('ansible_base.cache.fallback_cache').setLevel(logging.CRITICAL) + + cache = django_cache.caches.create_connection('default') + + cache.set('key', 'val') + cache.get('key') + + # Check until fallback is set + timeout = time.time() + 30 + while True: + if cache.get_active_cache() == FALLBACK_CACHE: + break + if time.time() > timeout: + assert False + time.sleep(1) + + +@override_settings(CACHES=unbreakable_cache_settings) +def test_ensure_temp_file_is_removed_on_init(mocked_fallback_cache_temp_file): + # Get a handle on a DABCacheWithFallback + cache = DABCacheWithFallback(None, {}) + # Since this may already be initalized, lets tell it its not + cache.initialized = False + # now touch the file and call the init again which should remove it + mocked_fallback_cache_temp_file.touch() + cache.__init__(None, {}) + assert mocked_fallback_cache_temp_file.exists() is False + + +@override_settings(CACHES=unbreakable_cache_settings) +def test_ensure_initialization_wont_happen_twice(): + with mock.patch('ansible_base.lib.cache.fallback_cache.ThreadPoolExecutor') as tfe: + cache = DABCacheWithFallback(None, {}) + number_of_calls = tfe.call_count + cache.__init__(None, {}) + # when calling init again ThreadPoolExecute should not be called again so we should still have only one call + assert tfe.call_count == number_of_calls, "Didn't expect the number of calls to ThreadPoolExecutor to increase" + + +@pytest.mark.parametrize( + "method", + [ + ('clear'), + ('delete'), + ('set'), + ('get'), + ('add'), + ], +) +@override_settings(CACHES=unbreakable_cache_settings) +def test_all_methods_are_overwritten(method): + with mock.patch('ansible_base.lib.cache.fallback_cache.DABCacheWithFallback._op_with_fallback') as owf: + cache = DABCacheWithFallback(None, {}) + if method == 'clear': + getattr(cache, method)() + elif method in ['delete', 'get']: + getattr(cache, method)('test_value') + else: + getattr(cache, method)('test_value', 1) + owf.assert_called_once() + + +@pytest.mark.parametrize( + "file_exists", + [ + (True), + (False), + ], +) +@override_settings(CACHES=unbreakable_cache_settings) +def test_check_primary_cache(file_exists, mocked_fallback_cache_temp_file): + # Initialization of the cache will clear the temp file so do this first + cache = DABCacheWithFallback(None, {}) + + # Create the temp file if needed + if file_exists: + mocked_fallback_cache_temp_file.touch() + + mocked_function = mock.MagicMock(return_value=None) + cache._primary_cache.clear = mocked_function + cache.check_primary_cache() + if file_exists: + mocked_function.assert_called_once() + else: + mocked_function.assert_not_called() + assert mocked_fallback_cache_temp_file.exists() is False + + +@override_settings(CACHES=unbreakable_cache_settings) +def test_file_unlink_exception_does_not_cause_failure(mocked_fallback_cache_temp_file): + cache = DABCacheWithFallback(None, {}) + # We can't do: temp_file.unlink = mock.MagicMock(side_effect=Exception('failed to unlink exception')) + # Because unlink is marked as read only so we will just mock the cache.clear to raise in its place + mocked_function = mock.MagicMock(side_effect=Exception('failed to delete a file exception')) + cache._primary_cache.clear = mocked_function + + mocked_fallback_cache_temp_file.touch() + cache.check_primary_cache() + # No assertion needed because we just want to make sure check_primary_cache does not raise diff --git a/test_app/tests/lib/utils/test_coverage_test_utils.py b/test_app/tests/lib/utils/test_coverage_test_utils.py new file mode 100644 index 000000000..b4b4f03c3 --- /dev/null +++ b/test_app/tests/lib/utils/test_coverage_test_utils.py @@ -0,0 +1,114 @@ +""" +Tests for coverage_test_utils module. +""" + +import pytest + +from ansible_base.lib.utils.coverage_test_utils import validate_quality_gate + + +class TestValidateQualityGate: + """Test suite for validate_quality_gate function.""" + + def test_coverage_passes_at_threshold(self): + """Test coverage exactly at 80% threshold passes.""" + result = validate_quality_gate(80.0) + + assert result["passes_gate"] is True + assert result["current_coverage"] == 80.0 + assert result["required_coverage"] == 80.0 + assert result["coverage_gap"] == 0.0 + assert "meets the 80.0% threshold" in result["message"] + + def test_coverage_above_threshold(self): + """Test coverage above threshold passes.""" + result = validate_quality_gate(90.0) + + assert result["passes_gate"] is True + assert result["current_coverage"] == 90.0 + assert result["coverage_gap"] == 0.0 + + def test_coverage_below_threshold(self): + """Test coverage below threshold fails.""" + result = validate_quality_gate(70.0) + + assert result["passes_gate"] is False + assert result["current_coverage"] == 70.0 + assert result["coverage_gap"] == 10.0 + assert "below the 80.0% threshold by 10.00%" in result["message"] + + def test_zero_coverage(self): + """Test 0% coverage fails.""" + result = validate_quality_gate(0.0) + + assert result["passes_gate"] is False + assert result["coverage_gap"] == 80.0 + + def test_perfect_coverage(self): + """Test 100% coverage passes.""" + result = validate_quality_gate(100.0) + + assert result["passes_gate"] is True + assert result["coverage_gap"] == 0.0 + + def test_custom_threshold(self): + """Test with custom threshold.""" + result = validate_quality_gate(75.0, threshold=70.0) + + assert result["passes_gate"] is True + assert result["required_coverage"] == 70.0 + assert result["coverage_gap"] == 0.0 + + def test_custom_threshold_fails(self): + """Test failing with custom threshold.""" + result = validate_quality_gate(65.0, threshold=70.0) + + assert result["passes_gate"] is False + assert result["required_coverage"] == 70.0 + assert result["coverage_gap"] == 5.0 + + def test_invalid_coverage_negative(self): + """Test that negative coverage raises ValueError.""" + with pytest.raises(ValueError, match="Coverage percentage must be between 0 and 100"): + validate_quality_gate(-5.0) + + def test_invalid_coverage_over_100(self): + """Test that coverage over 100 raises ValueError.""" + with pytest.raises(ValueError, match="Coverage percentage must be between 0 and 100"): + validate_quality_gate(105.0) + + def test_invalid_threshold_negative(self): + """Test that negative threshold raises ValueError.""" + with pytest.raises(ValueError, match="Threshold must be between 0 and 100"): + validate_quality_gate(80.0, threshold=-5.0) + + def test_invalid_threshold_over_100(self): + """Test that threshold over 100 raises ValueError.""" + with pytest.raises(ValueError, match="Threshold must be between 0 and 100"): + validate_quality_gate(80.0, threshold=105.0) + + def test_edge_case_at_boundary(self): + """Test edge case at exact threshold boundary.""" + result = validate_quality_gate(80.0, threshold=80.0) + + assert result["passes_gate"] is True + assert result["coverage_gap"] == 0.0 + + def test_just_below_threshold(self): + """Test coverage just below threshold.""" + result = validate_quality_gate(79.99, threshold=80.0) + + assert result["passes_gate"] is False + assert result["coverage_gap"] == 0.01 + + def test_message_format_passing(self): + """Test message format when passing.""" + result = validate_quality_gate(85.5, threshold=80.0) + + assert result["message"] == "Coverage of 85.5% meets the 80.0% threshold" + + def test_message_format_failing(self): + """Test message format when failing.""" + result = validate_quality_gate(75.5, threshold=80.0) + + assert result["message"] == "Coverage of 75.5% is below the 80.0% threshold by 4.50%"