|
1 |
| -import xml.etree.ElementTree as ET |
2 | 1 | from importlib import resources
|
3 | 2 | from typing import Any, Dict, List
|
4 | 3 |
|
5 | 4 | from grep_ast import filename_to_lang
|
6 | 5 | from tree_sitter_languages import get_language, get_parser
|
7 | 6 |
|
8 |
| -from mutahunter.core.entities.config import MutahunterConfig |
9 | 7 | from mutahunter.core.logger import logger
|
10 | 8 |
|
11 | 9 |
|
12 | 10 | class Analyzer:
|
13 |
| - def __init__(self, config: MutahunterConfig) -> None: |
14 |
| - """ |
15 |
| - Initializes the Analyzer with the given configuration. |
16 |
| -
|
17 |
| - Args: |
18 |
| - config (Dict[str, Any]): The configuration dictionary. |
19 |
| - """ |
20 |
| - self.config = config |
21 |
| - self.line_rate = None |
22 |
| - self.file_lines_executed = None |
23 |
| - |
24 |
| - def run_coverage_analysis(self) -> Dict[str, List[int]]: |
25 |
| - """ |
26 |
| - Parses the appropriate coverage report based on the coverage type. |
27 |
| -
|
28 |
| - Returns: |
29 |
| - Dict[str, List[int]]: A dictionary where keys are filenames and values are lists of covered line numbers. |
30 |
| - """ |
31 |
| - coverage_type_parsers = { |
32 |
| - "cobertura": self.parse_coverage_report_cobertura, |
33 |
| - "jacoco": self.parse_coverage_report_jacoco, |
34 |
| - "lcov": self.parse_coverage_report_lcov, |
35 |
| - } |
36 |
| - |
37 |
| - if self.config.coverage_type in coverage_type_parsers: |
38 |
| - return coverage_type_parsers[self.config.coverage_type]() |
39 |
| - else: |
40 |
| - raise ValueError( |
41 |
| - "Invalid coverage tool. Please specify either 'cobertura', 'jacoco', or 'lcov'." |
42 |
| - ) |
43 |
| - |
44 |
| - def parse_coverage_report_lcov(self) -> Dict[str, List[int]]: |
45 |
| - """ |
46 |
| - Parses an LCOV code coverage report to extract covered line numbers for each file and calculate overall line coverage. |
47 |
| -
|
48 |
| - Returns: |
49 |
| - Dict[str, Any]: A dictionary where keys are filenames and values are lists of covered line numbers. |
50 |
| - Additionally, it includes the overall line coverage percentage. |
51 |
| - """ |
52 |
| - self.file_lines_executed = {} |
53 |
| - current_file = None |
54 |
| - total_lines_found = 0 |
55 |
| - total_lines_hit = 0 |
56 |
| - |
57 |
| - with open(self.config.code_coverage_report_path, "r") as file: |
58 |
| - lines = file.readlines() |
59 |
| - for line in lines: |
60 |
| - if line.startswith("SF:"): |
61 |
| - current_file = line.strip().split(":", 1)[1] |
62 |
| - self.file_lines_executed[current_file] = [] |
63 |
| - elif line.startswith("DA:") and current_file: |
64 |
| - parts = line.strip().split(":")[1].split(",") |
65 |
| - hits = int(parts[1]) |
66 |
| - if hits > 0: |
67 |
| - line_number = int(parts[0]) |
68 |
| - self.file_lines_executed[current_file].append(line_number) |
69 |
| - elif line.startswith("LF:") and current_file: |
70 |
| - total_lines_found += int(line.strip().split(":")[1]) |
71 |
| - elif line.startswith("LH:") and current_file: |
72 |
| - total_lines_hit += int(line.strip().split(":")[1]) |
73 |
| - elif line.startswith("end_of_record"): |
74 |
| - current_file = None |
75 |
| - self.line_rate = ( |
76 |
| - (total_lines_hit / total_lines_found) if total_lines_found else 0.0 |
77 |
| - ) |
| 11 | + def __init__(self) -> None: |
| 12 | + pass |
78 | 13 |
|
79 |
| - def parse_coverage_report_cobertura(self) -> Dict[str, List[int]]: |
| 14 | + def get_language_by_filename(self, filename: str) -> str: |
80 | 15 | """
|
81 |
| - Parses a Cobertura XML code coverage report to extract covered line numbers for each file. |
| 16 | + Gets the language identifier based on the filename. |
82 | 17 |
|
83 |
| - Returns: |
84 |
| - Dict[str, List[int]]: A dictionary where keys are filenames and values are lists of covered line numbers. |
85 |
| - """ |
86 |
| - tree = ET.parse(self.config.code_coverage_report_path) |
87 |
| - root = tree.getroot() |
88 |
| - self.file_lines_executed = {} |
89 |
| - self.line_rate = float(root.get("line-rate", 0)) |
90 |
| - for cls in root.findall(".//class"): |
91 |
| - name_attr = cls.get("filename") |
92 |
| - executed_lines = [] |
93 |
| - for line in cls.findall(".//line"): |
94 |
| - line_number = int(line.get("number")) |
95 |
| - hits = int(line.get("hits")) |
96 |
| - if hits > 0: |
97 |
| - executed_lines.append(line_number) |
98 |
| - if executed_lines: |
99 |
| - self.file_lines_executed[name_attr] = executed_lines |
100 |
| - |
101 |
| - def parse_coverage_report_jacoco(self) -> Dict[str, Any]: |
102 |
| - """ |
103 |
| - Parses a JaCoCo XML code coverage report to extract covered line numbers for each file and calculate overall line coverage. |
| 18 | + Args: |
| 19 | + filename (str): The name of the file. |
104 | 20 |
|
105 | 21 | Returns:
|
106 |
| - Dict[str, Any]: A dictionary where keys are file paths and values are lists of covered line numbers. |
107 |
| - Additionally, it includes the overall line coverage percentage. |
| 22 | + str: The language identifier. |
108 | 23 | """
|
109 |
| - tree = ET.parse(self.config.code_coverage_report_path) |
110 |
| - root = tree.getroot() |
111 |
| - self.file_lines_executed = {} |
112 |
| - |
113 |
| - total_lines_missed = 0 |
114 |
| - total_lines_covered = 0 |
115 |
| - |
116 |
| - for package in root.findall(".//package"): |
117 |
| - package_name = package.get("name").replace("/", ".") |
118 |
| - for sourcefile in package.findall(".//sourcefile"): |
119 |
| - filename = sourcefile.get("name") |
120 |
| - # Construct the full file path with the src/main/java directory |
121 |
| - full_filename = ( |
122 |
| - f"src/main/java/{package_name.replace('.', '/')}/{filename}" |
123 |
| - ) |
124 |
| - executed_lines = [] |
125 |
| - for line in sourcefile.findall(".//line"): |
126 |
| - line_number = int(line.get("nr")) |
127 |
| - missed = int(line.get("mi")) |
128 |
| - covered = int(line.get("ci")) |
129 |
| - if covered > 0: |
130 |
| - executed_lines.append(line_number) |
131 |
| - total_lines_missed += missed |
132 |
| - total_lines_covered += covered |
133 |
| - if executed_lines: |
134 |
| - self.file_lines_executed[full_filename] = executed_lines |
135 |
| - |
136 |
| - self.line_rate = ( |
137 |
| - (total_lines_covered / (total_lines_covered + total_lines_missed)) |
138 |
| - if (total_lines_covered + total_lines_missed) > 0 |
139 |
| - else 0.0 |
140 |
| - ) |
| 24 | + return filename_to_lang(filename) |
141 | 25 |
|
142 | 26 | def get_covered_function_blocks(
|
143 | 27 | self, executed_lines: List[int], source_file_path: str
|
@@ -341,3 +225,57 @@ def _load_query_scm(self, lang: str) -> str:
|
341 | 225 | if not scm_fname.exists():
|
342 | 226 | return ""
|
343 | 227 | return scm_fname.read_text()
|
| 228 | + |
| 229 | + def find_function_block_by_name( |
| 230 | + self, source_file_path: str, method_name: str |
| 231 | + ) -> List[Any]: |
| 232 | + """ |
| 233 | + Finds a function block by its name and returns the start and end lines of the function. |
| 234 | +
|
| 235 | + Args: |
| 236 | + source_file_path (str): The path to the source file. |
| 237 | + method_name (str): The name of the method to find. |
| 238 | +
|
| 239 | + Returns: |
| 240 | + Dict[str, int]: A dictionary with 'start_line' and 'end_line' as keys and their corresponding line numbers as values. |
| 241 | + """ |
| 242 | + source_code = self._read_source_file(source_file_path) |
| 243 | + lang = filename_to_lang(source_file_path) |
| 244 | + if lang is None: |
| 245 | + raise ValueError(f"Language not supported for file: {source_file_path}") |
| 246 | + |
| 247 | + parser = get_parser(lang) |
| 248 | + language = get_language(lang) |
| 249 | + tree = parser.parse(source_code) |
| 250 | + |
| 251 | + query_scm = self._load_query_scm(lang) |
| 252 | + if not query_scm: |
| 253 | + raise ValueError( |
| 254 | + "Failed to load query SCM file for the specified language." |
| 255 | + ) |
| 256 | + |
| 257 | + query = language.query(query_scm) |
| 258 | + captures = query.captures(tree.root_node) |
| 259 | + |
| 260 | + result = [] |
| 261 | + |
| 262 | + for node, tag in captures: |
| 263 | + if tag == "definition.function" or tag == "definition.method": |
| 264 | + if self._is_function_name(node, method_name, source_code): |
| 265 | + return node |
| 266 | + raise ValueError(f"Function {method_name} not found in file {source_file_path}") |
| 267 | + |
| 268 | + def _is_function_name(self, node, method_name: str, source_code: bytes) -> bool: |
| 269 | + """ |
| 270 | + Checks if the given node corresponds to the method_name. |
| 271 | +
|
| 272 | + Args: |
| 273 | + node (Node): The AST node to check. |
| 274 | + method_name (str): The method name to find. |
| 275 | + source_code (bytes): The source code. |
| 276 | +
|
| 277 | + Returns: |
| 278 | + bool: True if the node corresponds to the method_name, False otherwise. |
| 279 | + """ |
| 280 | + node_text = source_code[node.start_byte : node.end_byte].decode("utf8") |
| 281 | + return method_name in node_text |
0 commit comments