-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathdebate_engine.py
More file actions
362 lines (300 loc) · 13 KB
/
debate_engine.py
File metadata and controls
362 lines (300 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
"""Debate Engine — structured multi-perspective review for critical tasks.
Before executing high-stakes tasks (architecture, security, database schema),
this module runs a structured debate between the assigned agent and a
challenger agent with a different specialization. A judge then picks the
stronger approach.
The debate uses the same ``isolated_query`` function that the DAG executor
uses, so it runs through the real Claude SDK with no extra dependencies.
Integration point:
orchestrator._run_dag_session — after PM creates the TaskGraph but
before DAG execution, call ``enrich_graph_with_debates(graph)`` to
annotate critical tasks with debate results.
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
from contracts import AgentRole, TaskInput
logger = logging.getLogger(__name__)
# ── Configuration ────────────────────────────────────────────────────────────
class DebateVerdict(StrEnum):
    """Judge's decision at the end of a debate."""

    ORIGINAL = "original"      # proposer's approach wins unchanged
    CHALLENGER = "challenger"  # challenger's alternative wins
    MERGED = "merged"          # judge synthesized elements of both
# Roles that trigger a debate (high-impact decisions)
DEBATE_ELIGIBLE_ROLES: set[AgentRole] = {
    AgentRole.DATABASE_EXPERT,
    AgentRole.SECURITY_AUDITOR,
    AgentRole.DEVOPS,
}

# Keywords in task goals that trigger debate regardless of role.
# Intentionally strict — only multi-word phrases that indicate
# high-stakes architectural decisions, not common single words.
DEBATE_KEYWORDS: list[str] = [
    "database migration",
    "schema migration",
    "security architecture",
    "infrastructure design",
    "api design",
]

# Which role challenges which.  Roles not listed here fall back to
# AgentRole.REVIEWER (see DebateEngine.get_challenger_role).
CHALLENGER_MAP: dict[AgentRole, AgentRole] = {
    AgentRole.DATABASE_EXPERT: AgentRole.BACKEND_DEVELOPER,
    AgentRole.SECURITY_AUDITOR: AgentRole.BACKEND_DEVELOPER,
    AgentRole.DEVOPS: AgentRole.BACKEND_DEVELOPER,
    AgentRole.BACKEND_DEVELOPER: AgentRole.SECURITY_AUDITOR,
    AgentRole.FRONTEND_DEVELOPER: AgentRole.REVIEWER,
}

# Default max debate rounds (1 round = proposer + challenger + judge = 3 SDK calls)
DEFAULT_MAX_ROUNDS = 1

# Environment variable to enable the debate engine (opt-in, off by default)
DEBATE_ENABLED_ENV = "HIVEMIND_DEBATE_ENABLED"
@dataclass
class DebateRound:
    """One round of debate: a proposer argument and the challenger's rebuttal."""

    round_num: int            # 1-based round index
    proposer_argument: str    # proposer's case (in later rounds, a response to the prior critique)
    challenger_argument: str  # challenger's critique and/or alternative approach
@dataclass
class DebateResult:
    """Complete result of a debate."""

    task_id: str                # id of the debated task
    proposer_role: AgentRole    # role that presented the original approach
    challenger_role: AgentRole  # role that critiqued it
    rounds: list[DebateRound]   # full transcript, in order
    verdict: DebateVerdict      # judge's decision
    verdict_reasoning: str      # judge's free-text justification
    # Judge's APPROACH section.  NOTE(review): _parse_verdict populates this
    # for any verdict, not only MERGED — despite the field name.
    merged_approach: str
    cost_turns: int = 0  # total SDK turns used
@dataclass
class DebateEngine:
    """Manages structured debates between agents for critical tasks.

    The engine is stateless per-debate — each debate is independent.
    It tracks ``history`` for reporting purposes only (see ``get_summary``).
    """

    # Upper bound on proposer/challenger exchanges before the judge rules.
    max_rounds: int = DEFAULT_MAX_ROUNDS
    # Per-instance copies of the module defaults, so instances can be
    # customized without mutating the shared constants.
    eligible_roles: set[AgentRole] = field(default_factory=lambda: set(DEBATE_ELIGIBLE_ROLES))
    keywords: list[str] = field(default_factory=lambda: list(DEBATE_KEYWORDS))
    # Completed debates, appended by run_debate.
    history: list[DebateResult] = field(default_factory=list)

    def should_debate(self, task: TaskInput) -> bool:
        """Decide whether a task warrants a debate.

        Returns True if:
            0. The debate engine is enabled via HIVEMIND_DEBATE_ENABLED=true, AND
            1. The task role is in DEBATE_ELIGIBLE_ROLES, OR
            2. The task goal contains debate keywords.
        """
        import os

        # Opt-in kill switch: debates are off unless the env var is truthy.
        if os.getenv(DEBATE_ENABLED_ENV, "false").lower() not in ("1", "true", "yes"):
            return False
        if task.role in self.eligible_roles:
            return True
        goal_lower = task.goal.lower()
        return any(kw in goal_lower for kw in self.keywords)

    def get_challenger_role(self, task: TaskInput) -> AgentRole:
        """Return the challenger role for a given task (REVIEWER when unmapped)."""
        return CHALLENGER_MAP.get(task.role, AgentRole.REVIEWER)

    async def run_debate(
        self,
        task: TaskInput,
        project_dir: str,
        sdk=None,
        context: str = "",
        on_stream: Callable | None = None,
    ) -> DebateResult:
        """Run a structured debate for a task.

        Uses isolated_query to call Claude agents for each perspective.
        The debate has three phases:
            1. Proposer presents approach
            2. Challenger critiques and presents alternative
            3. Judge evaluates and picks winner (or merges)

        Args:
            task: The task to debate.
            project_dir: Project directory for SDK calls.
            sdk: ClaudeSDKManager instance (passed to isolated_query);
                falls back to ``state.sdk_client`` when None.
            context: Additional context (e.g., architect review).
            on_stream: Optional streaming callback forwarded to isolated_query.

        Returns:
            DebateResult with verdict and merged approach.
        """
        # Lazy import to avoid circular dependency
        import state
        from config import get_agent_turns
        from isolated_query import isolated_query
        from prompts import PROMPT_REGISTRY as SPECIALIST_PROMPTS

        def _resolve_system_prompt(role: str) -> str:
            """Resolve system prompt from built-in prompts or plugin registry."""
            prompt = SPECIALIST_PROMPTS.get(role)
            if prompt is not None:
                return prompt
            try:
                from plugin_registry import registry as _pr

                plugin = _pr.get(role)
                if plugin is not None:
                    return plugin.build_prompt()
            except ImportError:
                # Plugin registry is optional; fall through to the generic prompt.
                pass
            return "You are an expert software engineer."

        _sdk = sdk or state.sdk_client
        challenger_role = self.get_challenger_role(task)
        rounds: list[DebateRound] = []
        total_turns = 0

        for round_num in range(1, self.max_rounds + 1):
            # ── Proposer argues ──────────────────────────────────────
            proposer_prompt = (
                f"You are debating the best approach for this task.\n"
                f"Task: {task.goal}\n"
                f"Context: {context}\n"
            )
            if rounds:
                # Later rounds: respond to the most recent critique.
                last = rounds[-1]
                proposer_prompt += (
                    f"\nThe challenger argued:\n{last.challenger_argument}\n"
                    f"\nRespond to their critique and strengthen your approach."
                )
            else:
                proposer_prompt += (
                    "\nPresent your proposed approach. Be specific about "
                    "implementation details, trade-offs, and risks."
                )
            proposer_response = await isolated_query(
                _sdk,
                prompt=proposer_prompt,
                system_prompt=_resolve_system_prompt(task.role),
                cwd=project_dir,
                # Half the role's normal turn budget, capped at 5 — debates are
                # advisory and must stay cheap relative to task execution.
                max_turns=min(get_agent_turns(task.role) // 2, 5),
                max_budget_usd=1.0,
                on_stream=on_stream,
            )
            proposer_text = proposer_response.text if proposer_response else ""
            total_turns += proposer_response.num_turns if proposer_response else 0

            # ── Challenger argues ────────────────────────────────────
            challenger_prompt = (
                f"You are reviewing a proposed approach for this task.\n"
                f"Task: {task.goal}\n"
                f"Context: {context}\n"
                f"\nProposed approach:\n{proposer_text}\n"
                f"\nCritique this approach. Identify weaknesses, risks, and "
                f"propose a better alternative if you have one."
            )
            challenger_response = await isolated_query(
                _sdk,
                prompt=challenger_prompt,
                # Use the same resolver as the proposer so plugin-provided
                # roles get their real prompt instead of the generic fallback.
                system_prompt=_resolve_system_prompt(challenger_role.value),
                cwd=project_dir,
                max_turns=min(get_agent_turns(challenger_role.value) // 2, 5),
                max_budget_usd=1.0,
                on_stream=on_stream,
            )
            challenger_text = challenger_response.text if challenger_response else ""
            total_turns += challenger_response.num_turns if challenger_response else 0

            rounds.append(
                DebateRound(
                    round_num=round_num,
                    proposer_argument=proposer_text,
                    challenger_argument=challenger_text,
                )
            )

        # ── Judge evaluates ──────────────────────────────────────────
        # Join once instead of += in a loop (quadratic on long transcripts).
        debate_transcript = "".join(
            f"--- Round {r.round_num} ---\n"
            f"Proposer ({task.role}):\n{r.proposer_argument}\n\n"
            f"Challenger ({challenger_role.value}):\n{r.challenger_argument}\n\n"
            for r in rounds
        )
        judge_prompt = (
            f"You are judging a technical debate about this task:\n"
            f"Task: {task.goal}\n\n"
            f"Debate transcript:\n{debate_transcript}\n\n"
            f"Evaluate both approaches. Respond in this exact format:\n"
            f"VERDICT: [original|challenger|merged]\n"
            f"REASONING: [your reasoning]\n"
            f"APPROACH: [the winning or merged approach — be specific]"
        )
        judge_response = await isolated_query(
            _sdk,
            prompt=judge_prompt,
            system_prompt="You are a senior technical judge. Be objective and thorough.",
            cwd=project_dir,
            max_turns=3,
            max_budget_usd=1.0,
            on_stream=on_stream,
        )
        judge_text = judge_response.text if judge_response else ""
        total_turns += judge_response.num_turns if judge_response else 0

        # Parse verdict
        verdict, reasoning, approach = self._parse_verdict(judge_text)
        result = DebateResult(
            task_id=task.id,
            proposer_role=task.role,
            challenger_role=challenger_role,
            rounds=rounds,
            verdict=verdict,
            verdict_reasoning=reasoning,
            merged_approach=approach,
            cost_turns=total_turns,
        )
        self.history.append(result)
        logger.info(
            "[DebateEngine] task %s: verdict=%s (proposer=%s, challenger=%s, turns=%d)",
            task.id,
            verdict.value,
            task.role,
            challenger_role.value,
            total_turns,
        )
        return result

    def build_debate_context(self, result: DebateResult) -> str:
        """Convert a debate result into context to inject into the task prompt.

        This is appended to the task prompt before execution so the agent
        benefits from the debate insights.
        """
        lines = [
            "## Pre-execution Debate Summary",
            f"A debate was held between {result.proposer_role.value} and "
            f"{result.challenger_role.value}.",
            f"Verdict: **{result.verdict.value}**",
            f"Reasoning: {result.verdict_reasoning}",
        ]
        if result.merged_approach:
            lines.append(f"\nRecommended approach:\n{result.merged_approach}")
        return "\n".join(lines)

    def get_summary(self) -> dict[str, Any]:
        """Return a summary of all debates run by this engine instance."""
        from collections import Counter

        verdict_counts = Counter(r.verdict.value for r in self.history)
        return {
            "total_debates": len(self.history),
            "verdicts": dict(verdict_counts),
            "total_turns_used": sum(r.cost_turns for r in self.history),
        }

    # ── Internal ─────────────────────────────────────────────────────────
    @staticmethod
    def _parse_verdict(text: str) -> tuple[DebateVerdict, str, str]:
        """Parse the judge response into (verdict, reasoning, approach).

        Any section the judge failed to emit in the requested format falls
        back to MERGED / empty string, so a malformed response never raises.
        """
        import re

        verdict = DebateVerdict.MERGED  # safe default for malformed output
        reasoning = ""
        approach = ""
        # The alternation restricts matches to valid enum values, so the
        # DebateVerdict(...) construction below cannot raise ValueError.
        m = re.search(r"VERDICT:\s*(original|challenger|merged)", text, re.IGNORECASE)
        if m:
            verdict = DebateVerdict(m.group(1).lower())
        # Extract REASONING (everything up to APPROACH: or end of text)
        m = re.search(r"REASONING:\s*(.+?)(?=APPROACH:|$)", text, re.DOTALL | re.IGNORECASE)
        if m:
            reasoning = m.group(1).strip()
        # Extract APPROACH (rest of the text)
        m = re.search(r"APPROACH:\s*(.+)", text, re.DOTALL | re.IGNORECASE)
        if m:
            approach = m.group(1).strip()
        return verdict, reasoning, approach