| 
 | 1 | +from __future__ import annotations  | 
 | 2 | + | 
 | 3 | +import json  | 
 | 4 | +from typing import TYPE_CHECKING, Any, Union  | 
 | 5 | + | 
 | 6 | +import sentry_sdk  | 
 | 7 | +from coverage.exceptions import NoDataError  | 
 | 8 | + | 
 | 9 | +from codeflash.cli_cmds.console import logger  | 
 | 10 | +from codeflash.code_utils.coverage_utils import (  | 
 | 11 | +    build_fully_qualified_name,  | 
 | 12 | +    extract_dependent_function,  | 
 | 13 | +    generate_candidates,  | 
 | 14 | +)  | 
 | 15 | +from codeflash.models.models import CoverageData, CoverageStatus, FunctionCoverage  | 
 | 16 | + | 
 | 17 | +if TYPE_CHECKING:  | 
 | 18 | +    from collections.abc import Collection  | 
 | 19 | +    from pathlib import Path  | 
 | 20 | + | 
 | 21 | +    from codeflash.models.models import CodeOptimizationContext  | 
 | 22 | + | 
 | 23 | + | 
 | 24 | +class CoverageUtils:  | 
 | 25 | +    """Coverage utils class for interfacing with Coverage."""  | 
 | 26 | + | 
 | 27 | +    @staticmethod  | 
 | 28 | +    def load_from_sqlite_database(  | 
 | 29 | +        database_path: Path, config_path: Path, function_name: str, code_context: CodeOptimizationContext, source_code_path: Path  | 
 | 30 | +    ) -> CoverageData:  | 
 | 31 | +        """Load coverage data from an SQLite database, mimicking the behavior of load_from_coverage_file."""  | 
 | 32 | +        from coverage import Coverage  | 
 | 33 | +        from coverage.jsonreport import JsonReporter  | 
 | 34 | + | 
 | 35 | +        cov = Coverage(data_file=database_path,config_file=config_path, data_suffix=True, auto_data=True, branch=True)  | 
 | 36 | + | 
 | 37 | +        if not database_path.stat().st_size or not database_path.exists():  | 
 | 38 | +            logger.debug(f"Coverage database {database_path} is empty or does not exist")  | 
 | 39 | +            sentry_sdk.capture_message(f"Coverage database {database_path} is empty or does not exist")  | 
 | 40 | +            return CoverageUtils.create_empty(source_code_path, function_name, code_context)  | 
 | 41 | +        cov.load()  | 
 | 42 | + | 
 | 43 | +        reporter = JsonReporter(cov)  | 
 | 44 | +        temp_json_file = database_path.with_suffix(".report.json")  | 
 | 45 | +        with temp_json_file.open("w") as f:  | 
 | 46 | +            try:  | 
 | 47 | +                reporter.report(morfs=[source_code_path.as_posix()], outfile=f)  | 
 | 48 | +            except NoDataError:  | 
 | 49 | +                sentry_sdk.capture_message(f"No coverage data found for {function_name} in {source_code_path}")  | 
 | 50 | +                return CoverageUtils.create_empty(source_code_path, function_name, code_context)  | 
 | 51 | +        with temp_json_file.open() as f:  | 
 | 52 | +            original_coverage_data = json.load(f)  | 
 | 53 | + | 
 | 54 | +        coverage_data, status = CoverageUtils._parse_coverage_file(temp_json_file, source_code_path)  | 
 | 55 | + | 
 | 56 | +        main_func_coverage, dependent_func_coverage = CoverageUtils._fetch_function_coverages(  | 
 | 57 | +            function_name, code_context, coverage_data, original_cov_data=original_coverage_data  | 
 | 58 | +        )  | 
 | 59 | + | 
 | 60 | +        total_executed_lines, total_unexecuted_lines = CoverageUtils._aggregate_coverage(  | 
 | 61 | +            main_func_coverage, dependent_func_coverage  | 
 | 62 | +        )  | 
 | 63 | + | 
 | 64 | +        total_lines = total_executed_lines | total_unexecuted_lines  | 
 | 65 | +        coverage = len(total_executed_lines) / len(total_lines) * 100 if total_lines else 0.0  | 
 | 66 | +        # coverage = (lines covered of the original function + its 1 level deep helpers) / (lines spanned by original function + its 1 level deep helpers), if no helpers then just the original function coverage  | 
 | 67 | + | 
 | 68 | +        functions_being_tested = [main_func_coverage.name]  | 
 | 69 | +        if dependent_func_coverage:  | 
 | 70 | +            functions_being_tested.append(dependent_func_coverage.name)  | 
 | 71 | + | 
 | 72 | +        graph = CoverageUtils._build_graph(main_func_coverage, dependent_func_coverage)  | 
 | 73 | +        temp_json_file.unlink()  | 
 | 74 | + | 
 | 75 | +        return CoverageData(  | 
 | 76 | +            file_path=source_code_path,  | 
 | 77 | +            coverage=coverage,  | 
 | 78 | +            function_name=function_name,  | 
 | 79 | +            functions_being_tested=functions_being_tested,  | 
 | 80 | +            graph=graph,  | 
 | 81 | +            code_context=code_context,  | 
 | 82 | +            main_func_coverage=main_func_coverage,  | 
 | 83 | +            dependent_func_coverage=dependent_func_coverage,  | 
 | 84 | +            status=status,  | 
 | 85 | +        )  | 
 | 86 | + | 
 | 87 | +    @staticmethod  | 
 | 88 | +    def _parse_coverage_file(  | 
 | 89 | +        coverage_file_path: Path, source_code_path: Path  | 
 | 90 | +    ) -> tuple[dict[str, dict[str, Any]], CoverageStatus]:  | 
 | 91 | +        with coverage_file_path.open() as f:  | 
 | 92 | +            coverage_data = json.load(f)  | 
 | 93 | + | 
 | 94 | +        candidates = generate_candidates(source_code_path)  | 
 | 95 | + | 
 | 96 | +        logger.debug(f"Looking for coverage data in {' -> '.join(candidates)}")  | 
 | 97 | +        for candidate in candidates:  | 
 | 98 | +            try:  | 
 | 99 | +                cov: dict[str, dict[str, Any]] = coverage_data["files"][candidate]["functions"]  | 
 | 100 | +                logger.debug(f"Coverage data found for {source_code_path} in {candidate}")  | 
 | 101 | +                status = CoverageStatus.PARSED_SUCCESSFULLY  | 
 | 102 | +                break  | 
 | 103 | +            except KeyError:  | 
 | 104 | +                continue  | 
 | 105 | +        else:  | 
 | 106 | +            logger.debug(f"No coverage data found for {source_code_path} in {candidates}")  | 
 | 107 | +            cov = {}  | 
 | 108 | +            status = CoverageStatus.NOT_FOUND  | 
 | 109 | +        return cov, status  | 
 | 110 | + | 
 | 111 | +    @staticmethod  | 
 | 112 | +    def _fetch_function_coverages(  | 
 | 113 | +        function_name: str,  | 
 | 114 | +        code_context: CodeOptimizationContext,  | 
 | 115 | +        coverage_data: dict[str, dict[str, Any]],  | 
 | 116 | +        original_cov_data: dict[str, dict[str, Any]],  | 
 | 117 | +    ) -> tuple[FunctionCoverage, Union[FunctionCoverage, None]]:  | 
 | 118 | +        resolved_name = build_fully_qualified_name(function_name, code_context)  | 
 | 119 | +        try:  | 
 | 120 | +            main_function_coverage = FunctionCoverage(  | 
 | 121 | +                name=resolved_name,  | 
 | 122 | +                coverage=coverage_data[resolved_name]["summary"]["percent_covered"],  | 
 | 123 | +                executed_lines=coverage_data[resolved_name]["executed_lines"],  | 
 | 124 | +                unexecuted_lines=coverage_data[resolved_name]["missing_lines"],  | 
 | 125 | +                executed_branches=coverage_data[resolved_name]["executed_branches"],  | 
 | 126 | +                unexecuted_branches=coverage_data[resolved_name]["missing_branches"],  | 
 | 127 | +            )  | 
 | 128 | +        except KeyError:  | 
 | 129 | +            main_function_coverage = FunctionCoverage(  | 
 | 130 | +                name=resolved_name,  | 
 | 131 | +                coverage=0,  | 
 | 132 | +                executed_lines=[],  | 
 | 133 | +                unexecuted_lines=[],  | 
 | 134 | +                executed_branches=[],  | 
 | 135 | +                unexecuted_branches=[],  | 
 | 136 | +            )  | 
 | 137 | + | 
 | 138 | +        dependent_function = extract_dependent_function(function_name, code_context)  | 
 | 139 | +        dependent_func_coverage = (  | 
 | 140 | +            CoverageUtils.grab_dependent_function_from_coverage_data(  | 
 | 141 | +                dependent_function, coverage_data, original_cov_data  | 
 | 142 | +            )  | 
 | 143 | +            if dependent_function  | 
 | 144 | +            else None  | 
 | 145 | +        )  | 
 | 146 | + | 
 | 147 | +        return main_function_coverage, dependent_func_coverage  | 
 | 148 | + | 
 | 149 | +    @staticmethod  | 
 | 150 | +    def _aggregate_coverage(  | 
 | 151 | +        main_func_coverage: FunctionCoverage, dependent_func_coverage: Union[FunctionCoverage, None]  | 
 | 152 | +    ) -> tuple[set[int], set[int]]:  | 
 | 153 | +        total_executed_lines = set(main_func_coverage.executed_lines)  | 
 | 154 | +        total_unexecuted_lines = set(main_func_coverage.unexecuted_lines)  | 
 | 155 | + | 
 | 156 | +        if dependent_func_coverage:  | 
 | 157 | +            total_executed_lines.update(dependent_func_coverage.executed_lines)  | 
 | 158 | +            total_unexecuted_lines.update(dependent_func_coverage.unexecuted_lines)  | 
 | 159 | + | 
 | 160 | +        return total_executed_lines, total_unexecuted_lines  | 
 | 161 | + | 
 | 162 | +    @staticmethod  | 
 | 163 | +    def _build_graph(  | 
 | 164 | +        main_func_coverage: FunctionCoverage, dependent_func_coverage: Union[FunctionCoverage, None]  | 
 | 165 | +    ) -> dict[str, dict[str, Collection[object]]]:  | 
 | 166 | +        graph = {  | 
 | 167 | +            main_func_coverage.name: {  | 
 | 168 | +                "executed_lines": set(main_func_coverage.executed_lines),  | 
 | 169 | +                "unexecuted_lines": set(main_func_coverage.unexecuted_lines),  | 
 | 170 | +                "executed_branches": main_func_coverage.executed_branches,  | 
 | 171 | +                "unexecuted_branches": main_func_coverage.unexecuted_branches,  | 
 | 172 | +            }  | 
 | 173 | +        }  | 
 | 174 | + | 
 | 175 | +        if dependent_func_coverage:  | 
 | 176 | +            graph[dependent_func_coverage.name] = {  | 
 | 177 | +                "executed_lines": set(dependent_func_coverage.executed_lines),  | 
 | 178 | +                "unexecuted_lines": set(dependent_func_coverage.unexecuted_lines),  | 
 | 179 | +                "executed_branches": dependent_func_coverage.executed_branches,  | 
 | 180 | +                "unexecuted_branches": dependent_func_coverage.unexecuted_branches,  | 
 | 181 | +            }  | 
 | 182 | + | 
 | 183 | +        return graph  | 
 | 184 | + | 
 | 185 | +    @staticmethod  | 
 | 186 | +    def grab_dependent_function_from_coverage_data(  | 
 | 187 | +        dependent_function_name: str,  | 
 | 188 | +        coverage_data: dict[str, dict[str, Any]],  | 
 | 189 | +        original_cov_data: dict[str, dict[str, Any]],  | 
 | 190 | +    ) -> FunctionCoverage:  | 
 | 191 | +        """Grab the dependent function from the coverage data."""  | 
 | 192 | +        try:  | 
 | 193 | +            return FunctionCoverage(  | 
 | 194 | +                name=dependent_function_name,  | 
 | 195 | +                coverage=coverage_data[dependent_function_name]["summary"]["percent_covered"],  | 
 | 196 | +                executed_lines=coverage_data[dependent_function_name]["executed_lines"],  | 
 | 197 | +                unexecuted_lines=coverage_data[dependent_function_name]["missing_lines"],  | 
 | 198 | +                executed_branches=coverage_data[dependent_function_name]["executed_branches"],  | 
 | 199 | +                unexecuted_branches=coverage_data[dependent_function_name]["missing_branches"],  | 
 | 200 | +            )  | 
 | 201 | +        except KeyError:  | 
 | 202 | +            msg = f"Coverage data not found for dependent function {dependent_function_name} in the coverage data"  | 
 | 203 | +            try:  | 
 | 204 | +                files = original_cov_data["files"]  | 
 | 205 | +                for file in files:  | 
 | 206 | +                    functions = files[file]["functions"]  | 
 | 207 | +                    for function in functions:  | 
 | 208 | +                        if dependent_function_name in function:  | 
 | 209 | +                            return FunctionCoverage(  | 
 | 210 | +                                name=dependent_function_name,  | 
 | 211 | +                                coverage=functions[function]["summary"]["percent_covered"],  | 
 | 212 | +                                executed_lines=functions[function]["executed_lines"],  | 
 | 213 | +                                unexecuted_lines=functions[function]["missing_lines"],  | 
 | 214 | +                                executed_branches=functions[function]["executed_branches"],  | 
 | 215 | +                                unexecuted_branches=functions[function]["missing_branches"],  | 
 | 216 | +                            )  | 
 | 217 | +                msg = f"Coverage data not found for dependent function {dependent_function_name} in the original coverage data"  | 
 | 218 | +            except KeyError:  | 
 | 219 | +                raise ValueError(msg) from None  | 
 | 220 | + | 
 | 221 | +        return FunctionCoverage(  | 
 | 222 | +            name=dependent_function_name,  | 
 | 223 | +            coverage=0,  | 
 | 224 | +            executed_lines=[],  | 
 | 225 | +            unexecuted_lines=[],  | 
 | 226 | +            executed_branches=[],  | 
 | 227 | +            unexecuted_branches=[],  | 
 | 228 | +        )  | 
 | 229 | + | 
0 commit comments