Skip to content

Commit 1d9570b

Browse files
feat: add governance verification loop with validator, sandbox, and auto-rollback
This change introduces the Phase 4 Governance Verification Loop, which ensures that code patches generated by the LLM are automatically validated before being finalized.

Key changes:
- Added `CodeValidator` in `codesage/governance/validator.py` to run syntax checks and tests.
- Added `Sandbox` in `codesage/governance/sandbox.py` for safe execution of validation commands (subprocess with timeout, `shell=False`).
- Enhanced `PatchManager` in `codesage/governance/patch_manager.py` with `revert` and `cleanup_backup` methods (the backup logic was already present).
- Updated `TaskOrchestrator` in `codesage/governance/task_orchestrator.py` to implement the retry loop: Apply -> Validate -> Commit, or Revert and Retry.
- Updated `GovernanceConfig` in `codesage/config/governance.py` to include validation command templates.
- Added integration tests in `tests/test_governance_loop.py` and unit tests in `tests/test_patch_rollback.py`.
1 parent 2a12d86 commit 1d9570b

File tree

7 files changed

+354
-40
lines changed

7 files changed

+354
-40
lines changed

codesage/config/governance.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,26 @@
11
from pydantic import BaseModel, Field
2-
from typing import Literal
2+
from typing import Literal, Dict
3+
4+
5+
class ValidationConfig(BaseModel):
    # Command-line templates used to validate a patched file. Both mappings
    # are keyed by language name.

    # Syntax-check (lint) commands. ``{file}`` is the placeholder that is
    # replaced with the path of the file being checked.
    syntax_commands: Dict[str, str] = Field(
        default_factory=lambda: dict(
            python="python -m py_compile {file}",
            go="go vet {file}",
        ),
        description="Commands to check syntax for different languages.",
    )

    # Test commands. ``{scope}`` is the placeholder; it might be a file or a
    # package.
    test_commands: Dict[str, str] = Field(
        default_factory=lambda: dict(
            python="pytest {scope}",
            go="go test {scope}",
        ),
        description="Commands to run tests for different languages.",
    )
324

425

526
class GovernanceConfig(BaseModel):
@@ -8,6 +29,8 @@ class GovernanceConfig(BaseModel):
829
group_by: Literal["rule", "file", "risk_level"] = Field("rule", description="How to group governance tasks.")
930
prioritization_strategy: Literal["risk_first", "issue_count_first"] = Field("risk_first", description="Strategy to prioritize governance tasks.")
1031

32+
validation: ValidationConfig = Field(default_factory=ValidationConfig, description="Validation settings.")
33+
1134
@classmethod
1235
def default(cls) -> "GovernanceConfig":
1336
return cls()

codesage/governance/patch_manager.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,26 @@ def restore_backup(self, file_path: str | Path) -> bool:
103103
except Exception as e:
104104
logger.error("Failed to restore backup", file_path=str(path), error=str(e))
105105
return False
106+
107+
def revert(self, file_path: str | Path) -> bool:
108+
"""
109+
Alias for restore_backup, used for semantic clarity during rollback.
110+
"""
111+
return self.restore_backup(file_path)
112+
113+
def cleanup_backup(self, file_path: str | Path) -> bool:
114+
"""
115+
Removes the backup file if it exists.
116+
"""
117+
path = Path(file_path)
118+
backup_path = path.with_suffix(path.suffix + ".bak")
119+
120+
if backup_path.exists():
121+
try:
122+
backup_path.unlink()
123+
logger.info("Backup cleaned up", backup_path=str(backup_path))
124+
return True
125+
except Exception as e:
126+
logger.error("Failed to cleanup backup", backup_path=str(backup_path), error=str(e))
127+
return False
128+
return True

codesage/governance/sandbox.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import subprocess
2+
import os
3+
import structlog
4+
from typing import Dict, Optional, Tuple
5+
6+
logger = structlog.get_logger()
7+
8+
class Sandbox:
9+
def __init__(self, timeout: int = 30):
10+
self.timeout = timeout
11+
12+
def run(self, command: str | list[str], env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None) -> Tuple[bool, str]:
13+
"""
14+
Runs a command in a subprocess.
15+
Returns (success, output).
16+
"""
17+
try:
18+
# Simple environment isolation: inherit mainly PATH, but could restrict others.
19+
run_env = os.environ.copy()
20+
if env:
21+
run_env.update(env)
22+
23+
# If command is a string, we split it for safety if not using shell=True
24+
# But the user config provides a string template.
25+
# Ideally, we should parse it into arguments.
26+
# For this phase, we will switch to shell=False if list is provided,
27+
# but if string is provided, we might still need shell=True or shlex.split.
28+
# To address security, we use shlex.split if it's a string.
29+
import shlex
30+
if isinstance(command, str):
31+
args = shlex.split(command)
32+
else:
33+
args = command
34+
35+
result = subprocess.run(
36+
args,
37+
shell=False, # Changed to False for security
38+
capture_output=True,
39+
text=True,
40+
timeout=self.timeout,
41+
env=run_env,
42+
cwd=cwd
43+
)
44+
45+
output = result.stdout + result.stderr
46+
if result.returncode != 0:
47+
return False, output
48+
return True, output
49+
50+
except subprocess.TimeoutExpired:
51+
logger.error("Sandbox execution timed out", command=command)
52+
return False, "Execution timed out"
53+
except Exception as e:
54+
logger.error("Sandbox execution failed", command=command, error=str(e))
55+
return False, str(e)

codesage/governance/task_orchestrator.py

Lines changed: 76 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,26 @@
66
from codesage.governance.task_models import GovernancePlan, GovernanceTask
77
from codesage.llm.client import BaseLLMClient, LLMRequest
88
from codesage.governance.patch_manager import PatchManager
9+
from codesage.governance.validator import CodeValidator
10+
from codesage.config.governance import GovernanceConfig
911

1012
logger = structlog.get_logger()
1113

1214
RISK_LEVEL_MAP = {"low": 1, "medium": 2, "high": 3, "unknown": 0}
1315

1416
class TaskOrchestrator:
15-
def __init__(self, plan: GovernancePlan, llm_client: Optional[BaseLLMClient] = None) -> None:
17+
def __init__(
    self,
    plan: GovernancePlan,
    llm_client: Optional[BaseLLMClient] = None,
    config: Optional[GovernanceConfig] = None
) -> None:
    """
    Wire up the orchestrator for a governance plan.

    Args:
        plan: The plan whose tasks are flattened into ``_all_tasks``.
        llm_client: Client used to generate fixes; may be omitted.
        config: Governance settings; ``GovernanceConfig.default()`` is used
            when none is supplied.
    """
    self._plan = plan
    self._all_tasks: List[GovernanceTask] = self._flatten_tasks()

    self.llm_client = llm_client
    self.patch_manager = PatchManager()

    # Fall back to the default configuration when the caller gave none.
    if config:
        self.config = config
    else:
        self.config = GovernanceConfig.default()
    self.validator = CodeValidator(self.config)
2029

2130
def _flatten_tasks(self) -> List[GovernanceTask]:
2231
"""Extracts and flattens all tasks from the plan's groups."""
@@ -63,66 +72,94 @@ def select_tasks(
6372

6473
return filtered_tasks
6574

66-
def execute_task(self, task: GovernanceTask, apply_fix: bool = False) -> bool:
75+
def execute_task(self, task: GovernanceTask, apply_fix: bool = False, max_retries: int = 3) -> bool:
    """
    Executes a governance task using the LLM client and optionally applies the fix.
    Includes a validation loop with rollback and retry.

    Each attempt asks the LLM for the full corrected file, extracts the code
    block, and then either prints a diff (dry run) or applies the patch with
    a backup and validates it. A patch that fails validation is rolled back
    and the validation error is appended to the prompt for the next attempt.

    Args:
        task: The task to execute; its ``file_path`` must exist on disk.
        apply_fix: When False, only a diff is printed and nothing is written.
        max_retries: Number of retries after the first attempt, i.e. at most
            ``max_retries + 1`` LLM calls are made.

    Returns:
        True when the dry run produced a diff, or when a patch was applied
        and passed validation. False when no LLM client is configured, the
        file is missing, the LLM call raises, the file could not be restored
        after a failed validation, or all attempts are exhausted.
    """
    if not self.llm_client:
        logger.warning("LLM client not configured, skipping execution", task_id=task.id)
        return False

    logger.info("Executing task", task_id=task.id, file=task.file_path)

    file_path = Path(task.file_path)
    if not file_path.exists():
        logger.error("File not found", file_path=str(file_path))
        return False

    original_content = file_path.read_text(encoding="utf-8")

    # Initial prompt; retry prompts are built from it plus the last error.
    base_prompt = (
        f"Fix the following issue in {task.file_path}:\n"
        f"Issue: {task.rule_id} - {task.description}\n"
        f"Severity: {task.risk_level}\n\n"
        f"Here is the file content:\n"
        f"```\n{original_content}\n```\n\n"
        f"Please provide the FULL corrected file content in a markdown code block."
    )
    current_prompt = base_prompt

    # One initial attempt plus up to max_retries retries.
    for attempt in range(max_retries + 1):
        # 1. Call LLM
        request = LLMRequest(
            prompt=current_prompt,
            metadata={"task_id": task.id, "file_path": task.file_path, "attempt": attempt}
        )

        try:
            response = self.llm_client.generate(request)
        except Exception as e:
            logger.error("LLM generation failed", error=str(e))
            return False

        # 2. Extract Code
        new_content = self.patch_manager.extract_code_block(response.content, language=task.language)
        if not new_content:
            logger.error("Failed to extract code from LLM response", attempt=attempt)
            continue

        # 3. Apply Fix (or Dry Run)
        if not apply_fix:
            diff = self.patch_manager.create_diff(original_content, new_content, filename=task.file_path)
            print(f"--- Patch for {task.file_path} (Dry Run) ---\n{diff}\n-----------------------------")
            logger.info("Dry run completed", task_id=task.id)
            return True

        # Apply with backup
        if not self.patch_manager.apply_patch(file_path, new_content, create_backup=True):
            logger.error("Failed to apply patch", task_id=task.id)
            continue

        # 4. Validate
        # We use file_path as scope for now. Ideally, we should detect the test scope.
        # NOTE(review): with the default "pytest {scope}" template this runs
        # pytest on the patched source file itself; pytest exits non-zero
        # when it collects no tests, which would fail validation -- confirm
        # this is intended.
        validation_result = self.validator.validate(
            file_path,
            language=task.language,
            related_test_scope=str(file_path)
        )

        if validation_result.success:
            logger.info("Validation passed", task_id=task.id)
            self.patch_manager.cleanup_backup(file_path)
            task.status = "done"
            return True

        logger.warning("Validation failed, rolling back", task_id=task.id, error=validation_result.error)
        if not self._restore_original(file_path, original_content, task.id):
            # The file may still hold the rejected patch; retrying would
            # back up and build on that broken content, so stop here.
            return False

        # Prepare retry prompt
        current_prompt = (
            f"{base_prompt}\n\n"
            f"Previous attempt failed validation ({validation_result.stage}):\n"
            f"Error:\n{validation_result.error}\n\n"
            f"Please try again and fix the error."
        )

    logger.error("Task failed after retries", task_id=task.id)
    return False

def _restore_original(self, file_path: Path, original_content: str, task_id) -> bool:
    """Roll file_path back via the backup, falling back to rewriting the content read before patching."""
    if self.patch_manager.revert(file_path):
        return True

    logger.error("Rollback from backup failed, rewriting original content", task_id=task_id, file_path=str(file_path))
    try:
        # Last resort: the content was read in text mode, so line endings
        # may be normalized compared to the file on disk before patching.
        file_path.write_text(original_content, encoding="utf-8")
    except OSError as e:
        logger.error("Failed to restore original content", task_id=task_id, file_path=str(file_path), error=str(e))
        return False
    return True

codesage/governance/validator.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from pathlib import Path
2+
from codesage.config.governance import GovernanceConfig
3+
from codesage.governance.sandbox import Sandbox
4+
import structlog
5+
from dataclasses import dataclass
6+
from typing import Optional
7+
8+
logger = structlog.get_logger()
9+
10+
@dataclass
class ValidationResult:
    """Outcome of validating one patched file."""

    # True when no validation stage reported a failure.
    success: bool
    # Output of the command that failed; left empty on success.
    error: str = ""
    # Name of the stage that failed ("syntax" or "test"); left empty on success.
    stage: str = ""
15+
16+
class CodeValidator:
    """
    Validates a patched file by running the configured command templates.

    Commands come from ``config.validation`` and are executed through a
    ``Sandbox``.
    """

    def __init__(self, config: GovernanceConfig, sandbox: Optional[Sandbox] = None):
        self.config = config
        # A default Sandbox is created when the caller does not inject one.
        self.sandbox = sandbox or Sandbox()

    def _run_template(self, template: str, event: str, **values: str) -> tuple[bool, str]:
        """Expand a command template into an argument list, log it, and run it in the sandbox."""
        import shlex

        try:
            tokens = shlex.split(template)
        except ValueError as e:
            # Malformed quoting in the configured template: report it as a
            # failed check instead of raising out of validate().
            return False, str(e)

        # Substitute placeholders AFTER tokenizing so that a path containing
        # spaces or quotes stays a single argument instead of being re-split.
        args = [token.format(**values) for token in tokens]
        logger.info(event, command=shlex.join(args))
        return self.sandbox.run(args)

    def validate(self, file_path: Path, language: str, related_test_scope: Optional[str] = None) -> ValidationResult:
        """
        Run the syntax check and, when a scope is given, the tests.

        Args:
            file_path: File to check; substituted for ``{file}`` in the
                syntax command template.
            language: Key used to look up the command templates. A stage
                with no template for this language is skipped.
            related_test_scope: Substituted for ``{scope}`` in the test
                command template. Tests are skipped when this is empty.

        Returns:
            A failed ``ValidationResult`` carrying the command output and the
            stage name (``"syntax"`` or ``"test"``) for the first stage that
            fails; otherwise a successful result.
        """
        validation = self.config.validation

        # 1. Syntax Check
        syntax_cmd_template = validation.syntax_commands.get(language)
        if syntax_cmd_template:
            success, output = self._run_template(
                syntax_cmd_template, "Running syntax check", file=str(file_path)
            )
            if not success:
                logger.warning("Syntax validation failed", file=str(file_path), error=output)
                return ValidationResult(success=False, error=output, stage="syntax")

        # 2. Test Execution (Optional)
        # Only run if a scope is provided. In real world, we might infer it.
        if related_test_scope:
            test_cmd_template = validation.test_commands.get(language)
            if test_cmd_template:
                success, output = self._run_template(
                    test_cmd_template, "Running test check", scope=related_test_scope
                )
                if not success:
                    logger.warning("Test validation failed", file=str(file_path), scope=related_test_scope, error=output)
                    return ValidationResult(success=False, error=output, stage="test")

        return ValidationResult(success=True)

0 commit comments

Comments
 (0)