diff --git a/src/codegen/cli/cli.py b/src/codegen/cli/cli.py
index 1bfd7833d..54353355d 100644
--- a/src/codegen/cli/cli.py
+++ b/src/codegen/cli/cli.py
@@ -8,6 +8,7 @@
 from codegen.cli.commands.agents.main import agents_app
 from codegen.cli.commands.claude.main import claude
 from codegen.cli.commands.config.main import config_command
+from codegen.cli.commands.council.main import council_app
 from codegen.cli.commands.init.main import init
 from codegen.cli.commands.integrations.main import integrations_app
 from codegen.cli.commands.login.main import login
@@ -83,6 +84,7 @@ def version_callback(value: bool):
 # Add Typer apps as sub-applications (these will handle their own sub-command logging)
 main.add_typer(agents_app, name="agents")
 main.add_typer(config_command, name="config")
+main.add_typer(council_app, name="council")
 main.add_typer(integrations_app, name="integrations")
 main.add_typer(profile_app, name="profile")
 
diff --git a/src/codegen/cli/commands/council/__init__.py b/src/codegen/cli/commands/council/__init__.py
new file mode 100644
index 000000000..900748ed0
--- /dev/null
+++ b/src/codegen/cli/commands/council/__init__.py
@@ -0,0 +1,2 @@
+"""Council command for multi-agent collaboration."""
+
diff --git a/src/codegen/cli/commands/council/main.py b/src/codegen/cli/commands/council/main.py
new file mode 100644
index 000000000..c8174f5a7
--- /dev/null
+++ b/src/codegen/cli/commands/council/main.py
@@ -0,0 +1,167 @@
+"""CLI command for running multi-agent councils."""
+
+import typer
+from rich import box
+from rich.console import Console
+from rich.panel import Panel
+from rich.table import Table
+
+from codegen.cli.auth.token_manager import get_current_token
+from codegen.cli.rich.spinners import create_spinner
+from codegen.cli.utils.org import resolve_org_id
+from codegen.council.models import AgentConfig, CouncilConfig
+from codegen.council.orchestrator import CouncilOrchestrator
+
+console = Console()
+
+council_app = typer.Typer(help="Run multi-agent councils for collaborative problem-solving")
+
+
+@council_app.command("run")
+def run_council(
+    prompt: str = typer.Option(..., "--prompt", "-p", help="The prompt/question for the council"),
+    models: str = typer.Option(
+        "gpt-4o,claude-3-5-sonnet-20241022,gemini-2.0-flash-exp",
+        "--models",
+        "-m",
+        help="Comma-separated list of models to use",
+    ),
+    candidates: int = typer.Option(3, "--candidates", "-c", help="Number of candidates per model"),
+    disable_ranking: bool = typer.Option(False, "--no-ranking", help="Skip Stage 2 peer ranking"),
+    synthesis_model: str = typer.Option(
+        "claude-3-5-sonnet-20241022",
+        "--synthesis-model",
+        help="Model to use for final synthesis",
+    ),
+    org_id: int | None = typer.Option(None, help="Organization ID (defaults to saved org)"),
+    poll_interval: float = typer.Option(5.0, "--poll", help="Seconds between status checks"),
+):
+    """Run a multi-agent council to collaboratively solve a problem.
+
+    Example:
+        codegen council run --prompt "How can I optimize my Python code?" --models gpt-4o,claude-3-5-sonnet
+    """
+    # Get token
+    token = get_current_token()
+    if not token:
+        console.print("[red]Error:[/red] Not authenticated. Please run 'codegen login' first.")
+        raise typer.Exit(1)
+
+    # Resolve org ID
+    resolved_org_id = resolve_org_id(org_id)
+    if resolved_org_id is None:
+        console.print(
+            "[red]Error:[/red] Organization ID not provided. "
+            "Pass --org-id, set CODEGEN_ORG_ID, or run 'codegen login'."
+        )
+        raise typer.Exit(1)
+
+    # Parse models
+    model_list = [m.strip() for m in models.split(",")]
+
+    # Build config
+    agent_configs = [AgentConfig(model=model) for model in model_list]
+
+    config = CouncilConfig(
+        agents=agent_configs,
+        num_candidates=candidates,
+        enable_ranking=not disable_ranking,
+        synthesis_model=synthesis_model,
+    )
+
+    console.print(
+        Panel(
+            f"[cyan]Models:[/cyan] {', '.join(model_list)}\n"
+            f"[cyan]Candidates per model:[/cyan] {candidates}\n"
+            f"[cyan]Total agent runs:[/cyan] {len(model_list) * candidates}\n"
+            f"[cyan]Ranking enabled:[/cyan] {'Yes' if not disable_ranking else 'No'}\n"
+            f"[cyan]Synthesis model:[/cyan] {synthesis_model}",
+            title="🏛️ [bold]Council Configuration[/bold]",
+            border_style="blue",
+            box=box.ROUNDED,
+        )
+    )
+
+    # Run council
+    orchestrator = CouncilOrchestrator(
+        token=token,
+        org_id=resolved_org_id,
+        config=config,
+    )
+
+    spinner = create_spinner("Running council...")
+    spinner.start()
+
+    try:
+        result = orchestrator.run(prompt, poll_interval=poll_interval)
+    except Exception as e:
+        spinner.stop()
+        console.print(f"[red]Error running council:[/red] {e}")
+        raise typer.Exit(1)
+    finally:
+        spinner.stop()
+
+    # Display results
+    console.print("\n")
+    console.print(
+        Panel(
+            result.stage3_synthesis.content if result.stage3_synthesis else "No synthesis generated",
+            title="✨ [bold]Final Synthesized Answer[/bold]",
+            border_style="green",
+            box=box.ROUNDED,
+            padding=(1, 2),
+        )
+    )
+
+    # Show candidate responses
+    if result.stage1_candidates:
+        console.print("\n[bold]Stage 1: Candidate Responses[/bold]")
+        table = Table(box=box.ROUNDED)
+        table.add_column("Model", style="cyan")
+        table.add_column("Agent Run", style="magenta")
+        table.add_column("Preview", style="dim")
+
+        for cand in result.stage1_candidates:
+            preview = cand.content[:100] + "..." if len(cand.content) > 100 else cand.content
+            table.add_row(
+                cand.model,
+                f"#{cand.agent_run_id}",
+                preview,
+            )
+
+        console.print(table)
+
+    # Show aggregate rankings
+    if result.aggregate_rankings:
+        console.print("\n[bold]Stage 2: Aggregate Rankings[/bold]")
+        rank_table = Table(box=box.ROUNDED)
+        rank_table.add_column("Rank", style="yellow", justify="center")
+        rank_table.add_column("Model", style="cyan")
+        rank_table.add_column("Avg Score", style="green", justify="right")
+        rank_table.add_column("Judgments", style="dim", justify="right")
+
+        for idx, ranking in enumerate(result.aggregate_rankings, start=1):
+            rank_table.add_row(
+                f"#{idx}",
+                ranking["model"],
+                f"{ranking['average_rank']:.2f}",
+                str(ranking["rankings_count"]),
+            )
+
+        console.print(rank_table)
+
+    # Show synthesis info
+    if result.stage3_synthesis:
+        console.print("\n[dim]💡 Synthesis Details:[/dim]")
+        console.print(f"  Method: {result.stage3_synthesis.method}")
+        console.print(f"  Agent Run: #{result.stage3_synthesis.agent_run_id}")
+        if result.stage3_synthesis.web_url:
+            console.print(f"  View: {result.stage3_synthesis.web_url}")
+
+    console.print("\n[green]✓[/green] Council completed successfully!")
+
+
+# Make council_app the default export for CLI integration
+council = council_app
+
diff --git a/src/codegen/council/__init__.py b/src/codegen/council/__init__.py
new file mode 100644
index 000000000..89d2eee5e
--- /dev/null
+++ b/src/codegen/council/__init__.py
@@ -0,0 +1,19 @@
+"""Multi-agent council orchestration for Codegen.
+
+This module provides a council-based approach where multiple agents with different
+models collaborate to solve complex problems through:
+1. Parallel generation of candidate responses
+2. Peer ranking and evaluation
+3. Synthesis of final answer
+"""
+
+from .models import AgentConfig, CouncilConfig, CouncilResult
+from .orchestrator import CouncilOrchestrator
+
+__all__ = [
+    "AgentConfig",
+    "CouncilConfig",
+    "CouncilResult",
+    "CouncilOrchestrator",
+]
+
diff --git a/src/codegen/council/models.py b/src/codegen/council/models.py
new file mode 100644
index 000000000..9c577239a
--- /dev/null
+++ b/src/codegen/council/models.py
@@ -0,0 +1,154 @@
+"""Data models for council orchestration."""
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+
+@dataclass
+class AgentConfig:
+    """Configuration for a single agent in the council.
+
+    Attributes:
+        model: Model identifier to use for this agent
+        role: Optional role description for the agent
+        temperature: Sampling temperature (0-1, higher = more creative)
+        prompt_variation: Optional prompt modification strategy
+    """
+
+    model: str
+    role: Optional[str] = None
+    temperature: float = 0.9
+    prompt_variation: Optional[str] = None
+
+
+@dataclass
+class CouncilConfig:
+    """Configuration for council execution.
+
+    Attributes:
+        agents: List of agent configurations to use
+        num_candidates: Number of parallel candidates to generate per agent
+        enable_ranking: Whether to run Stage 2 (peer ranking)
+        synthesis_model: Model to use for final synthesis
+        synthesis_temperature: Temperature for synthesis
+        tournament_threshold: Use tournament synthesis if candidates exceed this
+        group_size: Size of groups for tournament synthesis
+    """
+
+    agents: List[AgentConfig]
+    num_candidates: int = 3
+    enable_ranking: bool = True
+    synthesis_model: str = "claude-3-5-sonnet-20241022"
+    synthesis_temperature: float = 0.2
+    tournament_threshold: int = 20
+    group_size: int = 10
+
+
+@dataclass
+class CandidateResponse:
+    """A single candidate response from an agent.
+
+    Attributes:
+        agent_run_id: ID of the codegen agent run
+        model: Model that generated this response
+        content: The response content
+        web_url: URL to view the agent run
+        metadata: Additional metadata from the run
+    """
+
+    agent_run_id: int
+    model: str
+    content: str
+    web_url: Optional[str] = None
+    metadata: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class RankingResult:
+    """Ranking of candidates by a judging agent.
+
+    Attributes:
+        judge_model: Model that performed the ranking
+        agent_run_id: ID of the ranking agent run
+        ranking_text: Full text of the ranking explanation
+        parsed_ranking: Ordered list of response labels (best to worst)
+        web_url: URL to view the ranking agent run
+    """
+
+    judge_model: str
+    agent_run_id: int
+    ranking_text: str
+    parsed_ranking: List[str]
+    web_url: Optional[str] = None
+
+
+@dataclass
+class SynthesisResult:
+    """Final synthesized response.
+
+    Attributes:
+        agent_run_id: ID of the synthesis agent run
+        model: Model that performed synthesis
+        content: The final synthesized response
+        web_url: URL to view the synthesis agent run
+        method: Synthesis method used ('simple' or 'tournament')
+    """
+
+    agent_run_id: int
+    model: str
+    content: str
+    web_url: Optional[str] = None
+    method: str = "simple"
+
+
+@dataclass
+class CouncilResult:
+    """Complete result from a council execution.
+
+    Attributes:
+        stage1_candidates: All candidate responses generated
+        stage2_rankings: Rankings from peer evaluation (if enabled)
+        stage3_synthesis: Final synthesized response
+        aggregate_rankings: Aggregated ranking scores across all judges
+        label_to_model: Mapping from anonymous labels to model names
+    """
+
+    stage1_candidates: List[CandidateResponse]
+    stage2_rankings: Optional[List[RankingResult]] = None
+    stage3_synthesis: Optional[SynthesisResult] = None
+    aggregate_rankings: Optional[List[Dict[str, Any]]] = None
+    label_to_model: Optional[Dict[str, str]] = None
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to dictionary for serialization."""
+        return {
+            "stage1_candidates": [
+                {
+                    "agent_run_id": c.agent_run_id,
+                    "model": c.model,
+                    "content": c.content,
+                    "web_url": c.web_url,
+                }
+                for c in self.stage1_candidates
+            ],
+            "stage2_rankings": [
+                {
+                    "judge_model": r.judge_model,
+                    "agent_run_id": r.agent_run_id,
+                    "ranking_text": r.ranking_text,
+                    "parsed_ranking": r.parsed_ranking,
+                    "web_url": r.web_url,
+                }
+                for r in (self.stage2_rankings or [])
+            ],
+            "stage3_synthesis": {
+                "agent_run_id": self.stage3_synthesis.agent_run_id,
+                "model": self.stage3_synthesis.model,
+                "content": self.stage3_synthesis.content,
+                "web_url": self.stage3_synthesis.web_url,
+                "method": self.stage3_synthesis.method,
+            } if self.stage3_synthesis else None,
+            "aggregate_rankings": self.aggregate_rankings,
+            "label_to_model": self.label_to_model,
+        }
+
diff --git a/src/codegen/council/orchestrator.py b/src/codegen/council/orchestrator.py
new file mode 100644
index 000000000..053ea36aa
--- /dev/null
+++ b/src/codegen/council/orchestrator.py
@@ -0,0 +1,528 @@
+"""Council orchestrator for multi-agent collaboration."""
+
+import re
+import time
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Dict, List, Optional, Tuple
+
+from codegen.agents.agent import Agent, AgentTask
+from codegen.council.models import (
+    CandidateResponse,
+    CouncilConfig,
+    CouncilResult,
+    RankingResult,
+    SynthesisResult,
+)
+from codegen.shared.logging.get_logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class CouncilOrchestrator:
+    """Orchestrates multi-agent council execution with Codegen agents.
+
+    Implements a 3-stage process:
+    1. Stage 1: Generate N candidate responses from each agent/model
+    2. Stage 2 (optional): Each agent ranks all candidates anonymously
+    3. Stage 3: Synthesize final response from all candidates and rankings
+    """
+
+    def __init__(
+        self,
+        token: str,
+        org_id: int,
+        config: CouncilConfig,
+        max_workers: int = 50,
+    ):
+        """Initialize council orchestrator.
+
+        Args:
+            token: Codegen API token
+            org_id: Organization ID
+            config: Council configuration
+            max_workers: Max parallel workers for agent execution
+        """
+        self.token = token
+        self.org_id = org_id
+        self.config = config
+        self.max_workers = max_workers
+
+    def run(self, prompt: str, poll_interval: float = 5.0) -> CouncilResult:
+        """Execute the full council process.
+
+        Args:
+            prompt: User's question/task
+            poll_interval: Seconds between status polls for agent runs
+
+        Returns:
+            CouncilResult with all stages completed
+        """
+        logger.info(
+            f"Starting council with {len(self.config.agents)} agents, "
+            f"{self.config.num_candidates} candidates each"
+        )
+
+        # Stage 1: Generate candidates
+        stage1_candidates = self._stage1_generate_candidates(prompt, poll_interval)
+
+        if not stage1_candidates:
+            raise RuntimeError("All candidate generations failed")
+
+        logger.info(f"Stage 1 complete: {len(stage1_candidates)} candidates generated")
+
+        # Stage 2: Rankings (optional)
+        stage2_rankings = None
+        aggregate_rankings = None
+        label_to_model = None
+
+        if self.config.enable_ranking:
+            stage2_rankings, label_to_model = self._stage2_collect_rankings(
+                prompt, stage1_candidates, poll_interval
+            )
+            aggregate_rankings = self._calculate_aggregate_rankings(
+                stage2_rankings, label_to_model
+            )
+            logger.info(f"Stage 2 complete: {len(stage2_rankings)} rankings collected")
+
+        # Stage 3: Synthesis
+        method = "tournament" if len(stage1_candidates) > self.config.tournament_threshold else "simple"
+
+        stage3_synthesis = self._stage3_synthesize(
+            prompt,
+            stage1_candidates,
+            stage2_rankings or [],
+            method,
+            poll_interval,
+        )
+        logger.info(f"Stage 3 complete: Final synthesis using {method} method")
+
+        return CouncilResult(
+            stage1_candidates=stage1_candidates,
+            stage2_rankings=stage2_rankings,
+            stage3_synthesis=stage3_synthesis,
+            aggregate_rankings=aggregate_rankings,
+            label_to_model=label_to_model,
+        )
+
+    def _stage1_generate_candidates(
+        self,
+        prompt: str,
+        poll_interval: float,
+    ) -> List[CandidateResponse]:
+        """Stage 1: Generate candidate responses from all agents."""
+        # Calculate total runs: agents × candidates_per_agent
+        total_runs = len(self.config.agents) * self.config.num_candidates
+
+        logger.info(f"Stage 1: Launching {total_runs} agent runs")
+
+        # Build all agent run configs
+        run_configs = []
+        for agent_config in self.config.agents:
+            for _ in range(self.config.num_candidates):
+                run_configs.append((agent_config.model, prompt))
+
+        # Launch all runs in parallel (tasks come back in run_configs order)
+        tasks = self._launch_parallel_runs(run_configs)
+
+        # Wait for completion
+        self._wait_for_completion(tasks, poll_interval)
+
+        # Convert to CandidateResponse objects
+        candidates = []
+        for task, (model, _) in zip(tasks, run_configs):
+            if task is not None and task.status == "COMPLETE" and task.result:
+                # task.result can be either a string or a dict with 'content'
+                if isinstance(task.result, str):
+                    result_content = task.result
+                elif isinstance(task.result, dict):
+                    result_content = task.result.get("content", "")
+                else:
+                    result_content = str(task.result)
+
+                if result_content:
+                    candidates.append(
+                        CandidateResponse(
+                            agent_run_id=task.id,
+                            model=model,
+                            content=result_content,
+                            web_url=task.web_url,
+                        )
+                    )
+
+        return candidates
+
+    def _stage2_collect_rankings(
+        self,
+        original_prompt: str,
+        candidates: List[CandidateResponse],
+        poll_interval: float,
+    ) -> Tuple[List[RankingResult], Dict[str, str]]:
+        """Stage 2: Each agent ranks the anonymized candidates."""
+        # Create anonymous labels (Response A, Response B, etc.)
+        labels = [chr(65 + i) for i in range(len(candidates))]  # A, B, C, ...
+        label_to_model = {
+            f"Response {label}": cand.model
+            for label, cand in zip(labels, candidates)
+        }
+
+        # Build ranking prompt
+        ranking_prompt = self._build_ranking_prompt(original_prompt, candidates, labels)
+
+        logger.info(f"Stage 2: Launching {len(self.config.agents)} ranking runs")
+
+        # Launch ranking runs for each agent
+        run_configs = [(agent.model, ranking_prompt) for agent in self.config.agents]
+        tasks = self._launch_parallel_runs(run_configs)
+
+        # Wait for completion
+        self._wait_for_completion(tasks, poll_interval)
+
+        # Parse rankings
+        rankings = []
+        for task, (model, _) in zip(tasks, run_configs):
+            if task is not None and task.status == "COMPLETE" and task.result:
+                # task.result can be either a string or a dict with 'content'
+                if isinstance(task.result, str):
+                    ranking_text = task.result
+                elif isinstance(task.result, dict):
+                    ranking_text = task.result.get("content", "")
+                else:
+                    ranking_text = str(task.result)
+
+                if ranking_text:
+                    parsed = self._parse_ranking_from_text(ranking_text)
+                    rankings.append(
+                        RankingResult(
+                            judge_model=model,
+                            agent_run_id=task.id,
+                            ranking_text=ranking_text,
+                            parsed_ranking=parsed,
+                            web_url=task.web_url,
+                        )
+                    )
+
+        return rankings, label_to_model
+
+    def _stage3_synthesize(
+        self,
+        original_prompt: str,
+        candidates: List[CandidateResponse],
+        rankings: List[RankingResult],
+        method: str,
+        poll_interval: float,
+    ) -> SynthesisResult:
+        """Stage 3: Synthesize final response."""
+        if method == "tournament":
+            return self._tournament_synthesis(
+                original_prompt, candidates, rankings, poll_interval
+            )
+        else:
+            return self._simple_synthesis(
+                original_prompt, candidates, rankings, poll_interval
+            )
+
+    def _simple_synthesis(
+        self,
+        original_prompt: str,
+        candidates: List[CandidateResponse],
+        rankings: List[RankingResult],
+        poll_interval: float,
+    ) -> SynthesisResult:
+        """Simple synthesis: combine all candidates in one shot."""
+        synthesis_prompt = self._build_synthesis_prompt(
+            original_prompt, candidates, rankings
+        )
+
+        logger.info("Stage 3: Running simple synthesis")
+
+        # Launch synthesis run
+        # Note: no model is passed to Agent here; synthesis_model is only recorded on the result.
+        agent = Agent(token=self.token, org_id=self.org_id)
+        task = agent.run(synthesis_prompt)
+
+        # Wait for completion
+        self._wait_for_single_task(task, poll_interval)
+
+        # Get result
+        content = ""
+        if task.status == "COMPLETE" and task.result:
+            if isinstance(task.result, str):
+                content = task.result
+            elif isinstance(task.result, dict):
+                content = task.result.get("content", "")
+            else:
+                content = str(task.result)
+
+        return SynthesisResult(
+            agent_run_id=task.id,
+            model=self.config.synthesis_model,
+            content=content,
+            web_url=task.web_url,
+            method="simple",
+        )
+
+    def _tournament_synthesis(
+        self,
+        original_prompt: str,
+        candidates: List[CandidateResponse],
+        rankings: List[RankingResult],
+        poll_interval: float,
+    ) -> SynthesisResult:
+        """Tournament synthesis: group → synth groups → synth winners."""
+        logger.info(
+            f"Stage 3: Running tournament synthesis with {len(candidates)} candidates, "
+            f"group_size={self.config.group_size}"
+        )
+
+        # Split into groups
+        groups = [
+            candidates[i : i + self.config.group_size]
+            for i in range(0, len(candidates), self.config.group_size)
+        ]
+
+        logger.info(f"Created {len(groups)} groups for tournament")
+
+        # Synthesize each group
+        group_winners = []
+        for group_idx, group in enumerate(groups):
+            logger.info(f"Synthesizing group {group_idx + 1}/{len(groups)}")
+            group_prompt = self._build_synthesis_prompt(original_prompt, group, [])
+
+            agent = Agent(token=self.token, org_id=self.org_id)
+            task = agent.run(group_prompt)
+            self._wait_for_single_task(task, poll_interval)
+
+            if task.status == "COMPLETE" and task.result:
+                if isinstance(task.result, str):
+                    content = task.result
+                elif isinstance(task.result, dict):
+                    content = task.result.get("content", "")
+                else:
+                    content = str(task.result)
+
+                if content:
+                    group_winners.append(
+                        CandidateResponse(
+                            agent_run_id=task.id,
+                            model=self.config.synthesis_model,
+                            content=content,
+                            web_url=task.web_url,
+                        )
+                    )
+
+        # Final synthesis across group winners
+        logger.info(f"Final synthesis across {len(group_winners)} group winners")
+        final_prompt = self._build_synthesis_prompt(original_prompt, group_winners, rankings)
+
+        agent = Agent(token=self.token, org_id=self.org_id)
+        task = agent.run(final_prompt)
+        self._wait_for_single_task(task, poll_interval)
+
+        content = ""
+        if task.status == "COMPLETE" and task.result:
+            if isinstance(task.result, str):
+                content = task.result
+            elif isinstance(task.result, dict):
+                content = task.result.get("content", "")
+            else:
+                content = str(task.result)
+
+        return SynthesisResult(
+            agent_run_id=task.id,
+            model=self.config.synthesis_model,
+            content=content,
+            web_url=task.web_url,
+            method="tournament",
+        )
+
+    def _launch_parallel_runs(
+        self,
+        run_configs: List[Tuple[str, str]],
+    ) -> List[Optional[AgentTask]]:
+        """Launch multiple agent runs in parallel.
+
+        Args:
+            run_configs: List of (model, prompt) tuples
+
+        Returns:
+            List of AgentTask objects (None for failed launches), in the same
+            order as run_configs so callers can safely zip tasks with configs
+        """
+        tasks = []
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            # Note: the model in run_configs is only used to label results;
+            # it is not passed to Agent, which runs with its default model.
+            futures = []
+
+            for _model, prompt in run_configs:
+                agent = Agent(token=self.token, org_id=self.org_id)
+                futures.append(executor.submit(agent.run, prompt))
+
+            # Collect results in submission order so they stay aligned with run_configs
+            for future in futures:
+                try:
+                    tasks.append(future.result())
+                except Exception as e:
+                    logger.error(f"Failed to launch agent run: {e}")
+                    tasks.append(None)
+
+        return tasks
+
+    def _wait_for_completion(
+        self,
+        tasks: List[Optional[AgentTask]],
+        poll_interval: float,
+    ) -> List[Optional[AgentTask]]:
+        """Wait for all tasks to complete."""
+        pending = {task for task in tasks if task is not None}
+
+        while pending:
+            completed_in_round = set()
+
+            for task in pending:
+                task.refresh()
+                if task.status in ("COMPLETE", "FAILED", "STOPPED"):
+                    completed_in_round.add(task)
+
+            pending -= completed_in_round
+
+            if pending:
+                time.sleep(poll_interval)
+
+        return tasks
+
+    def _wait_for_single_task(self, task: AgentTask, poll_interval: float):
+        """Wait for a single task to complete."""
+        while task.status not in ("COMPLETE", "FAILED", "STOPPED"):
+            time.sleep(poll_interval)
+            task.refresh()
+
+    def _get_task_status(self, task: AgentTask) -> Optional[Dict[str, Any]]:
+        """Get status dict for a task."""
+        return {
+            "id": task.id,
+            "status": task.status,
+            "result": task.result,
+            "web_url": task.web_url,
+        }
+
+    def _build_ranking_prompt(
+        self,
+        original_prompt: str,
+        candidates: List[CandidateResponse],
+        labels: List[str],
+    ) -> str:
+        """Build prompt for ranking candidates."""
+        responses_text = "\n\n".join(
+            f"<Response {label}>\n{cand.content}\n</Response {label}>"
+            for label, cand in zip(labels, candidates)
+        )
+
+        return f"""You are evaluating different responses to the following question:
+
+Question: {original_prompt}
+
+Here are the responses from different models (anonymized):
+
+{responses_text}
+
+Your task:
+1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
+2. Then, at the very end of your response, provide a final ranking.
+
+IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
+- Start with the line "FINAL RANKING:" (all caps, with colon)
+- Then list the responses from best to worst as a numbered list
+- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
+- Do not add any other text or explanations in the ranking section
+
+Example format:
+
+[Your evaluation of each response...]
+
+FINAL RANKING:
+1. Response C
+2. Response A
+3. Response B
+
+Now provide your evaluation and ranking:"""
+
+    def _build_synthesis_prompt(
+        self,
+        original_prompt: str,
+        candidates: List[CandidateResponse],
+        rankings: List[RankingResult],
+    ) -> str:
+        """Build prompt for synthesizing final answer."""
+        candidates_text = "\n\n".join(
+            f"<Candidate {i + 1}>\n{cand.content}\n</Candidate {i + 1}>"
+            for i, cand in enumerate(candidates)
+        )
+
+        rankings_text = ""
+        if rankings:
+            rankings_text = "\n\nPeer Rankings:\n" + "\n\n".join(
+                f"Judge {i + 1}:\n{rank.ranking_text}"
+                for i, rank in enumerate(rankings)
+            )
+
+        return f"""You are an expert editor synthesizing multiple candidate responses.
+
+Original Question: {original_prompt}
+
+Candidate Responses:
+{candidates_text}{rankings_text}
+
+Your task is to synthesize ONE best answer by:
+- Merging the strengths of multiple candidates
+- Correcting any errors or inconsistencies
+- Removing repetition and redundancy
+- Being decisive and clear
+
+Do not mention the candidates, synthesis process, or ranking. Just provide the best final answer."""
+
+    def _parse_ranking_from_text(self, ranking_text: str) -> List[str]:
+        """Parse FINAL RANKING section from response."""
+        if "FINAL RANKING:" in ranking_text:
+            parts = ranking_text.split("FINAL RANKING:")
+            if len(parts) >= 2:
+                ranking_section = parts[1]
+                numbered_matches = re.findall(r"\d+\.\s*Response [A-Z]", ranking_section)
+                if numbered_matches:
+                    return [
+                        re.search(r"Response [A-Z]", m).group()
+                        for m in numbered_matches
+                    ]
+                matches = re.findall(r"Response [A-Z]", ranking_section)
+                return matches
+
+        matches = re.findall(r"Response [A-Z]", ranking_text)
+        return matches
+
+    def _calculate_aggregate_rankings(
+        self,
+        rankings: List[RankingResult],
+        label_to_model: Dict[str, str],
+    ) -> List[Dict[str, Any]]:
+        """Calculate aggregate rankings across all judges."""
+        model_positions: Dict[str, List[int]] = defaultdict(list)
+
+        for ranking in rankings:
+            for position, label in enumerate(ranking.parsed_ranking, start=1):
+                if label in label_to_model:
+                    model_name = label_to_model[label]
+                    model_positions[model_name].append(position)
+
+        aggregate = []
+        for model, positions in model_positions.items():
+            if positions:
+                avg_rank = sum(positions) / len(positions)
+                aggregate.append(
+                    {
+                        "model": model,
+                        "average_rank": round(avg_rank, 2),
+                        "rankings_count": len(positions),
+                    }
+                )
+
+        aggregate.sort(key=lambda x: x["average_rank"])
+        return aggregate
diff --git a/tests/council/test_orchestrator.py b/tests/council/test_orchestrator.py
new file mode 100644
index 000000000..e817120a5
--- /dev/null
+++ b/tests/council/test_orchestrator.py
@@ -0,0 +1,198 @@
+"""Tests for council orchestrator."""
+
+from unittest.mock import Mock, patch
+
+import pytest
+
+from codegen.council.models import AgentConfig, CouncilConfig
+from codegen.council.orchestrator import CouncilOrchestrator
+
+
+@pytest.fixture
+def mock_agent_task():
+    """Create a mock AgentTask."""
+    task = Mock()
+    task.id = 123
+    task.status = "COMPLETE"
+    task.result = {"content": "Mock response content"}
+    task.web_url = "https://codegen.com/agent/run/123"
+    return task
+
+
+@pytest.fixture
+def mock_agent(mock_agent_task):
+    """Create a mock Agent class."""
+    with patch("codegen.council.orchestrator.Agent") as MockAgent:
+        mock_instance = Mock()
+        mock_instance.run.return_value = mock_agent_task
+        MockAgent.return_value = mock_instance
+        yield MockAgent
+
+
+def test_council_config_defaults():
+    """Test CouncilConfig defaults."""
+    agents = [AgentConfig(model="gpt-4o")]
+    config = CouncilConfig(agents=agents)
+
+    assert config.num_candidates == 3
+    assert config.enable_ranking is True
+    assert config.synthesis_model == "claude-3-5-sonnet-20241022"
+    assert config.tournament_threshold == 20
+
+
+def test_orchestrator_initialization():
+    """Test CouncilOrchestrator initialization."""
+    agents = [AgentConfig(model="gpt-4o")]
+    config = CouncilConfig(agents=agents)
+
+    orchestrator = CouncilOrchestrator(
+        token="test-token",
+        org_id=123,
+        config=config,
+    )
+
+    assert orchestrator.token == "test-token"
+    assert orchestrator.org_id == 123
+    assert orchestrator.config == config
+
+
+def test_parse_ranking_from_text():
+    """Test parsing of ranking text."""
+    orchestrator = CouncilOrchestrator(
+        token="test-token",
+        org_id=123,
+        config=CouncilConfig(agents=[AgentConfig(model="gpt-4o")]),
+    )
+
+    ranking_text = """
+    Response A is good but has issues.
+    Response B is better.
+    Response C is the best.
+
+    FINAL RANKING:
+    1. Response C
+    2. Response B
+    3. Response A
+    """
+
+    parsed = orchestrator._parse_ranking_from_text(ranking_text)
+    assert parsed == ["Response C", "Response B", "Response A"]
+
+
+def test_parse_ranking_fallback():
+    """Test parsing falls back gracefully when format is off."""
+    orchestrator = CouncilOrchestrator(
+        token="test-token",
+        org_id=123,
+        config=CouncilConfig(agents=[AgentConfig(model="gpt-4o")]),
+    )
+
+    # Missing FINAL RANKING header
+    ranking_text = """
+    Response A is mentioned here.
+    Response B is also mentioned.
+    """
+
+    parsed = orchestrator._parse_ranking_from_text(ranking_text)
+    assert "Response A" in parsed
+    assert "Response B" in parsed
+
+
+def test_build_synthesis_prompt():
+    """Test synthesis prompt building."""
+    from codegen.council.models import CandidateResponse
+
+    orchestrator = CouncilOrchestrator(
+        token="test-token",
+        org_id=123,
+        config=CouncilConfig(agents=[AgentConfig(model="gpt-4o")]),
+    )
+
+    candidates = [
+        CandidateResponse(
+            agent_run_id=1,
+            model="gpt-4o",
+            content="Response 1",
+        ),
+        CandidateResponse(
+            agent_run_id=2,
+            model="claude-3-5-sonnet",
+            content="Response 2",
+        ),
+    ]
+
+    prompt = orchestrator._build_synthesis_prompt(
+        "What is AI?",
+        candidates,
+        [],
+    )
+
+    assert "What is AI?" in prompt
+    assert "Response 1" in prompt
+    assert "Response 2" in prompt
+    assert "synthesize" in prompt.lower()
+
+
+def test_calculate_aggregate_rankings():
+    """Test aggregate ranking calculation."""
+    from codegen.council.models import RankingResult
+
+    orchestrator = CouncilOrchestrator(
+        token="test-token",
+        org_id=123,
+        config=CouncilConfig(agents=[AgentConfig(model="gpt-4o")]),
+    )
+
+    rankings = [
+        RankingResult(
+            judge_model="gpt-4o",
+            agent_run_id=1,
+            ranking_text="FINAL RANKING:\n1. Response A\n2. Response B",
+            parsed_ranking=["Response A", "Response B"],
+        ),
+        RankingResult(
+            judge_model="claude-3-5-sonnet",
+            agent_run_id=2,
+            ranking_text="FINAL RANKING:\n1. Response B\n2. Response A",
Response A", + parsed_ranking=["Response B", "Response A"], + ), + ] + + label_to_model = { + "Response A": "model-1", + "Response B": "model-2", + } + + aggregate = orchestrator._calculate_aggregate_rankings(rankings, label_to_model) + + # Both models should have average rank of 1.5 (got 1st once, 2nd once) + assert len(aggregate) == 2 + assert all(r["average_rank"] == 1.5 for r in aggregate) + + +@pytest.mark.skip(reason="Integration test - requires live API") +def test_full_council_run(): + """Integration test for full council run (requires API access).""" + agents = [ + AgentConfig(model="gpt-4o"), + AgentConfig(model="claude-3-5-sonnet-20241022"), + ] + + config = CouncilConfig( + agents=agents, + num_candidates=1, # Keep it small for testing + enable_ranking=False, # Skip ranking for speed + ) + + orchestrator = CouncilOrchestrator( + token="your-api-token", + org_id=123, + config=config, + ) + + result = orchestrator.run("What is 2+2?", poll_interval=2.0) + + assert result.stage1_candidates + assert result.stage3_synthesis + assert result.stage3_synthesis.content +