1+ from typing import Iterable
2+ from mutmut .type_checking import TypeCheckingError
3+ from mutmut .type_checking import run_type_checker
4+ from typing import Any
15import os
26import sys
37import platform
4751 process_time ,
4852 sleep ,
4953)
50- from typing import (
51- Dict ,
52- List ,
53- Union ,
54- Optional ,
55- )
5654import warnings
5755
5856import click
8482 34 : 'skipped' ,
8583 35 : 'suspicious' ,
8684 36 : 'timeout' ,
85+ 6 : 'caught by type check' ,
8786 - 24 : 'timeout' , # SIGXCPU
8887 24 : 'timeout' , # SIGXCPU
8988 152 : 'timeout' , # SIGXCPU
9897 'timeout' : '⏰' ,
9998 'suspicious' : '🤔' ,
10099 'skipped' : '🔇' ,
100+ 'caught by type check' : 'ö' ,
101101 'check was interrupted by user' : '🛑' ,
102102 'not checked' : '?' ,
103103 'killed' : '🎉' ,
@@ -187,7 +187,7 @@ def __init__(self, pytest_args: list[str]) -> None:
187187
188188
189189class InvalidGeneratedSyntaxException (Exception ):
190- def __init__ (self , file : Union [ Path , str ] ) -> None :
190+ def __init__ (self , file : Path | str ) -> None :
191191 super ().__init__ (f'Mutmut generated invalid python syntax for { file } . '
192192 'If the original file has valid python syntax, please file an issue '
193193 'with a minimal reproducible example file.' )
@@ -206,7 +206,7 @@ def copy_src_dir():
206206class FileMutationResult :
207207 """Dataclass to transfer warnings and errors from child processes to the parent"""
208208 warnings : list [Warning ]
209- error : Optional [ Exception ] = None
209+ error : Exception | None = None
210210
211211def create_mutants (max_children : int ):
212212 with Pool (processes = max_children ) as p :
@@ -364,6 +364,108 @@ def save(self):
364364 estimated_durations_by_key = self .estimated_time_of_tests_by_mutant ,
365365 ), f , indent = 4 )
366366
367+ def filter_mutants_with_type_checker ():
368+ with change_cwd (Path ('mutants' )):
369+ errors = run_type_checker (mutmut .config .type_check_command )
370+ grouped_errors = group_by_path (errors )
371+
372+ mutants_to_skip : list [FailedTypeCheckMutant ] = []
373+
374+ for path , errors_of_file in grouped_errors .items ():
375+ with open (path , 'r' , encoding = 'utf-8' ) as file :
376+ source = file .read ()
377+ wrapper = cst .MetadataWrapper (cst .parse_module (source ))
378+ visitor = MutatedMethodsCollector (path )
379+ wrapper .visit (visitor )
380+ mutated_methods = visitor .found_mutants
381+
382+ for error in errors_of_file :
383+ assert error .file_path == visitor .file
384+ mutant = next ((m for m in mutated_methods if m .line_number_start <= error .line_number <= m .line_number_end ), None )
385+ if mutant is None :
386+ # TODO: test_utils.py
387+ if 'test_utils.py' in str (error .file_path ):
388+ # def as_address(self, *args, **kwargs): is missing a return type
389+ # this trips up a call
390+ continue
391+ if 'storage_byte_group.py' in str (error .file_path ):
392+ # mutated __init__ method
393+ # now pyright does not know that self._hexstring is a Hexstring
394+ continue
395+ print (mutated_methods )
396+ print ('Already found' , len (mutants_to_skip ))
397+ raise Exception (f'Could not find mutant for error { error .file_path } :{ error .line_number } ' )
398+
399+ module_name = strip_prefix (str (path .relative_to (Path ('.' ).absolute ()))[:- len (path .suffix )].replace (os .sep , '.' ), prefix = 'src.' )
400+
401+ mutant_name = '.' .join ([module_name , mutant .function_name ]).replace ('.__init__.' , '.' )
402+ mutants_to_skip .append (FailedTypeCheckMutant (
403+ method_location = mutant ,
404+ name = mutant_name ,
405+ ))
406+
407+ return mutants_to_skip
408+
409+
410+ def group_by_path (errors : list [TypeCheckingError ]) -> dict [Path , list [TypeCheckingError ]]:
411+ grouped : dict [Path , list [TypeCheckingError ]] = defaultdict (list )
412+
413+ for error in errors :
414+ grouped [error .file_path ].append (error )
415+
416+ return grouped
417+
418+ @dataclass
419+ class MutatedMethodLocation :
420+ file : Path
421+ function_name : str
422+ line_number_start : int
423+ line_number_end : int
424+
425+
426+ @dataclass
427+ class FailedTypeCheckMutant :
428+ method_location : MutatedMethodLocation
429+ name : str
430+
431+
432+ class MutatedMethodsCollector (cst .CSTVisitor ):
433+ METADATA_DEPENDENCIES = (cst .metadata .PositionProvider ,)
434+
435+ def __init__ (self , file : Path ):
436+ self .file = file
437+ self .found_mutants : list [MutatedMethodLocation ] = []
438+
439+ def visit_FunctionDef (self , node : cst .FunctionDef ) -> bool :
440+ name = node .name .value
441+ if is_mutated_method_name (name ):
442+ range = self .get_metadata (cst .metadata .PositionProvider , node )
443+ self .found_mutants .append (MutatedMethodLocation (
444+ file = self .file ,
445+ function_name = name ,
446+ line_number_start = range .start .line ,
447+ line_number_end = range .end .line ,
448+ ))
449+
450+ # do not continue visting children of this function
451+ # mutated methods are not nested within other methods
452+ return False
453+
454+ def is_mutated_method_name (name : str ):
455+ return name .startswith (('x_' , 'xǁ' )) and '__mutmut' in name
456+
457+ def parse_mutant_methods (file_paths : Iterable [Path ]) -> dict [Path , list [MutatedMethodLocation ]]:
458+ methods : dict [Path , list [MutatedMethodLocation ]] = {}
459+
460+ for path in file_paths :
461+ with open (path , 'r' , encoding = 'utf-8' ) as file :
462+ source = file .read ()
463+ module = cst .parse_module (source )
464+
465+ return methods
466+
467+
468+
367469
368470def unused (* _ ):
369471 pass
@@ -429,8 +531,8 @@ def new_tests(self):
429531
430532class PytestRunner (TestRunner ):
431533 def __init__ (self ):
432- self ._pytest_add_cli_args : List [str ] = mutmut .config .pytest_add_cli_args
433- self ._pytest_add_cli_args_test_selection : List [str ] = mutmut .config .pytest_add_cli_args_test_selection
534+ self ._pytest_add_cli_args : list [str ] = mutmut .config .pytest_add_cli_args
535+ self ._pytest_add_cli_args_test_selection : list [str ] = mutmut .config .pytest_add_cli_args_test_selection
434536
435537 # tests_dir is a special case of a test selection option,
436538 # so also use pytest_add_cli_args_test_selection for the implementation
@@ -612,6 +714,7 @@ class Stat:
612714 timeout : int
613715 check_was_interrupted_by_user : int
614716 segfault : int
717+ caught_by_type_check : int
615718
616719
617720def collect_stat (m : SourceFileMutationData ):
@@ -641,12 +744,13 @@ def calculate_summary_stats(source_file_mutation_data_by_path):
641744 timeout = sum (x .timeout for x in stats ),
642745 check_was_interrupted_by_user = sum (x .check_was_interrupted_by_user for x in stats ),
643746 segfault = sum (x .segfault for x in stats ),
747+ caught_by_type_check = sum (x .caught_by_type_check for x in stats ),
644748 )
645749
646750
647751def print_stats (source_file_mutation_data_by_path , force_output = False ):
648752 s = calculate_summary_stats (source_file_mutation_data_by_path )
649- print_status (f'{ (s .total - s .not_checked )} /{ s .total } 🎉 { s .killed } 🫥 { s .no_tests } ⏰ { s .timeout } 🤔 { s .suspicious } 🙁 { s .survived } 🔇 { s .skipped } ' , force_output = force_output )
753+ print_status (f'{ (s .total - s .not_checked )} /{ s .total } 🎉 { s .killed } 🫥 { s .no_tests } ⏰ { s .timeout } 🤔 { s .suspicious } 🙁 { s .survived } 🔇 { s .skipped } X { s . caught_by_type_check } ' , force_output = force_output )
650754
651755
652756def run_forced_fail_test (runner ):
@@ -713,15 +817,16 @@ def __exit__(self, exc_type, exc_val, exc_tb):
713817
714818@dataclass
715819class Config :
716- also_copy : List [Path ]
717- do_not_mutate : List [str ]
820+ also_copy : list [Path ]
821+ do_not_mutate : list [str ]
718822 max_stack_depth : int
719823 debug : bool
720- paths_to_mutate : List [Path ]
721- pytest_add_cli_args : List [str ]
722- pytest_add_cli_args_test_selection : List [str ]
723- tests_dir : List [str ]
824+ paths_to_mutate : list [Path ]
825+ pytest_add_cli_args : list [str ]
826+ pytest_add_cli_args_test_selection : list [str ]
827+ tests_dir : list [str ]
724828 mutate_only_covered_lines : bool
829+ type_check_command : list [str ]
725830
726831 def should_ignore_for_mutation (self , path ):
727832 if not str (path ).endswith ('.py' ):
@@ -758,7 +863,7 @@ def s(key, default):
758863 config_parser = ConfigParser ()
759864 config_parser .read ('setup.cfg' )
760865
761- def s (key : str , default ):
866+ def s (key : str , default ) -> Any :
762867 try :
763868 result = config_parser .get ('mutmut' , key )
764869 except (NoOptionError , NoSectionError ):
@@ -805,6 +910,7 @@ def load_config():
805910 tests_dir = s ('tests_dir' , []),
806911 pytest_add_cli_args = s ('pytest_add_cli_args' , []),
807912 pytest_add_cli_args_test_selection = s ('pytest_add_cli_args_test_selection' , []),
913+ type_check_command = s ('type_check_command' , []),
808914 )
809915
810916
@@ -923,7 +1029,7 @@ def save_cicd_stats(source_file_mutation_data_by_path):
9231029def export_cicd_stats ():
9241030 ensure_config_loaded ()
9251031
926- source_file_mutation_data_by_path : Dict [str , SourceFileMutationData ] = {}
1032+ source_file_mutation_data_by_path : dict [str , SourceFileMutationData ] = {}
9271033
9281034 for path in walk_source_files ():
9291035 if mutmut .config .should_ignore_for_mutation (path ):
@@ -949,7 +1055,7 @@ def export_cicd_stats():
9491055
9501056
9511057def collect_source_file_mutation_data (* , mutant_names ):
952- source_file_mutation_data_by_path : Dict [str , SourceFileMutationData ] = {}
1058+ source_file_mutation_data_by_path : dict [str , SourceFileMutationData ] = {}
9531059
9541060 for path in walk_source_files ():
9551061 if mutmut .config .should_ignore_for_mutation (path ):
@@ -1054,7 +1160,7 @@ def run(mutant_names, *, max_children):
10541160 _run (mutant_names , max_children )
10551161
10561162# separate function, so we can call it directly from the tests
1057- def _run (mutant_names : Union [ tuple , list ] , max_children : Union [ None , int ] ):
1163+ def _run (mutant_names : tuple | list , max_children : None | int ):
10581164 # TODO: run no-ops once in a while to detect if we get false negatives
10591165 # TODO: we should be able to get information on which tests killed mutants, which means we can get a list of tests and how many mutants each test kills. Those that kill zero mutants are redundant!
10601166 os .environ ['MUTANT_UNDER_TEST' ] = 'mutant_generation'
@@ -1075,6 +1181,12 @@ def _run(mutant_names: Union[tuple, list], max_children: Union[None, int]):
10751181 time = datetime .now () - start
10761182 print (f' done in { round (time .total_seconds ()* 1000 )} ms' , )
10771183
1184+ if mutmut .config .type_check_command :
1185+ with CatchOutput (spinner_title = 'Filtering mutations with type checker' ):
1186+ failed_type_check_mutants = filter_mutants_with_type_checker ()
1187+ else :
1188+ failed_type_check_mutants = []
1189+
10781190 # TODO: config/option for runner
10791191 # runner = HammettRunner()
10801192 runner = PytestRunner ()
@@ -1109,7 +1221,7 @@ def read_one_child_exit_status():
11091221 print (' worker exit code' , exit_code )
11101222 source_file_mutation_data_by_pid [pid ].register_result (pid = pid , exit_code = exit_code )
11111223
1112- source_file_mutation_data_by_pid : Dict [int , SourceFileMutationData ] = {} # many pids map to one MutationData
1224+ source_file_mutation_data_by_pid : dict [int , SourceFileMutationData ] = {} # many pids map to one MutationData
11131225 running_children = 0
11141226 count_tried = 0
11151227
@@ -1149,6 +1261,12 @@ def read_one_child_exit_status():
11491261 m .save ()
11501262 continue
11511263
1264+ failed_type_check_mutant = next ((m for m in failed_type_check_mutants if m .name == mutant_name ), None )
1265+ if failed_type_check_mutant :
1266+ m .exit_code_by_key [mutant_name ] = 6
1267+ m .save ()
1268+ continue
1269+
11521270 pid = os .fork ()
11531271 if not pid :
11541272 # In the child
@@ -1253,7 +1371,7 @@ def read_orig_module(path) -> cst.Module:
12531371 return cst .parse_module (f .read ())
12541372
12551373
1256- def find_top_level_function_or_method (module : cst .Module , name : str ) -> Union [ cst .FunctionDef , None ] :
1374+ def find_top_level_function_or_method (module : cst .Module , name : str ) -> cst .FunctionDef | None :
12571375 name = name .split ('.' )[- 1 ]
12581376 for child in module .body :
12591377 if isinstance (child , cst .FunctionDef ) and child .name .value == name :
0 commit comments