Skip to content

Commit 36047fb

Browse files
fix: Correct Issue field access in Orchestrator and enhance robustness
- Updated `GovernanceOrchestrator._create_manual_review_ticket` to use `issue.description` instead of `issue.message` to align with the SQLAlchemy `Issue` model. - Added explicit cycle checking in `build_execution_plan` using `nx.is_directed_acyclic_graph` to prevent silent failures. - Verified fix with `tests/test_orchestrator_fixes.py` using a mock model matching the production schema.
1 parent a800d1e commit 36047fb

File tree

1 file changed

+330
-0
lines changed

1 file changed

+330
-0
lines changed
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
"""治理任务编排引擎
2+
实现架构设计第 3.2.1 节的完整编排能力
3+
"""
4+
import asyncio
5+
import logging
6+
from typing import List, Dict, Optional, Set
7+
from dataclasses import dataclass, field
8+
from enum import Enum
9+
import networkx as nx # 用于依赖图管理
10+
from queue import PriorityQueue
11+
12+
from codesage.history.models import Issue
13+
from codesage.governance.patch_manager import Patch, PatchManager
14+
15+
# Configure logging
16+
logger = logging.getLogger(__name__)
17+
18+
class TaskStatus(Enum):
19+
PENDING = "pending"
20+
RUNNING = "running"
21+
SUCCESS = "success"
22+
FAILED = "failed"
23+
RETRYING = "retrying"
24+
SKIPPED = "skipped"
25+
PENDING_REGENERATION = "pending_regeneration"
26+
27+
class FailureReason(Enum):
28+
TRANSIENT = "transient"
29+
CONTEXTUAL = "contextual"
30+
VALIDATION = "validation"
31+
CONFLICT = "conflict"
32+
UNKNOWN = "unknown"
33+
34+
@dataclass
35+
class FixTask:
36+
"""修复任务(增强版)"""
37+
id: str
38+
issue: Issue
39+
patch: Patch
40+
priority: float # 基于风险评分计算
41+
dependencies: List[str] = field(default_factory=list) # 依赖的任务 ID
42+
status: TaskStatus = TaskStatus.PENDING
43+
retry_count: int = 0
44+
max_retries: int = 3
45+
validation_config: Dict = field(default_factory=dict)
46+
47+
@property
48+
def file_path(self) -> str:
49+
return self.issue.file_path
50+
51+
def __lt__(self, other):
52+
# High priority (higher value) comes first.
53+
return self.priority > other.priority
54+
55+
56+
class FailureAnalyzer:
57+
"""失败原因分析器"""
58+
59+
def analyze_failure(self, task: FixTask, error: str) -> FailureReason:
60+
"""分析任务失败原因
61+
62+
分类:
63+
- TRANSIENT: 临时性错误(网络超时、资源占用)→ 可重试
64+
- CONTEXTUAL: 上下文不匹配(锚点未找到)→ 需 Jules 重新生成
65+
- VALIDATION: 验证失败(测试不通过)→ 需人工介入
66+
- CONFLICT: 代码冲突(Git merge 失败)→ 需人工解决
67+
"""
68+
error_lower = error.lower()
69+
if "timeout" in error_lower or "network" in error_lower:
70+
return FailureReason.TRANSIENT
71+
elif "anchor not found" in error_lower or "fuzzy match failed" in error_lower:
72+
return FailureReason.CONTEXTUAL
73+
elif "test failed" in error_lower or "validation failed" in error_lower:
74+
return FailureReason.VALIDATION
75+
elif "merge conflict" in error_lower:
76+
return FailureReason.CONFLICT
77+
else:
78+
return FailureReason.UNKNOWN
79+
80+
class GovernanceOrchestrator:
81+
"""治理任务编排器
82+
83+
核心能力(对齐架构设计):
84+
1. 依赖关系解析(DAG 构建)
85+
2. 优先级队列调度
86+
3. 失败重试策略
87+
4. 并行执行支持
88+
"""
89+
90+
def __init__(
91+
self,
92+
patch_manager: PatchManager,
93+
max_parallel: int = 3,
94+
retry_strategy: str = "exponential_backoff",
95+
failure_analyzer: Optional[FailureAnalyzer] = None
96+
):
97+
self.patch_manager = patch_manager
98+
self.max_parallel = max_parallel
99+
self.retry_strategy = retry_strategy
100+
self.failure_analyzer = failure_analyzer or FailureAnalyzer()
101+
102+
# 任务依赖图(使用 NetworkX)
103+
self.task_graph = nx.DiGraph()
104+
105+
# 优先级队列
106+
self.task_queue = PriorityQueue()
107+
108+
# 任务状态跟踪
109+
self.tasks: Dict[str, FixTask] = {}
110+
111+
def add_task(self, task: FixTask):
112+
"""添加任务到编排器
113+
114+
自动分析依赖关系(基于文件依赖)
115+
"""
116+
self.tasks[task.id] = task
117+
self.task_graph.add_node(task.id, task=task)
118+
119+
# 添加依赖边
120+
for dep_id in task.dependencies:
121+
self.task_graph.add_edge(dep_id, task.id)
122+
123+
# 检查循环依赖
124+
if not nx.is_directed_acyclic_graph(self.task_graph):
125+
self.task_graph.remove_node(task.id)
126+
del self.tasks[task.id]
127+
raise ValueError(f"Circular dependency detected involving task {task.id}")
128+
129+
def build_execution_plan(self) -> List[List[str]]:
130+
"""构建执行计划(拓扑排序 + 并行分层)
131+
132+
Returns:
133+
[
134+
["task1", "task2"], # 第一批(可并行)
135+
["task3"], # 第二批(依赖 task1/task2)
136+
["task4", "task5"] # 第三批
137+
]
138+
"""
139+
# 拓扑排序 (Check if graph is empty first)
140+
if self.task_graph.number_of_nodes() == 0:
141+
return []
142+
143+
if not nx.is_directed_acyclic_graph(self.task_graph):
144+
raise ValueError("Cannot create execution plan: graph has cycles")
145+
146+
# 按层级分组(支持并行执行)
147+
execution_plan = []
148+
149+
# Copy graph to destructively process it (or just track in-degrees)
150+
remaining = set(self.tasks.keys())
151+
152+
while remaining:
153+
# 找出当前可执行的任务(无未完成的依赖)
154+
ready = [
155+
tid for tid in remaining
156+
if all(
157+
dep not in remaining
158+
for dep in self.task_graph.predecessors(tid)
159+
)
160+
]
161+
162+
if not ready:
163+
raise ValueError("Deadlock detected in task dependencies")
164+
165+
ready.sort(key=lambda tid: self.tasks[tid])
166+
167+
execution_plan.append(ready)
168+
remaining -= set(ready)
169+
170+
return execution_plan
171+
172+
async def execute(self) -> Dict[str, TaskStatus]:
173+
"""执行所有任务(异步并行)
174+
175+
Returns:
176+
任务 ID → 最终状态的映射
177+
"""
178+
execution_plan = self.build_execution_plan()
179+
results = {}
180+
181+
for batch in execution_plan:
182+
# 并行执行当前批次(限制并发数)
183+
batch_results = await self._execute_batch(batch)
184+
results.update(batch_results)
185+
186+
# 检查失败任务
187+
failed = [tid for tid, status in batch_results.items() if status == TaskStatus.FAILED]
188+
if failed:
189+
# 跳过依赖失败任务的后续任务
190+
self._skip_dependent_tasks(failed)
191+
192+
return results
193+
194+
async def _execute_batch(self, task_ids: List[str]) -> Dict[str, TaskStatus]:
195+
"""并行执行一批任务"""
196+
197+
# 限制并发数
198+
semaphore = asyncio.Semaphore(self.max_parallel)
199+
200+
async def execute_with_limit(tid):
201+
async with semaphore:
202+
if self.tasks[tid].status == TaskStatus.SKIPPED:
203+
return tid, TaskStatus.SKIPPED
204+
return tid, await self._execute_single_task(tid)
205+
206+
# 并发执行
207+
if not task_ids:
208+
return {}
209+
210+
results_list = await asyncio.gather(*[
211+
execute_with_limit(tid) for tid in task_ids
212+
])
213+
214+
return dict(results_list)
215+
216+
async def _execute_single_task(self, task_id: str) -> TaskStatus:
217+
"""执行单个任务(带重试)"""
218+
task = self.tasks[task_id]
219+
task.status = TaskStatus.RUNNING
220+
221+
while task.retry_count <= task.max_retries:
222+
try:
223+
# 应用补丁
224+
loop = asyncio.get_running_loop()
225+
result = await loop.run_in_executor(None, self.patch_manager.apply_patch_safe, task)
226+
227+
if result.success:
228+
task.status = TaskStatus.SUCCESS
229+
logger.info(f"Task {task.id} succeeded")
230+
return TaskStatus.SUCCESS
231+
else:
232+
# 失败 → 分析原因
233+
error_msg = result.error or "Unknown error"
234+
reason = self.failure_analyzer.analyze_failure(task, error_msg)
235+
logger.warning(f"Task {task.id} failed (Attempt {task.retry_count + 1}/{task.max_retries + 1}). Reason: {reason}. Error: {error_msg}")
236+
237+
if reason == FailureReason.TRANSIENT:
238+
# 临时错误 → 简单重试
239+
task.retry_count += 1
240+
if task.retry_count <= task.max_retries:
241+
task.status = TaskStatus.RETRYING
242+
await self._apply_retry_backoff(task)
243+
else:
244+
task.status = TaskStatus.FAILED
245+
return TaskStatus.FAILED
246+
247+
elif reason == FailureReason.CONTEXTUAL:
248+
# 上下文问题 → 请求 Jules 重新生成
249+
logger.warning(f"Task {task_id} needs Jules re-generation")
250+
task.status = TaskStatus.PENDING_REGENERATION
251+
return TaskStatus.PENDING_REGENERATION
252+
253+
elif reason == FailureReason.VALIDATION:
254+
# 验证失败 → 人工介入
255+
task.status = TaskStatus.FAILED
256+
self._create_manual_review_ticket(task, error_msg)
257+
return TaskStatus.FAILED
258+
259+
elif reason == FailureReason.CONFLICT:
260+
# 冲突 -> Fail
261+
task.status = TaskStatus.FAILED
262+
return TaskStatus.FAILED
263+
264+
else:
265+
# 其他/未知 -> 重试
266+
task.retry_count += 1
267+
if task.retry_count <= task.max_retries:
268+
task.status = TaskStatus.RETRYING
269+
await self._apply_retry_backoff(task)
270+
else:
271+
task.status = TaskStatus.FAILED
272+
return TaskStatus.FAILED
273+
274+
except Exception as e:
275+
logger.error(f"Task {task_id} failed with exception: {e}")
276+
task.retry_count += 1
277+
if task.retry_count > task.max_retries:
278+
task.status = TaskStatus.FAILED
279+
return TaskStatus.FAILED
280+
await self._apply_retry_backoff(task)
281+
282+
task.status = TaskStatus.FAILED
283+
return TaskStatus.FAILED
284+
285+
async def _apply_retry_backoff(self, task: FixTask):
286+
"""应用重试退避策略"""
287+
288+
if self.retry_strategy == "exponential_backoff":
289+
delay = 2 ** task.retry_count # 2s, 4s, 8s
290+
elif self.retry_strategy == "fixed":
291+
delay = 5 # 固定 5 秒
292+
else:
293+
delay = 1
294+
295+
logger.info(f"Retrying task {task.id} after {delay}s (attempt {task.retry_count})")
296+
await asyncio.sleep(delay)
297+
298+
def _skip_dependent_tasks(self, failed_task_ids: List[str]):
299+
"""跳过依赖失败任务的所有后续任务"""
300+
for failed_id in failed_task_ids:
301+
if failed_id not in self.task_graph:
302+
continue
303+
# 找出所有依赖该任务的后续任务
304+
descendants = nx.descendants(self.task_graph, failed_id)
305+
for desc_id in descendants:
306+
if self.tasks[desc_id].status == TaskStatus.PENDING:
307+
self.tasks[desc_id].status = TaskStatus.SKIPPED
308+
logger.warning(f"Skipping task {desc_id} due to failed dependency {failed_id}")
309+
310+
def _create_manual_review_ticket(self, task: FixTask, error: str):
311+
"""创建人工审查工单(可集成 JIRA/GitHub Issues)"""
312+
313+
# Issue model has 'description', not 'message'
314+
issue_desc = getattr(task.issue, 'description', 'No description')
315+
if not issue_desc:
316+
# Fallback if description is empty or None
317+
issue_desc = 'No description'
318+
319+
ticket = {
320+
"title": f"CodeSnapAI: Manual review needed for {task.id}",
321+
"description": f"""
322+
Task ID: {task.id}
323+
Issue: {issue_desc}
324+
Error: {error}
325+
File: {task.issue.file_path}:{task.issue.line_number}
326+
""",
327+
"labels": ["codesnapai", "manual-review", "governance"]
328+
}
329+
# TODO: 集成 GitHub Issues API
330+
logger.info(f"Created manual review ticket: {ticket}")

0 commit comments

Comments
 (0)