Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 5aa8693

Browse files
committed
Split out secret obfuscation into reusable classes
Instead of coding up the secret encryption directly in the Pipeline step, let's split it out into a class of its own. The actual method that changes the secret is pluggable: for encryption, where we need to get the secret value back, we use the method we had used in the pipeline step. For things like extracting packages from a code snippet, where we don't need to retrieve the original value, we just replace the secret with a fixed number of asterisks.
1 parent 8c20b48 commit 5aa8693

File tree

1 file changed

+113
-45
lines changed

1 file changed

+113
-45
lines changed

src/codegate/pipeline/secrets/secrets.py

Lines changed: 113 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import re
2+
from abc import abstractmethod
23
from typing import Optional
34

45
import structlog
@@ -14,30 +15,43 @@
1415
)
1516
from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep
1617
from codegate.pipeline.secrets.manager import SecretsManager
17-
from codegate.pipeline.secrets.signatures import CodegateSignatures
18+
from codegate.pipeline.secrets.signatures import CodegateSignatures, Match
1819
from codegate.pipeline.systemmsg import add_or_update_system_message
1920

2021
logger = structlog.get_logger("codegate")
2122

2223

23-
class CodegateSecrets(PipelineStep):
24-
"""Pipeline step that handles secret information requests."""
24+
class SecretsModifier:
25+
"""
26+
A class that helps obfuscate text by piping it through the secrets manager
27+
that finds the secrets and then calling hide_secret to modify them.
28+
29+
What modifications are done is up to the user who subclasses SecretsModifier
30+
"""
2531

2632
def __init__(self):
2733
"""Initialize the CodegateSecrets pipeline step."""
2834
super().__init__()
2935
# Initialize and load signatures immediately
3036
CodegateSignatures.initialize("signatures.yaml")
3137

32-
@property
33-
def name(self) -> str:
38+
@abstractmethod
39+
def _hide_secret(self, match: Match) -> str:
3440
"""
35-
Returns the name of this pipeline step.
41+
User-defined callable to hide a secret match to either obfuscate
42+
it or reversibly encrypt
43+
"""
44+
pass
3645

37-
Returns:
38-
str: The identifier 'codegate-secrets'.
46+
@abstractmethod
47+
def _notify_secret(self, secret):
3948
"""
40-
return "codegate-secrets"
49+
Notify about a found secret
50+
TODO: We should probably not notify about a secret value but rather
51+
an obfuscated string. It might be nice to report the context as well
52+
(e.g. the file or a couple of lines before and after)
53+
"""
54+
pass
4155

4256
def _get_absolute_position(self, line_number: int, line_offset: int, text: str) -> int:
4357
"""
@@ -78,21 +92,7 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int
7892

7993
return start, end
8094

81-
def _redact_text(
82-
self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
83-
) -> tuple[str, int]:
84-
"""
85-
Find and encrypt secrets in the given text.
86-
87-
Args:
88-
text: The text to protect
89-
secrets_manager: ..
90-
session_id: ..
91-
context: The pipeline context to be able to log alerts
92-
Returns:
93-
Tuple containing protected text with encrypted values and the count of redacted secrets
94-
"""
95-
# Find secrets in the text
95+
def obfuscate(self, text: str) -> tuple[str, int]:
9696
matches = CodegateSignatures.find_in_string(text)
9797
if not matches:
9898
return text, 0
@@ -123,48 +123,116 @@ def _redact_text(
123123

124124
# Replace each match with its encrypted value
125125
for start, end, match in absolute_matches:
126-
# Encrypt and store the value
127-
encrypted_value = secrets_manager.store_secret(
128-
match.value,
129-
match.service,
130-
match.type,
131-
session_id,
132-
)
133-
134-
# Create the replacement string
135-
replacement = f"REDACTED<${encrypted_value}>"
136-
# Store the protected text in DB.
137-
context.add_alert(
138-
self.name, trigger_string=replacement, severity_category=AlertSeverity.CRITICAL
139-
)
126+
hidden_secret = self._hide_secret(match)
127+
self._notify_secret(hidden_secret)
140128

141129
# Replace the secret in the text
142-
protected_text[start:end] = replacement
130+
protected_text[start:end] = hidden_secret
143131
# Store for logging
144132
found_secrets.append(
145133
{
146134
"service": match.service,
147135
"type": match.type,
148136
"original": match.value,
149-
"encrypted": encrypted_value,
137+
"encrypted": hidden_secret,
150138
}
151139
)
152140

153-
# Convert back to string
154-
protected_string = "".join(protected_text)
155-
156141
# Log the findings
157142
logger.info("\nFound secrets:")
158-
159143
for secret in found_secrets:
160144
logger.info(f"\nService: {secret['service']}")
161145
logger.info(f"Type: {secret['type']}")
162146
logger.info(f"Original: {secret['original']}")
163-
logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>")
147+
logger.info(f"Encrypted: {secret['encrypted']}")
164148

149+
# Convert back to string
150+
protected_string = "".join(protected_text)
165151
print(f"\nProtected text:\n{protected_string}")
166152
return protected_string, len(found_secrets)
167153

154+
155+
class SecretsEncryptor(SecretsModifier):
156+
def __init__(
157+
self,
158+
secrets_manager: SecretsManager,
159+
context: PipelineContext,
160+
session_id: str,
161+
):
162+
self._secrets_manager = secrets_manager
163+
self._session_id = session_id
164+
self._context = context
165+
self._name = "codegate-secrets"
166+
super().__init__()
167+
168+
def _hide_secret(self, match: Match) -> str:
169+
# Encrypt and store the value
170+
encrypted_value = self._secrets_manager.store_secret(
171+
match.value,
172+
match.service,
173+
match.type,
174+
self._session_id,
175+
)
176+
return f"REDACTED<${encrypted_value}>"
177+
178+
def _notify_secret(self, notify_string):
179+
self._context.add_alert(
180+
self._name, trigger_string=notify_string, severity_category=AlertSeverity.CRITICAL
181+
)
182+
183+
184+
class SecretsObfuscator(SecretsModifier):
185+
def __init__(
186+
self,
187+
):
188+
super().__init__()
189+
190+
def _hide_secret(self, match: Match) -> str:
191+
"""
192+
Obfuscate the secret value. We use a hardcoded number of asterisks
193+
to not leak the length of the secret.
194+
"""
195+
return "*" * 32
196+
197+
def _notify_secret(self, secret):
198+
pass
199+
200+
201+
class CodegateSecrets(PipelineStep):
202+
"""Pipeline step that handles secret information requests."""
203+
204+
def __init__(self):
205+
"""Initialize the CodegateSecrets pipeline step."""
206+
super().__init__()
207+
208+
@property
209+
def name(self) -> str:
210+
"""
211+
Returns the name of this pipeline step.
212+
213+
Returns:
214+
str: The identifier 'codegate-secrets'.
215+
"""
216+
return "codegate-secrets"
217+
218+
def _redact_text(
219+
self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
220+
) -> tuple[str, int]:
221+
"""
222+
Find and encrypt secrets in the given text.
223+
224+
Args:
225+
text: The text to protect
226+
secrets_manager: ..
227+
session_id: ..
228+
context: The pipeline context to be able to log alerts
229+
Returns:
230+
Tuple containing protected text with encrypted values and the count of redacted secrets
231+
"""
232+
# Find secrets in the text
233+
text_encryptor = SecretsEncryptor(secrets_manager, context, session_id)
234+
return text_encryptor.obfuscate(text)
235+
168236
async def process(
169237
self, request: ChatCompletionRequest, context: PipelineContext
170238
) -> PipelineResult:

0 commit comments

Comments
 (0)