@@ -250,209 +250,6 @@ class CoverageData:
250250 blank_re : Pattern [str ] = re .compile (r"\s*(#|$)" )
251251 else_re : Pattern [str ] = re .compile (r"\s*else\s*:\s*(#|$)" )
252252
253- @staticmethod
254- def load_from_sqlite_database (
255- database_path : Path , config_path : Path , function_name : str , code_context : CodeOptimizationContext , source_code_path : Path
256- ) -> CoverageData :
257- """Load coverage data from an SQLite database, mimicking the behavior of load_from_coverage_file."""
258- from coverage import Coverage
259- from coverage .jsonreport import JsonReporter
260-
261- cov = Coverage (data_file = database_path ,config_file = config_path , data_suffix = True , auto_data = True , branch = True )
262-
263- if not database_path .stat ().st_size or not database_path .exists ():
264- logger .debug (f"Coverage database { database_path } is empty or does not exist" )
265- sentry_sdk .capture_message (f"Coverage database { database_path } is empty or does not exist" )
266- return CoverageData .create_empty (source_code_path , function_name , code_context )
267- cov .load ()
268-
269- reporter = JsonReporter (cov )
270- temp_json_file = database_path .with_suffix (".report.json" )
271- with temp_json_file .open ("w" ) as f :
272- try :
273- reporter .report (morfs = [source_code_path .as_posix ()], outfile = f )
274- except NoDataError :
275- sentry_sdk .capture_message (f"No coverage data found for { function_name } in { source_code_path } " )
276- return CoverageData .create_empty (source_code_path , function_name , code_context )
277- with temp_json_file .open () as f :
278- original_coverage_data = json .load (f )
279-
280- coverage_data , status = CoverageData ._parse_coverage_file (temp_json_file , source_code_path )
281-
282- main_func_coverage , dependent_func_coverage = CoverageData ._fetch_function_coverages (
283- function_name , code_context , coverage_data , original_cov_data = original_coverage_data
284- )
285-
286- total_executed_lines , total_unexecuted_lines = CoverageData ._aggregate_coverage (
287- main_func_coverage , dependent_func_coverage
288- )
289-
290- total_lines = total_executed_lines | total_unexecuted_lines
291- coverage = len (total_executed_lines ) / len (total_lines ) * 100 if total_lines else 0.0
292- # coverage = (lines covered of the original function + its 1 level deep helpers) / (lines spanned by original function + its 1 level deep helpers), if no helpers then just the original function coverage
293-
294- functions_being_tested = [main_func_coverage .name ]
295- if dependent_func_coverage :
296- functions_being_tested .append (dependent_func_coverage .name )
297-
298- graph = CoverageData ._build_graph (main_func_coverage , dependent_func_coverage )
299- temp_json_file .unlink ()
300-
301- return CoverageData (
302- file_path = source_code_path ,
303- coverage = coverage ,
304- function_name = function_name ,
305- functions_being_tested = functions_being_tested ,
306- graph = graph ,
307- code_context = code_context ,
308- main_func_coverage = main_func_coverage ,
309- dependent_func_coverage = dependent_func_coverage ,
310- status = status ,
311- )
312-
313- @staticmethod
314- def _parse_coverage_file (
315- coverage_file_path : Path , source_code_path : Path
316- ) -> tuple [dict [str , dict [str , Any ]], CoverageStatus ]:
317- with coverage_file_path .open () as f :
318- coverage_data = json .load (f )
319-
320- candidates = generate_candidates (source_code_path )
321-
322- logger .debug (f"Looking for coverage data in { ' -> ' .join (candidates )} " )
323- for candidate in candidates :
324- try :
325- cov : dict [str , dict [str , Any ]] = coverage_data ["files" ][candidate ]["functions" ]
326- logger .debug (f"Coverage data found for { source_code_path } in { candidate } " )
327- status = CoverageStatus .PARSED_SUCCESSFULLY
328- break
329- except KeyError :
330- continue
331- else :
332- logger .debug (f"No coverage data found for { source_code_path } in { candidates } " )
333- cov = {}
334- status = CoverageStatus .NOT_FOUND
335- return cov , status
336-
337- @staticmethod
338- def _fetch_function_coverages (
339- function_name : str ,
340- code_context : CodeOptimizationContext ,
341- coverage_data : dict [str , dict [str , Any ]],
342- original_cov_data : dict [str , dict [str , Any ]],
343- ) -> tuple [FunctionCoverage , Union [FunctionCoverage , None ]]:
344- resolved_name = build_fully_qualified_name (function_name , code_context )
345- try :
346- main_function_coverage = FunctionCoverage (
347- name = resolved_name ,
348- coverage = coverage_data [resolved_name ]["summary" ]["percent_covered" ],
349- executed_lines = coverage_data [resolved_name ]["executed_lines" ],
350- unexecuted_lines = coverage_data [resolved_name ]["missing_lines" ],
351- executed_branches = coverage_data [resolved_name ]["executed_branches" ],
352- unexecuted_branches = coverage_data [resolved_name ]["missing_branches" ],
353- )
354- except KeyError :
355- main_function_coverage = FunctionCoverage (
356- name = resolved_name ,
357- coverage = 0 ,
358- executed_lines = [],
359- unexecuted_lines = [],
360- executed_branches = [],
361- unexecuted_branches = [],
362- )
363-
364- dependent_function = extract_dependent_function (function_name , code_context )
365- dependent_func_coverage = (
366- CoverageData .grab_dependent_function_from_coverage_data (
367- dependent_function , coverage_data , original_cov_data
368- )
369- if dependent_function
370- else None
371- )
372-
373- return main_function_coverage , dependent_func_coverage
374-
375- @staticmethod
376- def _aggregate_coverage (
377- main_func_coverage : FunctionCoverage , dependent_func_coverage : Union [FunctionCoverage , None ]
378- ) -> tuple [set [int ], set [int ]]:
379- total_executed_lines = set (main_func_coverage .executed_lines )
380- total_unexecuted_lines = set (main_func_coverage .unexecuted_lines )
381-
382- if dependent_func_coverage :
383- total_executed_lines .update (dependent_func_coverage .executed_lines )
384- total_unexecuted_lines .update (dependent_func_coverage .unexecuted_lines )
385-
386- return total_executed_lines , total_unexecuted_lines
387-
388- @staticmethod
389- def _build_graph (
390- main_func_coverage : FunctionCoverage , dependent_func_coverage : Union [FunctionCoverage , None ]
391- ) -> dict [str , dict [str , Collection [object ]]]:
392- graph = {
393- main_func_coverage .name : {
394- "executed_lines" : set (main_func_coverage .executed_lines ),
395- "unexecuted_lines" : set (main_func_coverage .unexecuted_lines ),
396- "executed_branches" : main_func_coverage .executed_branches ,
397- "unexecuted_branches" : main_func_coverage .unexecuted_branches ,
398- }
399- }
400-
401- if dependent_func_coverage :
402- graph [dependent_func_coverage .name ] = {
403- "executed_lines" : set (dependent_func_coverage .executed_lines ),
404- "unexecuted_lines" : set (dependent_func_coverage .unexecuted_lines ),
405- "executed_branches" : dependent_func_coverage .executed_branches ,
406- "unexecuted_branches" : dependent_func_coverage .unexecuted_branches ,
407- }
408-
409- return graph
410-
411- @staticmethod
412- def grab_dependent_function_from_coverage_data (
413- dependent_function_name : str ,
414- coverage_data : dict [str , dict [str , Any ]],
415- original_cov_data : dict [str , dict [str , Any ]],
416- ) -> FunctionCoverage :
417- """Grab the dependent function from the coverage data."""
418- try :
419- return FunctionCoverage (
420- name = dependent_function_name ,
421- coverage = coverage_data [dependent_function_name ]["summary" ]["percent_covered" ],
422- executed_lines = coverage_data [dependent_function_name ]["executed_lines" ],
423- unexecuted_lines = coverage_data [dependent_function_name ]["missing_lines" ],
424- executed_branches = coverage_data [dependent_function_name ]["executed_branches" ],
425- unexecuted_branches = coverage_data [dependent_function_name ]["missing_branches" ],
426- )
427- except KeyError :
428- msg = f"Coverage data not found for dependent function { dependent_function_name } in the coverage data"
429- try :
430- files = original_cov_data ["files" ]
431- for file in files :
432- functions = files [file ]["functions" ]
433- for function in functions :
434- if dependent_function_name in function :
435- return FunctionCoverage (
436- name = dependent_function_name ,
437- coverage = functions [function ]["summary" ]["percent_covered" ],
438- executed_lines = functions [function ]["executed_lines" ],
439- unexecuted_lines = functions [function ]["missing_lines" ],
440- executed_branches = functions [function ]["executed_branches" ],
441- unexecuted_branches = functions [function ]["missing_branches" ],
442- )
443- msg = f"Coverage data not found for dependent function { dependent_function_name } in the original coverage data"
444- except KeyError :
445- raise ValueError (msg ) from None
446-
447- return FunctionCoverage (
448- name = dependent_function_name ,
449- coverage = 0 ,
450- executed_lines = [],
451- unexecuted_lines = [],
452- executed_branches = [],
453- unexecuted_branches = [],
454- )
455-
456253 def build_message (self ) -> str :
457254 if self .status == CoverageStatus .NOT_FOUND :
458255 return f"No coverage data found for { self .function_name } "
@@ -504,7 +301,6 @@ def create_empty(cls, file_path: Path, function_name: str, code_context: CodeOpt
504301 status = CoverageStatus .NOT_FOUND ,
505302 )
506303
507-
508304@dataclass
509305class FunctionCoverage :
510306 """Represents the coverage data for a specific function in a source file."""
0 commit comments