Skip to content

Commit 91dbd1f

Browse files
feat: enhance governance loop with fuzzy patching and smart test execution
- Implemented `PatchManager.apply_fuzzy_patch` with symbol replacement (Python) and context matching fallback. - Added `Sandbox.run_tests` to auto-detect and run language-specific tests (Python, Go). - Enhanced `CodeValidator.resolve_related_tests` to discover tests via naming conventions and dependency graph. - Updated `GovernanceConfig` with `execution_timeout` and `max_retries`. - Added unit tests for fuzzy patching strategies.
1 parent 1d9570b commit 91dbd1f

File tree

5 files changed

+405
-39
lines changed

5 files changed

+405
-39
lines changed

codesage/config/governance.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class ValidationConfig(BaseModel):
2121
},
2222
description="Commands to run tests for different languages."
2323
)
24+
execution_timeout: int = Field(30, description="Timeout in seconds for sandbox execution.")
25+
max_retries: int = Field(3, description="Maximum number of retries for failed validation.")
2426

2527

2628
class GovernanceConfig(BaseModel):

codesage/governance/patch_manager.py

Lines changed: 191 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
import difflib
44
import re
55
import shutil
6+
import ast
67
from pathlib import Path
78
from typing import Optional, Tuple
89

910
import structlog
1011

12+
from codesage.analyzers.parser_factory import create_parser
13+
1114
logger = structlog.get_logger()
1215

1316

@@ -21,22 +24,17 @@ def extract_code_block(self, llm_response: str, language: str = "") -> Optional[
2124
Extracts the content of a markdown code block.
2225
Prioritizes blocks marked with the specific language.
2326
"""
24-
# Pattern for ```language ... ```
25-
# We try to match specifically the requested language first
2627
if language:
2728
pattern = re.compile(rf"```{language}\s*\n(.*?)\n```", re.DOTALL)
2829
match = pattern.search(llm_response)
2930
if match:
3031
return match.group(1)
3132

32-
# Fallback: match any code block
3333
pattern = re.compile(r"```(?:\w+)?\s*\n(.*?)\n```", re.DOTALL)
3434
match = pattern.search(llm_response)
3535
if match:
3636
return match.group(1)
3737

38-
# If no code block is found, we return None to be safe.
39-
# Returning the whole response might risk injecting chat text into source code.
4038
return None
4139

4240
def apply_patch(self, file_path: str | Path, new_content: str, create_backup: bool = True) -> bool:
@@ -55,16 +53,6 @@ def apply_patch(self, file_path: str | Path, new_content: str, create_backup: bo
5553
shutil.copy2(path, backup_path)
5654
logger.info("Backup created", backup_path=str(backup_path))
5755

58-
# For Phase 1, we assume new_content is the FULL file content
59-
# or we need to compute diff?
60-
# The task says "implement extract_code_block and apply_diff".
61-
# If the LLM returns a full file, we just overwrite.
62-
# If the LLM returns a diff or snippet, we need to handle it.
63-
# For now, let's assume the prompt asks for the FULL file content or we do full replacement.
64-
# If we want to support git-style diffs, we need more complex logic.
65-
# Based on "AC-2: PatchManager can correct parse LLM returned Markdown code block... and replace it to source file",
66-
# I will implement full replacement for now as it's safer for "Apply" than trying to merge snippets without line numbers.
67-
6856
path.write_text(new_content, encoding="utf-8")
6957
logger.info("Patch applied successfully", file_path=str(path))
7058
return True
@@ -73,10 +61,196 @@ def apply_patch(self, file_path: str | Path, new_content: str, create_backup: bo
7361
logger.error("Failed to apply patch", file_path=str(path), error=str(e))
7462
return False
7563

76-
def create_diff(self, original: str, new: str, filename: str = "file") -> str:
64+
def apply_fuzzy_patch(self, file_path: str | Path, new_code_block: str, target_symbol: str = None) -> bool:
    """
    Patches a file from a code snippet instead of a full-file replacement.

    Strategies are tried in order:
      1. If ``target_symbol`` is given, swap out the definition with that
         name via ``_replace_symbol``.
      2. Otherwise, or if step 1 produced nothing, let
         ``_apply_context_patch`` locate the most similar existing block.

    When the file extension maps to a known language, the patched text must
    pass ``_verify_syntax`` before anything is written. A ``.bak`` copy of
    the file is made only if no backup exists yet, so an earlier backup is
    never overwritten by this method.

    Returns True when the patched text was written to disk; False when the
    file is missing, no strategy matched, the syntax check failed, or any
    exception was raised along the way (the exception is logged, not
    propagated).
    """
    target = Path(file_path)
    if not target.exists():
        logger.error("File not found for fuzzy patching", file_path=str(target))
        return False

    try:
        source_text = target.read_text(encoding="utf-8")

        # Strategy 1: replace a named symbol, only when the caller named one.
        candidate = None
        if target_symbol:
            candidate = self._replace_symbol(file_path, source_text, target_symbol, new_code_block)
            if candidate:
                logger.info("Symbol replaced successfully", symbol=target_symbol)

        # Strategy 2: similarity-based replacement as the fallback.
        if not candidate:
            candidate = self._apply_context_patch(source_text, new_code_block)
            if candidate:
                logger.info("Context patch applied successfully")

        if not candidate:
            logger.warning("Could not apply fuzzy patch")
            return False

        # Files with an unrecognised extension skip the syntax gate entirely.
        detected = self._get_language_from_extension(target.suffix)
        syntax_ok = not detected or self._verify_syntax(candidate, detected)
        if not syntax_ok:
            logger.error("Patched content failed syntax check", language=detected)
            return False

        # Keep the oldest backup: copy only when no .bak is present.
        backup = target.with_suffix(target.suffix + ".bak")
        if not backup.exists():
            shutil.copy2(target, backup)

        target.write_text(candidate, encoding="utf-8")
        return True

    except Exception as exc:
        logger.error("Failed to apply fuzzy patch", file_path=str(target), error=str(exc))
        return False
106+
107+
def _replace_symbol(self, file_path: str | Path, content: str, symbol_name: str, new_block: str) -> Optional[str]:
108+
"""
109+
Uses simple indentation-based parsing to find and replace a Python function.
77110
"""
78-
Creates a unified diff between original and new content.
111+
path = Path(file_path)
112+
if path.suffix != '.py':
113+
return None # Only Python implemented for P1 regex
114+
115+
lines = content.splitlines(keepends=True)
116+
start_idx = -1
117+
end_idx = -1
118+
current_indent = 0
119+
120+
# Regex to find definition
121+
def_pattern = re.compile(rf"^(\s*)def\s+{re.escape(symbol_name)}\s*\(")
122+
123+
for i, line in enumerate(lines):
124+
match = def_pattern.match(line)
125+
if match:
126+
start_idx = i
127+
current_indent = len(match.group(1))
128+
break
129+
130+
if start_idx == -1:
131+
return None
132+
133+
# Find end: Look for next line with same or less indentation that is NOT empty/comment
134+
# This is naive but works for standard formatting
135+
for i in range(start_idx + 1, len(lines)):
136+
line = lines[i]
137+
if not line.strip() or line.strip().startswith('#'):
138+
continue
139+
140+
# Check indentation
141+
indent = len(line) - len(line.lstrip())
142+
if indent <= current_indent:
143+
end_idx = i
144+
break
145+
else:
146+
end_idx = len(lines) # End of file
147+
148+
# Replace lines[start_idx:end_idx] with new_block
149+
# Ensure new_block ends with newline if needed
150+
if not new_block.endswith('\n'):
151+
new_block += '\n'
152+
153+
new_lines = lines[:start_idx] + [new_block] + lines[end_idx:]
154+
return "".join(new_lines)
155+
156+
def _apply_context_patch(self, original: str, new_block: str) -> Optional[str]:
79157
"""
158+
Uses difflib to find a close match for replacement.
159+
Finds the most similar block in the original content and replaces it.
160+
"""
161+
# Split into lines
162+
original_lines = original.splitlines(keepends=True)
163+
new_lines = new_block.splitlines(keepends=True)
164+
165+
if not new_lines:
166+
return None
167+
168+
# Assumption: The new_block is a modified version of some block in the original.
169+
# We search for the block in original that has the highest similarity to new_block.
170+
171+
best_ratio = 0.0
172+
best_match_start = -1
173+
best_match_end = -1
174+
175+
# Try to find header match
176+
header = new_lines[0].strip()
177+
# If header is empty or just braces, it's hard.
178+
if not header:
179+
return None
180+
181+
candidates = []
182+
for i, line in enumerate(original_lines):
183+
if header in line: # Loose match
184+
candidates.append(i)
185+
186+
# For each candidate start, try to find the end of the block (indentation based)
187+
# and compare similarity.
188+
189+
for start_idx in candidates:
190+
# Determine end_idx based on indentation of start_idx
191+
current_indent = len(original_lines[start_idx]) - len(original_lines[start_idx].lstrip())
192+
end_idx = len(original_lines)
193+
194+
for i in range(start_idx + 1, len(original_lines)):
195+
line = original_lines[i]
196+
if not line.strip() or line.strip().startswith('#'):
197+
continue
198+
indent = len(line) - len(line.lstrip())
199+
if indent <= current_indent:
200+
end_idx = i
201+
break
202+
203+
# Check similarity of this block with new_block
204+
old_block = "".join(original_lines[start_idx:end_idx])
205+
ratio = difflib.SequenceMatcher(None, old_block, new_block).ratio()
206+
207+
if ratio > best_ratio:
208+
best_ratio = ratio
209+
best_match_start = start_idx
210+
best_match_end = end_idx
211+
212+
# Threshold
213+
if best_ratio > 0.6: # Allow some significant changes but ensure it's roughly the same place
214+
# Replace
215+
new_content_lines = original_lines[:best_match_start] + new_lines + original_lines[best_match_end:]
216+
217+
return "".join(new_content_lines)
218+
219+
return None
220+
221+
def _verify_syntax(self, content: str, language: str) -> bool:
222+
if language == "python":
223+
try:
224+
ast.parse(content)
225+
return True
226+
except SyntaxError:
227+
return False
228+
elif language == "go":
229+
try:
230+
parser = create_parser("go")
231+
parser.parse(content)
232+
root = parser.tree.root_node
233+
return not self._has_error_node(root)
234+
except Exception:
235+
return False
236+
return True
237+
238+
def _has_error_node(self, node) -> bool:
239+
if node.type == 'ERROR' or node.is_missing:
240+
return True
241+
for child in node.children:
242+
if self._has_error_node(child):
243+
return True
244+
return False
245+
246+
def _get_language_from_extension(self, ext: str) -> Optional[str]:
247+
if ext in ['.py', '.pyi']:
248+
return 'python'
249+
if ext in ['.go']:
250+
return 'go'
251+
return None
252+
253+
def create_diff(self, original: str, new: str, filename: str = "file") -> str:
80254
diff = difflib.unified_diff(
81255
original.splitlines(keepends=True),
82256
new.splitlines(keepends=True),
@@ -86,9 +260,6 @@ def create_diff(self, original: str, new: str, filename: str = "file") -> str:
86260
return "".join(diff)
87261

88262
def restore_backup(self, file_path: str | Path) -> bool:
89-
"""
90-
Restores the file from its backup (.bak).
91-
"""
92263
path = Path(file_path)
93264
backup_path = path.with_suffix(path.suffix + ".bak")
94265

@@ -105,15 +276,9 @@ def restore_backup(self, file_path: str | Path) -> bool:
105276
return False
106277

107278
def revert(self, file_path: str | Path) -> bool:
108-
"""
109-
Alias for restore_backup, used for semantic clarity during rollback.
110-
"""
111279
return self.restore_backup(file_path)
112280

113281
def cleanup_backup(self, file_path: str | Path) -> bool:
114-
"""
115-
Removes the backup file if it exists.
116-
"""
117282
path = Path(file_path)
118283
backup_path = path.with_suffix(path.suffix + ".bak")
119284

codesage/governance/sandbox.py

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import subprocess
22
import os
33
import structlog
4-
from typing import Dict, Optional, Tuple
4+
import shlex
5+
from typing import Dict, Optional, Tuple, List
56

67
logger = structlog.get_logger()
78

@@ -20,21 +21,14 @@ def run(self, command: str | list[str], env: Optional[Dict[str, str]] = None, cw
2021
if env:
2122
run_env.update(env)
2223

23-
# If command is a string, we split it for safety if not using shell=True
24-
# But the user config provides a string template.
25-
# Ideally, we should parse it into arguments.
26-
# For this phase, we will switch to shell=False if list is provided,
27-
# but if string is provided, we might still need shell=True or shlex.split.
28-
# To address security, we use shlex.split if it's a string.
29-
import shlex
3024
if isinstance(command, str):
3125
args = shlex.split(command)
3226
else:
3327
args = command
3428

3529
result = subprocess.run(
3630
args,
37-
shell=False, # Changed to False for security
31+
shell=False,
3832
capture_output=True,
3933
text=True,
4034
timeout=self.timeout,
@@ -53,3 +47,48 @@ def run(self, command: str | list[str], env: Optional[Dict[str, str]] = None, cw
5347
except Exception as e:
5448
logger.error("Sandbox execution failed", command=command, error=str(e))
5549
return False, str(e)
50+
51+
def run_tests(self, test_files: List[str], language: str) -> Tuple[bool, str]:
    """
    Executes tests for the given language and test files.
    Automatically constructs the test command and passes it to ``run``.

    * ``"python"``: ``pytest <test_files...>`` (pytest is assumed to be
      available in the environment).
    * ``"go"``: ``go test -v <dirs...>``, one argument per distinct
      directory containing a test file. Go runs tests per package, so the
      directories are targeted rather than the individual files.

    Returns ``(True, "No test files to run.")`` for an empty ``test_files``,
    ``(False, <message>)`` for an unsupported language, and otherwise
    whatever ``run`` returns for the constructed command.
    """
    if not test_files:
        return True, "No test files to run."

    if language == "python":
        command = ["pytest", *test_files]
    elif language == "go":
        targets = []
        # Sorted so the command is the same on every run for the same input.
        for directory in sorted({os.path.dirname(f) for f in test_files}):
            target = directory or "."  # Empty dirname means the current directory
            # Go only reads an argument as a filesystem path when it is
            # rooted or its first element is "." or "..". Anything else,
            # including hidden directories such as ".ci/pkg", would be
            # looked up as an import path, so it needs an explicit "./".
            first_element = target.replace(os.sep, "/").split("/", 1)[0]
            if first_element not in (".", "..") and not os.path.isabs(target):
                target = "./" + target
            targets.append(target)
        command = ["go", "test", "-v"] + targets
    else:
        return False, f"Unsupported language for test execution: {language}"

    logger.info("Running tests", language=language, command=command)
    return self.run(command)

0 commit comments

Comments
 (0)