@@ -23,26 +23,17 @@ def __init__(
2323 self .replacement = replacement
2424 self .change_description = change_description
2525
26- def apply (
27- self ,
28- context : CodemodExecutionContext ,
29- file_context : FileContext ,
30- results : list [Result ] | None ,
31- ) -> ChangeSet | None :
26+ def _apply_regex (self , line ):
27+ return re .sub (self .pattern , self .replacement , line )
28+
29+ def _apply (self , original_lines , file_context , results ):
3230 del results
3331
3432 changes = []
3533 updated_lines = []
3634
37- original_lines = (
38- file_context .file_path .read_bytes ()
39- .decode ("utf-8" )
40- .splitlines (keepends = True )
41- )
42-
4335 for lineno , line in enumerate (original_lines ):
44- # TODO: use results to filter out which lines to change
45- changed_line = re .sub (self .pattern , self .replacement , line )
36+ changed_line = self ._apply_regex (line )
4637 updated_lines .append (changed_line )
4738 if line != changed_line :
4839 changes .append (
@@ -52,6 +43,22 @@ def apply(
5243 findings = file_context .get_findings_for_location (lineno ),
5344 )
5445 )
46+ return changes , updated_lines
47+
48+ def apply (
49+ self ,
50+ context : CodemodExecutionContext ,
51+ file_context : FileContext ,
52+ results : list [Result ] | None ,
53+ ) -> ChangeSet | None :
54+
55+ original_lines = (
56+ file_context .file_path .read_bytes ()
57+ .decode ("utf-8" )
58+ .splitlines (keepends = True )
59+ )
60+
61+ changes , updated_lines = self ._apply (original_lines , file_context , results )
5562
5663 if not changes :
5764 logger .debug ("No changes produced for %s" , file_context .file_path )
@@ -67,3 +74,46 @@ def apply(
6774 diff = diff ,
6875 changes = changes ,
6976 )
77+
78+
79+ class SastRegexTransformerPipeline (RegexTransformerPipeline ):
80+ def line_matches_result (self , lineno : int , result_linenums : list [int ]) -> bool :
81+ return lineno in result_linenums
82+
83+ def report_unfixed (self , file_context : FileContext , line_number : int , reason : str ):
84+ findings = file_context .get_findings_for_location (line_number )
85+ file_context .add_unfixed_findings (findings , reason , line_number )
86+
87+ def _apply (self , original_lines , file_context , results ):
88+ changes = []
89+ updated_lines = []
90+ if results is not None and not results :
91+ return changes , updated_lines
92+
93+ result_linenums = [
94+ location .start .line for result in results for location in result .locations
95+ ]
96+ for lineno , line in enumerate (original_lines ):
97+ if self .line_matches_result (one_idx_lineno := lineno + 1 , result_linenums ):
98+ changed_line = self ._apply_regex (line )
99+ updated_lines .append (changed_line )
100+ if line == changed_line :
101+ logger .warn ("Unable to update html line: %s" , line )
102+ self .report_unfixed (
103+ file_context ,
104+ one_idx_lineno ,
105+ reason = "Unable to update html line" ,
106+ )
107+ continue
108+
109+ changes .append (
110+ Change (
111+ lineNumber = lineno + 1 ,
112+ description = self .change_description ,
113+ findings = file_context .get_findings_for_location (lineno ),
114+ )
115+ )
116+
117+ else :
118+ updated_lines .append (line )
119+ return changes , updated_lines
0 commit comments