1
1
import re
2
2
from abc import abstractmethod
3
- from typing import Optional
3
+ from typing import List , Optional , Tuple
4
4
5
5
import structlog
6
6
from litellm import ChatCompletionRequest , ChatCompletionSystemMessage , ModelResponse
@@ -44,12 +44,11 @@ def _hide_secret(self, match: Match) -> str:
44
44
pass
45
45
46
46
@abstractmethod
47
- def _notify_secret (self , secret ) :
47
+ def _notify_secret (self , match : Match , protected_text : List [ str ]) -> None :
48
48
"""
49
49
Notify about a found secret
50
- TODO: We should probably not notify about a secret value but rather
51
- an obfuscated string. It might be nice to report the context as well
52
- (e.g. the file or a couple of lines before and after)
50
+ TODO: If the secret came from a CodeSnippet we should notify about that. This would
51
+ involve using the CodeSnippetExtractor step that is further down the pipeline.
53
52
"""
54
53
pass
55
54
@@ -92,6 +91,21 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int
92
91
93
92
return start , end
94
93
94
+ def _get_surrounding_secret_lines (
95
+ self , protected_text : List [str ], secret_line : int , surrounding_lines : int = 3
96
+ ) -> str :
97
+ """
98
+ Get the lines before and after the secret line to provide context.
99
+
100
+ Args:
101
+ protected_text: The text with secrets replaced
102
+ secret_line: The line number of the secret
103
+ """
104
+ lines = "" .join (protected_text ).split ("\n " )
105
+ start_line = max (secret_line - surrounding_lines , 0 )
106
+ end_line = min (secret_line + surrounding_lines , len (lines ))
107
+ return "\n " .join (lines [start_line :end_line ])
108
+
95
109
def obfuscate (self , text : str ) -> tuple [str , int ]:
96
110
matches = CodegateSignatures .find_in_string (text )
97
111
if not matches :
@@ -100,7 +114,7 @@ def obfuscate(self, text: str) -> tuple[str, int]:
100
114
logger .debug (f"Found { len (matches )} secrets in the user message" )
101
115
102
116
# Convert line positions to absolute positions and extend boundaries
103
- absolute_matches = []
117
+ absolute_matches : List [ Tuple [ int , int , Match ]] = []
104
118
for match in matches :
105
119
start = self ._get_absolute_position (match .line_number , match .start_index , text )
106
120
end = self ._get_absolute_position (match .line_number , match .end_index , text )
@@ -119,37 +133,30 @@ def obfuscate(self, text: str) -> tuple[str, int]:
119
133
protected_text = list (text )
120
134
121
135
# Store matches for logging
122
- found_secrets = []
136
+ found_secrets = 0
123
137
124
138
# Replace each match with its encrypted value
139
+ logger .info ("\n Found secrets:" )
125
140
for start , end , match in absolute_matches :
126
141
hidden_secret = self ._hide_secret (match )
127
- self ._notify_secret (hidden_secret )
142
+ self ._notify_secret (match , protected_text )
128
143
129
144
# Replace the secret in the text
130
145
protected_text [start :end ] = hidden_secret
131
- # Store for logging
132
- found_secrets .append (
133
- {
134
- "service" : match .service ,
135
- "type" : match .type ,
136
- "original" : match .value ,
137
- "encrypted" : hidden_secret ,
138
- }
139
- )
140
146
141
- # Log the findings
142
- logger .info ("\n Found secrets:" )
143
- for secret in found_secrets :
144
- logger .info (f"\n Service: { secret ['service' ]} " )
145
- logger .info (f"Type: { secret ['type' ]} " )
146
- logger .info (f"Original: { secret ['original' ]} " )
147
- logger .info (f"Encrypted: { secret ['encrypted' ]} " )
147
+ found_secrets += 1
148
+ # Log the findings
149
+ logger .info (
150
+ f"\n Service: { match .service } "
151
+ f"\n Type: { match .type } "
152
+ f"\n Original: { match .value } "
153
+ f"\n Encrypted: { hidden_secret } "
154
+ )
148
155
149
156
# Convert back to string
150
157
protected_string = "" .join (protected_text )
151
158
print (f"\n Protected text:\n { protected_string } " )
152
- return protected_string , len ( found_secrets )
159
+ return protected_string , found_secrets
153
160
154
161
155
162
class SecretsEncryptor (SecretsModifier ):
@@ -175,7 +182,9 @@ def _hide_secret(self, match: Match) -> str:
175
182
)
176
183
return f"REDACTED<${ encrypted_value } >"
177
184
178
- def _notify_secret (self , notify_string ):
185
+ def _notify_secret (self , match : Match , protected_text : List [str ]) -> None :
186
+ secret_lines = self ._get_surrounding_secret_lines (protected_text , match .line_number )
187
+ notify_string = f"{ match .service } - { match .type } :\n { secret_lines } "
179
188
self ._context .add_alert (
180
189
self ._name , trigger_string = notify_string , severity_category = AlertSeverity .CRITICAL
181
190
)
@@ -194,7 +203,7 @@ def _hide_secret(self, match: Match) -> str:
194
203
"""
195
204
return "*" * 32
196
205
197
- def _notify_secret (self , secret ) :
206
+ def _notify_secret (self , match : Match , protected_text : List [ str ]) -> None :
198
207
pass
199
208
200
209
0 commit comments