diff --git a/.github/workflows/repo-architect.yml b/.github/workflows/repo-architect.yml index 65b71fb..818d447 100644 --- a/.github/workflows/repo-architect.yml +++ b/.github/workflows/repo-architect.yml @@ -12,10 +12,11 @@ on: - analyze - report - mutate + - campaign github_model: - description: 'GitHub Models model id' - required: true - default: 'openai/gpt-4.1' + description: 'GitHub Models model id (overrides preferred model)' + required: false + default: '' type: string report_path: description: 'Primary report path' @@ -31,6 +32,16 @@ on: - '1' - '2' - '3' + max_slices: + description: 'Campaign max slices (campaign mode only)' + required: false + default: '3' + type: string + lanes: + description: 'Comma-separated lane order (mutate and campaign modes)' + required: false + default: 'parse_errors,import_cycles,entrypoint_consolidation,hygiene,report' + type: string schedule: - cron: '17 * * * *' @@ -62,22 +73,38 @@ jobs: git config user.name "github-actions[bot]" git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + - name: Ensure artifact directories exist + run: | + mkdir -p .agent docs/repo_architect + - name: Run repo architect env: GITHUB_TOKEN: ${{ github.token }} GITHUB_REPO: ${{ github.repository }} GITHUB_BASE_BRANCH: ${{ github.event.repository.default_branch }} + REPO_ARCHITECT_BRANCH_SUFFIX: ${{ github.run_id }}-${{ github.run_attempt }} + REPO_ARCHITECT_PREFERRED_MODEL: openai/gpt-5.4 + REPO_ARCHITECT_FALLBACK_MODEL: openai/gpt-4.1 run: | MODE="${{ github.event.inputs.mode }}" MODEL="${{ github.event.inputs.github_model }}" REPORT_PATH="${{ github.event.inputs.report_path }}" MUTATION_BUDGET="${{ github.event.inputs.mutation_budget }}" + MAX_SLICES="${{ github.event.inputs.max_slices }}" + LANES="${{ github.event.inputs.lanes }}" if [ -z "$MODE" ]; then MODE="report"; fi - if [ -z "$MODEL" ]; then MODEL="openai/gpt-4.1"; fi if [ -z "$REPORT_PATH" ]; then REPORT_PATH="docs/repo_architect/runtime_inventory.md"; fi if [ -z 
"$MUTATION_BUDGET" ]; then MUTATION_BUDGET="1"; fi - export GITHUB_MODEL="$MODEL" - python repo_architect.py --allow-dirty --mode "$MODE" --report-path "$REPORT_PATH" --mutation-budget "$MUTATION_BUDGET" + if [ -z "$MAX_SLICES" ]; then MAX_SLICES="3"; fi + if [ -z "$LANES" ]; then LANES="parse_errors,import_cycles,entrypoint_consolidation,hygiene,report"; fi + EXTRA_ARGS="" + if [ -n "$MODEL" ]; then EXTRA_ARGS="$EXTRA_ARGS --github-model $MODEL"; fi + if [ "$MODE" = "campaign" ]; then + EXTRA_ARGS="$EXTRA_ARGS --max-slices $MAX_SLICES --lanes $LANES" + elif [ "$MODE" = "mutate" ]; then + EXTRA_ARGS="$EXTRA_ARGS --lanes $LANES" + fi + python repo_architect.py --allow-dirty --mode "$MODE" --report-path "$REPORT_PATH" --mutation-budget "$MUTATION_BUDGET" $EXTRA_ARGS - name: Upload repo architect artifacts if: always() diff --git a/docs/repo_architect/OPERATOR_GUIDE.md b/docs/repo_architect/OPERATOR_GUIDE.md new file mode 100644 index 0000000..072acf9 --- /dev/null +++ b/docs/repo_architect/OPERATOR_GUIDE.md @@ -0,0 +1,175 @@ +# repo-architect Operator Guide + +## Overview + +`repo_architect.py` is a single-file autonomous agent that: +- Analyses the Python codebase for parse errors, import cycles, entrypoints, and hygiene issues. +- Generates or refreshes architecture documentation under `docs/repo_architect/`. +- Creates mutation branches and pull-requests that fix real code issues. +- Can run repeatedly without branch collisions or stale-branch failures. + +--- + +## Model Selection (preferred / fallback) + +The agent tries a **preferred** model first and automatically retries with a **fallback** model when the preferred one is unavailable or returns an unknown-model error. 
+ +| Priority | Source | +|---|---| +| 1 | `--preferred-model` CLI flag | +| 2 | `REPO_ARCHITECT_PREFERRED_MODEL` env var | +| 3 | Hard-coded default (`openai/gpt-5.4`) | + +| Fallback priority | Source | +|---|---| +| 1 | `--fallback-model` CLI flag | +| 2 | `REPO_ARCHITECT_FALLBACK_MODEL` env var | +| 3 | Hard-coded default (`openai/gpt-4.1`) | + +**Failure tolerance:** if both model calls fail (network, quota, unavailability), the run continues without model-generated output instead of aborting. The JSON result records `fallback_reason` and `fallback_occurred`. + +The workflow exports: +```yaml +REPO_ARCHITECT_PREFERRED_MODEL: openai/gpt-5.4 +REPO_ARCHITECT_FALLBACK_MODEL: openai/gpt-4.1 +``` + +--- + +## Unique Branch Strategy + +Every mutation branch is suffixed with a per-run unique token to prevent branch-push collisions. + +**Suffix precedence:** +1. `REPO_ARCHITECT_BRANCH_SUFFIX` env var (if set and non-empty) +2. `GITHUB_RUN_ID-GITHUB_RUN_ATTEMPT` (when both are present) +3. UTC timestamp fallback (`YYYYmmddHHMMSS`) + +The suffix is sanitised to `[A-Za-z0-9._-]` only. + +**Push collision recovery:** Before pushing, the agent checks whether the remote branch already exists. If it does, a fresh unique branch name is generated. A single retry is also performed if the push is rejected due to a non-fast-forward error. + +The workflow exports: +```yaml +REPO_ARCHITECT_BRANCH_SUFFIX: ${{ github.run_id }}-${{ github.run_attempt }} +``` + +--- + +## Lane Priority Rules + +In `mutate` and `campaign` modes the agent selects exactly one lane per slice, in this priority order: + +| Priority | Lane | Behaviour | +|---|---|---| +| 1 | `parse_errors` | Model-assisted syntax fix for one or more files with parse errors. Skipped if no parse errors exist or model is unavailable. | +| 2 | `import_cycles` | Model-assisted import cycle break (TYPE_CHECKING guard / lazy import). Skipped if no cycles or model unavailable. 
| +| 3 | `entrypoint_consolidation` | Annotates one redundant backend server entrypoint with a `# DEPRECATED` comment when ≥ 4 backend entrypoints exist. Model-assisted. | +| 4 | `hygiene` | Remove explicitly `# DEBUG`-marked `print()` statements. No model required. | +| 5 | `report` | Refresh the architecture documentation packet. Only selected when no higher-priority code mutation is possible. | + +A **report-only mutation is never produced when parse errors exist** unless all code-lane attempts fail (in which case `no_safe_code_mutation_reason` is populated in the JSON output). + +### Scoping lanes in mutate mode + +You can restrict which lanes a mutate run considers via `--lanes` or `--lane`: + +```bash +# Only try hygiene lane +python repo_architect.py --mode mutate --lane hygiene --allow-dirty + +# Only try parse_errors and import_cycles +python repo_architect.py --mode mutate --lanes parse_errors,import_cycles --allow-dirty +``` + +This also works via environment variables: `REPO_ARCHITECT_LANES` or `REPO_ARCHITECT_LANE`. + +The workflow passes `--lanes` to both mutate and campaign modes, so the `lanes` workflow_dispatch input controls lane selection in every mutation-capable mode. + +--- + +## Campaign Mode + +Campaign mode executes up to `--max-slices` mutation slices across the lane priority order, re-analysing between each slice so later lanes see the updated repository state. 
+ +```bash +python repo_architect.py \ + --mode campaign \ + --allow-dirty \ + --max-slices 3 \ + --lanes parse_errors,import_cycles,entrypoint_consolidation,hygiene,report \ + --stop-on-failure +``` + +**CLI flags:** + +| Flag | Default | Description | +|---|---|---| +| `--max-slices` | `3` | Maximum slices to attempt | +| `--lanes` | all 5 lanes | Comma-separated lane order (also works in mutate mode) | +| `--stop-on-failure` | `false` | Abort remaining slices on first failure | +| `--preferred-model` | (env / default) | Override preferred model for this run | +| `--fallback-model` | (env / default) | Override fallback model for this run | + +Campaign results are emitted as `status: campaign_complete` JSON and also saved to `.agent/campaign_summary.json`. + +--- + +## Output Contract (machine-readable JSON) + +Every run emits a JSON object on stdout. Key fields: + +| Field | Description | +|---|---| +| `status` | `analyzed`, `analysis_only`, `mutated`, `no_meaningful_delta`, `no_safe_mutation_available`, `campaign_complete` | +| `mode` | Active mode | +| `lane` | Selected mutation lane (`none` if no mutation) | +| `lanes_active` | List of lanes considered (from `--lanes` / env or default lane order) | +| `architecture_score` | 1–100 composite health score | +| `requested_model` | Model that was requested | +| `actual_model` | Model that actually responded (may differ if fallback occurred) | +| `fallback_reason` | Reason fallback was triggered (or `null`) | +| `fallback_occurred` | `true` if the fallback model was used | +| `no_safe_code_mutation_reason` | Explanation when no code-level mutation was safe | +| `branch` | Git branch name pushed (or `null`) | +| `changed_files` | List of files modified | +| `validation` | Output from `py_compile` or equivalent validation | +| `artifact_files` | All artifact paths generated in this run | +| `pull_request_url` | PR URL if created | + +--- + +## Workflow Dispatch Modes + +| Mode | Effect | +|---|---| +| `analyze` | 
Builds analysis, writes `.agent/` artifacts, no branch or PR | +| `report` | Refreshes `docs/repo_architect/` documentation | +| `mutate` | Attempts one code or report mutation in lane-priority order | +| `campaign` | Runs up to `max_slices` slices serially across lanes | + +--- + +## Validation Policy + +Each mutation lane runs the following validation before pushing: + +| Lane | Validation | +|---|---| +| `parse_errors` | `ast.parse` on model-generated content; file must parse cleanly or it is rejected | +| `import_cycles` | `ast.parse` on model-generated content; import smoke test attempted (warn-only) | +| `entrypoint_consolidation` | `ast.parse` on model-generated content | +| `hygiene` | `python -m py_compile` on all touched Python files | +| `report` | Verify report files were written; hash included in output | + +Validation failures abort the mutation and **never push a broken branch**. + +--- + +## Running Tests + +```bash +python -m unittest tests.test_repo_architect -v +``` + +The test suite covers: branch suffix generation, model fallback, `ast.parse` gate, campaign aggregation, output schema stability, lane priority selection, `entrypoint_consolidation` threshold/validation, lane scoping in mutate mode (`--lane`, `--lanes`, env vars), and enhanced validation (import smoke for import_cycles lane). 
diff --git a/repo_architect.py b/repo_architect.py index d292091..94c96e4 100644 --- a/repo_architect.py +++ b/repo_architect.py @@ -59,6 +59,27 @@ "top_risks": DEFAULT_REPORT_DIR / "top_risks.md", } +# Model selection defaults +DEFAULT_PREFERRED_MODEL = "openai/gpt-5.4" +DEFAULT_FALLBACK_MODEL = "openai/gpt-4.1" +# Substrings in HTTP error bodies that indicate the model itself is unavailable (not a transient error) +_MODEL_UNAVAILABLE_SIGNALS = frozenset({ + "unknown_model", "model_not_found", "unsupported_model", "unsupported model", + "not found", "does not exist", "invalid model", "no such model", +}) +# Canonical lane execution order for mutate / campaign modes +MUTATION_LANE_ORDER: Tuple[str, ...] = ("parse_errors", "import_cycles", "entrypoint_consolidation", "hygiene", "report") +# Maximum characters of source code sent to the model per file snippet +_MAX_SOURCE_SNIPPET_CHARS = 4000 +_MAX_CYCLE_SNIPPET_CHARS = 3000 +# Maximum total branch-name length (git max ref is 255; leave margin for remote path prefix) +_MAX_BRANCH_NAME_LEN = 220 +# Minimum backend server entrypoints before entrypoint_consolidation lane activates +_ENTRYPOINT_CONSOLIDATION_THRESHOLD = 4 +# When building entrypoint snippets: consider this many candidates, send at most this many to model +_ENTRYPOINT_CONSOLIDATION_CANDIDATES = 8 +_ENTRYPOINT_CONSOLIDATION_SNIPPETS = 5 + class RepoArchitectError(Exception): pass @@ -87,6 +108,11 @@ class Config: report_path: pathlib.Path mutation_budget: int configure_branch_protection: bool + # Model selection (preferred may fall back to fallback on unavailability) + preferred_model: Optional[str] = None + fallback_model: Optional[str] = None + # Explicit lane order override (None = use MUTATION_LANE_ORDER) + campaign_lanes: Optional[Tuple[str, ...]] = None @dataclasses.dataclass @@ -202,11 +228,51 @@ def git_has_remote_origin(root: pathlib.Path) -> bool: return proc.returncode == 0 +def git_remote_branch_exists(root: pathlib.Path, branch: str) -> bool: 
+ """Return True if *branch* already exists on the origin remote.""" + proc = subprocess.run( + ["git", "ls-remote", "--exit-code", "--heads", "origin", f"refs/heads/{branch}"], + cwd=str(root), capture_output=True, text=True, + ) + return proc.returncode == 0 + + def safe_branch_name(stable_hint: str) -> str: slug = re.sub(r"[^a-zA-Z0-9._/-]+", "-", stable_hint).strip("-/").lower() return slug[:100] +def with_unique_branch_suffix(branch: str) -> str: + """Append a per-run unique suffix to *branch* so repeated workflow runs + never collide on the same remote branch name. + + Suffix precedence: + 1. REPO_ARCHITECT_BRANCH_SUFFIX env var (if set and non-empty) + 2. GITHUB_RUN_ID-GITHUB_RUN_ATTEMPT (both env vars must be non-empty) + 3. UTC timestamp fallback (YYYYmmddHHMMSS) + + The suffix is sanitised to contain only: A-Z a-z 0-9 . _ - + If sanitisation produces an empty string the timestamp fallback is used. + The total branch name is capped at _MAX_BRANCH_NAME_LEN characters. + """ + # Compute a stable timestamp once so both fallback paths use the same value. 
+ ts_fallback = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d%H%M%S") + raw = os.environ.get("REPO_ARCHITECT_BRANCH_SUFFIX", "").strip() + if not raw: + run_id = os.environ.get("GITHUB_RUN_ID", "").strip() + run_attempt = os.environ.get("GITHUB_RUN_ATTEMPT", "").strip() + if run_id and run_attempt: + raw = f"{run_id}-{run_attempt}" + if not raw: + raw = ts_fallback + suffix = re.sub(r"[^a-zA-Z0-9._-]", "-", raw).strip("-") + # Guard: if all chars were invalid, use the pre-computed timestamp fallback + if not suffix: + suffix = ts_fallback + full = f"{branch}-{suffix}" + return full[:_MAX_BRANCH_NAME_LEN] + + def git_identity_present(root: pathlib.Path) -> bool: a = subprocess.run(["git", "config", "user.email"], cwd=str(root), capture_output=True, text=True) b = subprocess.run(["git", "config", "user.name"], cwd=str(root), capture_output=True, text=True) @@ -290,6 +356,8 @@ def github_models_chat(token: str, model: str, messages: List[Dict[str, str]]) - except urllib.error.HTTPError as exc: raw = exc.read().decode("utf-8", errors="replace") raise RepoArchitectError(f"GitHub Models inference failed: {exc.code} {raw}") from exc + except urllib.error.URLError as exc: + raise RepoArchitectError(f"GitHub Models inference network error: {exc.reason}") from exc def parse_model_text(resp: Dict[str, Any]) -> str: @@ -299,6 +367,69 @@ def parse_model_text(resp: Dict[str, Any]) -> str: raise RepoArchitectError(f"Could not parse GitHub Models response: {exc}") +def _is_model_unavailable_error(msg: str) -> bool: + """Return True if the HTTP error body suggests the model itself is unavailable/unknown.""" + lower = msg.lower() + return any(sig in lower for sig in _MODEL_UNAVAILABLE_SIGNALS) + + +def extract_json_from_model_text(text: str) -> Any: + """Extract the first JSON object or array from model-returned text (handles fences).""" + try: + return json.loads(text) + except json.JSONDecodeError: + pass + for fence in ("```json", "```"): + if fence in text: + inner = 
text.split(fence, 1)[1].rsplit("```", 1)[0].strip() + try: + return json.loads(inner) + except json.JSONDecodeError: + pass + for start_char, end_char in (("{", "}"), ("[", "]")): + start = text.find(start_char) + if start == -1: + continue + depth = 0 + for i, ch in enumerate(text[start:], start): + if ch == start_char: + depth += 1 + elif ch == end_char: + depth -= 1 + if depth == 0: + try: + return json.loads(text[start:i + 1]) + except json.JSONDecodeError: + break + raise RepoArchitectError("Could not parse JSON from model response") + + +def call_models_with_fallback_or_none( + token: str, + preferred: str, + fallback: Optional[str], + messages: List[Dict[str, str]], +) -> Tuple[Optional[Dict[str, Any]], str, Optional[str], bool]: + """Call GitHub Models with preferred model, auto-falling back if it is unavailable. + + Returns (response_or_None, requested_model, fallback_reason, fallback_occurred). + Returns a None response (instead of raising) if all attempts fail, so callers + can continue the run without model-generated output. 
+ """ + try: + resp = github_models_chat(token, preferred, messages) + return resp, preferred, None, False + except RepoArchitectError as exc: + reason = str(exc) + if fallback and fallback != preferred and _is_model_unavailable_error(reason): + try: + resp = github_models_chat(token, fallback, messages) + return resp, preferred, reason, True + except RepoArchitectError as exc2: + return None, preferred, f"{reason}; fallback also failed: {exc2}", True + return None, preferred, reason, False + + def find_existing_open_pr(config: Config, branch: str) -> Optional[Dict[str, Any]]: if not config.github_repo or not config.github_token: return None @@ -566,35 +697,49 @@ def build_analysis(root: pathlib.Path) -> Dict[str, Any]: # ----------------------------- def enrich_with_github_models(config: Config, analysis: Dict[str, Any]) -> Dict[str, Any]: - meta = {"enabled": False, "used": False, "model": config.github_model, "summary": None, "fallback_reason": None} - if not config.github_token or not config.github_model: + preferred = config.preferred_model or config.github_model + fallback = config.fallback_model + meta: Dict[str, Any] = { + "enabled": False, + "used": False, + "requested_model": preferred, + "actual_model": None, + "model": preferred, # kept for backward compatibility + "summary": None, + "fallback_reason": None, + "fallback_occurred": False, + } + if not config.github_token or not preferred: return meta - try: - catalog = github_models_catalog(config.github_token) - meta["enabled"] = True - if not model_available(catalog, config.github_model): - meta["fallback_reason"] = f"model_not_in_catalog:{config.github_model}" - return meta - prompt = textwrap.dedent(f""" - You are summarizing repository architecture risk. 
- Architecture score: {analysis['architecture_score']} - Local import cycles: {len(analysis['cycles'])} - Parse error files: {len(analysis['parse_error_files'])} - Entrypoints: {len(analysis['entrypoint_paths'])} - Top roadmap items: {json.dumps(analysis['roadmap'][:5])} - - Return 5 bullet points, compact and concrete, no preamble. - """).strip() - resp = github_models_chat(config.github_token, config.github_model, [ + meta["enabled"] = True + prompt = textwrap.dedent(f""" + You are summarizing repository architecture risk. + Architecture score: {analysis['architecture_score']} + Local import cycles: {len(analysis['cycles'])} + Parse error files: {len(analysis['parse_error_files'])} + Entrypoints: {len(analysis['entrypoint_paths'])} + Top roadmap items: {json.dumps(analysis['roadmap'][:5])} + + Return 5 bullet points, compact and concrete, no preamble. + """).strip() + resp, requested, fallback_reason, fallback_occurred = call_models_with_fallback_or_none( + config.github_token, preferred, fallback, + [ {"role": "system", "content": "You produce concise engineering prioritization notes."}, {"role": "user", "content": prompt}, - ]) + ], + ) + meta["fallback_reason"] = fallback_reason + meta["fallback_occurred"] = fallback_occurred + if resp is None: + return meta + try: meta["summary"] = parse_model_text(resp) + meta["actual_model"] = resp.get("model", fallback if fallback_occurred else preferred) meta["used"] = True - return meta - except Exception as exc: - meta["fallback_reason"] = str(exc) - return meta + except RepoArchitectError as exc: + meta["fallback_reason"] = (meta.get("fallback_reason") or "") + f"; parse failed: {exc}" + return meta # ----------------------------- @@ -698,11 +843,22 @@ def write_step_summary(config: Config, result: Dict[str, Any]) -> None: "", f"- mode: `{result.get('mode', config.mode)}`", f"- status: `{result.get('status')}`", + f"- lane: `{result.get('lane', 'none')}`", f"- architecture score: 
**{result.get('architecture_score')}**", f"- changed files: `{len(result.get('changed_files', []))}`", ] if result.get("pull_request_url"): summary.append(f"- pull request: {result['pull_request_url']}") + if result.get("branch"): + summary.append(f"- branch: `{result['branch']}`") + if result.get("requested_model"): + summary.append(f"- model requested: `{result['requested_model']}`") + if result.get("actual_model"): + summary.append(f"- model used: `{result['actual_model']}`") + if result.get("fallback_occurred"): + summary.append(f"- ⚠️ fallback occurred: {result.get('fallback_reason', '')}") + if result.get("no_safe_code_mutation_reason"): + summary.append(f"- no safe mutation: {result['no_safe_code_mutation_reason']}") if result.get("github_models", {}).get("used"): summary += ["", "## Model summary", "", result["github_models"]["summary"]] config.step_summary_path.parent.mkdir(parents=True, exist_ok=True) @@ -814,29 +970,348 @@ def build_report_plan(config: Config, analysis: Dict[str, Any], model_meta: Dict ) -def build_patch_plan(config: Config, analysis: Dict[str, Any], model_meta: Dict[str, Any], state: Dict[str, Any]) -> Optional[PatchPlan]: - if config.mode == "analyze": +def build_parse_errors_plan(config: Config, analysis: Dict[str, Any]) -> Optional[PatchPlan]: + """Use the preferred/fallback model to fix one or more Python parse errors.""" + errors = analysis.get("parse_error_files", []) + if not errors: + return None + preferred = config.preferred_model or config.github_model + if not config.github_token or not preferred: + return None + fallback = config.fallback_model + py_infos_by_path = {i["path"]: i for i in analysis["python_files"]} + targets = errors[:3] + snippets: List[str] = [] + for rel in targets: + abs_path = config.git_root / rel + if not abs_path.exists(): + continue + source = abs_path.read_text(encoding="utf-8", errors="replace")[:_MAX_SOURCE_SNIPPET_CHARS] + err_detail = py_infos_by_path.get(rel, {}).get("parse_error", "syntax 
error") + snippets.append(f"File: {rel}\nError: {err_detail}\n```python\n{source}\n```") + if not snippets: + return None + prompt = textwrap.dedent(f""" + Fix the Python syntax/parse errors in the following file(s). + Return ONLY a JSON object with this exact structure (no markdown, no explanation): + {{"files": {{"": ""}}}} + + Files to fix: + {chr(10).join(snippets)} + """).strip() + resp, _req, fallback_reason, _fell = call_models_with_fallback_or_none( + config.github_token, preferred, fallback, + [ + {"role": "system", "content": "You fix Python syntax errors. Return only valid JSON with corrected file contents."}, + {"role": "user", "content": prompt}, + ], + ) + if resp is None: + return None + try: + text = parse_model_text(resp) + data = extract_json_from_model_text(text) + raw_files: Dict[str, str] = data.get("files", {}) + except (RepoArchitectError, KeyError, TypeError): + return None + valid_changes: Dict[str, str] = {} + for rel, content in raw_files.items(): + rel_norm = rel.lstrip("/") + if rel_norm not in targets: + continue + try: + ast.parse(content) + valid_changes[rel_norm] = content + except SyntaxError: + pass # Model's fix still has errors; skip this file + if not valid_changes: + return None + return PatchPlan( + task="fix_parse_errors", + reason=f"model-assisted fix for {len(valid_changes)} parse error(s)", + file_changes=valid_changes, + metadata={"lane": "parse_errors", "fixed_files": sorted(valid_changes), "fallback_reason": fallback_reason}, + pr_title="agent: fix Python parse errors", + pr_body=f"Automated fix for {len(valid_changes)} Python parse error(s) using model-assisted repair.", + stable_branch_hint="agent/fix/parse-errors", + ) + + +def build_import_cycles_plan(config: Config, analysis: Dict[str, Any]) -> Optional[PatchPlan]: + """Use the preferred/fallback model to break one import cycle via TYPE_CHECKING guards or lazy imports.""" + cycles = analysis.get("cycles", []) + if not cycles: + return None + preferred = 
config.preferred_model or config.github_model + if not config.github_token or not preferred: + return None + fallback = config.fallback_model + cycle = min(cycles, key=len) + cycle_modules = [m for m in cycle if m != cycle[-1]] + py_infos_by_module = {i["module"]: i for i in analysis["python_files"]} + snippets: List[str] = [] + module_to_path: Dict[str, str] = {} + for mod in cycle_modules[:4]: + info = py_infos_by_module.get(mod, {}) + rel = info.get("path", "") + if not rel: + continue + abs_path = config.git_root / rel + if not abs_path.exists(): + continue + module_to_path[mod] = rel + source = abs_path.read_text(encoding="utf-8", errors="replace")[:_MAX_CYCLE_SNIPPET_CHARS] + snippets.append(f"Module: {mod}\nFile: {rel}\n```python\n{source}\n```") + if not snippets: + return None + cycle_str = " -> ".join(cycle) + prompt = textwrap.dedent(f""" + Break this Python import cycle by modifying the minimal number of files. + Prefer TYPE_CHECKING guards or lazy imports to avoid behavioral changes. + Import cycle: {cycle_str} + + Return ONLY a JSON object (no markdown, no explanation): + {{"files": {{"": ""}}}} + + Files in cycle: + {chr(10).join(snippets)} + """).strip() + resp, _req, fallback_reason, _fell = call_models_with_fallback_or_none( + config.github_token, preferred, fallback, + [ + {"role": "system", "content": "You fix Python import cycles. 
Return only valid JSON with corrected file contents."}, + {"role": "user", "content": prompt}, + ], + ) + if resp is None: + return None + try: + text = parse_model_text(resp) + data = extract_json_from_model_text(text) + raw_files: Dict[str, str] = data.get("files", {}) + except (RepoArchitectError, KeyError, TypeError): + return None + all_cycle_paths = set(module_to_path.values()) + valid_changes: Dict[str, str] = {} + for rel, content in raw_files.items(): + rel_norm = rel.lstrip("/") + if rel_norm not in all_cycle_paths: + continue + try: + ast.parse(content) + valid_changes[rel_norm] = content + except SyntaxError: + pass + if not valid_changes: + return None + return PatchPlan( + task="break_import_cycle", + reason=f"model-assisted cycle break: {cycle_str}", + file_changes=valid_changes, + metadata={"lane": "import_cycles", "cycle": cycle, "fallback_reason": fallback_reason}, + pr_title="agent: break import cycle", + pr_body=f"Automated fix to break import cycle: `{cycle_str}`.", + stable_branch_hint="agent/fix/import-cycle", + ) + + +def build_entrypoint_consolidation_plan(config: Config, analysis: Dict[str, Any]) -> Optional[PatchPlan]: + """Use the preferred/fallback model to consolidate redundant backend server entrypoints. + + Only activates when the number of backend_servers entrypoints meets or exceeds + _ENTRYPOINT_CONSOLIDATION_THRESHOLD. The model is asked to add a single + ``# DEPRECATED: prefer `` comment to the least-canonical duplicate + so the runtime intent is preserved while the codebase signals what to migrate + toward. All generated changes are validated with ast.parse before use. 
+ """ + clusters = analysis.get("entrypoint_clusters", {}) + backend_eps = clusters.get("backend_servers", []) + if len(backend_eps) < _ENTRYPOINT_CONSOLIDATION_THRESHOLD: + return None + preferred = config.preferred_model or config.github_model + if not config.github_token or not preferred: + return None + fallback = config.fallback_model + # Collect up to _ENTRYPOINT_CONSOLIDATION_CANDIDATES by path length (shortest = likely wrappers) + # then send at most _ENTRYPOINT_CONSOLIDATION_SNIPPETS to the model + snippets: List[str] = [] + candidate_paths: List[str] = [] + for rel in sorted(backend_eps[:_ENTRYPOINT_CONSOLIDATION_CANDIDATES], key=lambda p: len(p)): + abs_path = config.git_root / rel + if not abs_path.exists(): + continue + source = abs_path.read_text(encoding="utf-8", errors="replace")[:_MAX_SOURCE_SNIPPET_CHARS] + snippets.append(f"File: {rel}\n```python\n{source}\n```") + candidate_paths.append(rel) + if len(snippets) >= _ENTRYPOINT_CONSOLIDATION_SNIPPETS: + break + if len(candidate_paths) < 2: return None - # self-tuning bias: after repeated no-op or report success, widen one notch in mutate mode. - if config.mode == "mutate": - plan = remove_marked_debug_prints(config.git_root, analysis, config.mutation_budget) - if plan is not None: - return plan - return build_report_plan(config, analysis, model_meta, state) - return build_report_plan(config, analysis, model_meta, state) + prompt = textwrap.dedent(f""" + This repository has {len(backend_eps)} backend server entrypoints, suggesting runtime duplication. + Identify exactly ONE file that is clearly a redundant wrapper or legacy entrypoint. + Add ONLY a single comment line at the top of that file: + # DEPRECATED: prefer - this file may be removed in a future cleanup + + Do NOT make any other changes. 
+ Return ONLY a JSON object (no markdown, no explanation): + {{"files": {{"": ""}}}} + + Entrypoints to consider: + {chr(10).join(snippets)} + """).strip() + resp, _req, fallback_reason, _fell = call_models_with_fallback_or_none( + config.github_token, preferred, fallback, + [ + {"role": "system", "content": "You annotate redundant Python entrypoints with deprecation comments. Return only valid JSON."}, + {"role": "user", "content": prompt}, + ], + ) + if resp is None: + return None + try: + text = parse_model_text(resp) + data = extract_json_from_model_text(text) + raw_files: Dict[str, str] = data.get("files", {}) + except (RepoArchitectError, KeyError, TypeError): + return None + all_ep_paths = set(candidate_paths) + valid_changes: Dict[str, str] = {} + for rel, content in raw_files.items(): + rel_norm = rel.lstrip("/") + if rel_norm not in all_ep_paths: + continue + try: + ast.parse(content) + valid_changes[rel_norm] = content + except SyntaxError: + pass + if not valid_changes: + return None + # Use sorted order to make target selection deterministic across runs + target = sorted(valid_changes.keys())[0] + return PatchPlan( + task="annotate_deprecated_entrypoint", + reason=f"deprecation comment on redundant entrypoint: {target}", + file_changes=valid_changes, + metadata={"lane": "entrypoint_consolidation", "annotated": sorted(valid_changes), "fallback_reason": fallback_reason, "total_backend_entrypoints": len(backend_eps)}, + pr_title="agent: annotate redundant server entrypoint as deprecated", + pr_body=textwrap.dedent(f""" + Automated entrypoint consolidation step. + + This repository has **{len(backend_eps)} backend server entrypoints** — above the + consolidation threshold of {_ENTRYPOINT_CONSOLIDATION_THRESHOLD}. This PR adds a + `# DEPRECATED` comment to one identified redundant wrapper, making migration intent + explicit without changing any runtime behaviour. 
+ + Annotated file(s): {', '.join(f'`{p}`' for p in sorted(valid_changes))} + + Validation: `ast.parse` passed on all changed files. + """).strip(), + stable_branch_hint="agent/fix/entrypoint-consolidation", + ) + + +def build_patch_plan( + config: Config, analysis: Dict[str, Any], model_meta: Dict[str, Any], state: Dict[str, Any], +) -> Tuple[Optional[PatchPlan], str, Optional[str]]: + """Return (plan, selected_lane, no_safe_code_mutation_reason). + + Lane priority order (mutate / campaign): + 1. parse_errors – model-assisted syntax fix + 2. import_cycles – model-assisted cycle break + 3. entrypoint_consolidation – deprecate redundant server entrypoints + 4. hygiene – remove marked debug prints + 5. report – refresh architecture documentation + + A report-only mutation is never produced when parse errors exist unless no + safe code mutation can be made (reason is then surfaced explicitly). + """ + if config.mode == "analyze": + return None, "none", None + + lanes = config.campaign_lanes if config.campaign_lanes else MUTATION_LANE_ORDER + skipped_reasons: List[str] = [] + + if config.mode in ("mutate", "campaign"): + for lane in lanes: + if lane == "parse_errors": + if analysis.get("parse_error_files"): + plan = build_parse_errors_plan(config, analysis) + if plan: + return plan, "parse_errors", None + skipped_reasons.append("parse_errors: model unavailable or returned no valid fix") + elif lane == "import_cycles": + if analysis.get("cycles"): + plan = build_import_cycles_plan(config, analysis) + if plan: + return plan, "import_cycles", None + skipped_reasons.append("import_cycles: model unavailable or returned no valid fix") + elif lane == "entrypoint_consolidation": + plan = build_entrypoint_consolidation_plan(config, analysis) + if plan: + return plan, "entrypoint_consolidation", None + # Not an error to skip - threshold may not be met + elif lane == "hygiene": + plan = remove_marked_debug_prints(config.git_root, analysis, config.mutation_budget) + if plan: + 
return plan, "hygiene", None + elif lane == "report": + # Suppress report-only if parse errors exist and a code fix was attempted + if analysis.get("parse_error_files") and skipped_reasons: + skipped_reasons.append("report: suppressed because parse errors exist and code fix was attempted") + continue + plan = build_report_plan(config, analysis, model_meta, state) + if plan: + return plan, "report", None + no_reason = "; ".join(skipped_reasons) if skipped_reasons else "no actionable mutation found for any lane" + return None, "none", no_reason + + # report mode + plan = build_report_plan(config, analysis, model_meta, state) + return plan, "report", None # ----------------------------- # Validation / execution # ----------------------------- -def validate_change(config: Config, changed_files: Sequence[str]) -> Tuple[bool, str]: +def validate_change(config: Config, changed_files: Sequence[str], lane: Optional[str] = None) -> Tuple[bool, str]: + """Validate changed files. py_compile is always run for Python files. + If *lane* is ``import_cycles``, an additional import smoke test is attempted.""" py_files = [p for p in changed_files if p.endswith('.py')] if not py_files: return True, 'No Python files changed.' + # Syntax check (fast; catches typos and broken edits) proc = run_cmd([sys.executable, '-m', 'py_compile', *py_files], cwd=config.git_root, check=False) - out = (proc.stdout or '') + (proc.stderr or '') - return proc.returncode == 0, out.strip() or 'py_compile passed' + syntax_out = (proc.stdout or '') + (proc.stderr or '') + if proc.returncode != 0: + return False, syntax_out.strip() or 'py_compile failed' + + # Extended validation for import-cycle lane: attempt "import " for changed files. + # This is a best-effort smoke test — import failures are common in partial repos so + # results are warnings only (they never block the mutation). 
+ if lane == "import_cycles": + smoke_results: List[str] = [] + for pf in py_files: + mod = pf.replace("/", ".").replace("\\", ".").removesuffix(".py") + # Skip hidden files / relative-path artefacts (e.g. ".tmp/foo.py" → ".tmp.foo") + if mod.startswith("."): + continue + smoke = run_cmd( + [sys.executable, "-c", f"import importlib; importlib.import_module({mod!r})"], + cwd=config.git_root, check=False, + ) + if smoke.returncode != 0: + err = (smoke.stdout or '') + (smoke.stderr or '') + truncated = err.strip()[:200] + if len(err.strip()) > 200: + truncated += " [truncated]" + smoke_results.append(f"import {mod}: warning: {truncated}") + if smoke_results: + return True, "py_compile passed; import smoke warnings: " + "; ".join(smoke_results) + + return True, syntax_out.strip() or 'py_compile passed' def apply_patch_plan(config: Config, plan: PatchPlan, state: Dict[str, Any]) -> Dict[str, Any]: @@ -845,7 +1320,7 @@ def apply_patch_plan(config: Config, plan: PatchPlan, state: Dict[str, Any]) -> raise RepoArchitectError('Git identity is not configured. 
Set git user.name and user.email before mutation.') start_branch = git_current_branch(config.git_root) - branch = safe_branch_name(plan.stable_branch_hint) + branch = with_unique_branch_suffix(safe_branch_name(plan.stable_branch_hint)) backups: Dict[str, str] = {} changed_files = list(plan.file_changes.keys()) git_checkout_branch(config.git_root, branch) @@ -861,7 +1336,7 @@ def apply_patch_plan(config: Config, plan: PatchPlan, state: Dict[str, Any]) -> git_checkout(config.git_root, start_branch) return {"status": "no_meaningful_delta", "branch": branch, "changed_files": []} - ok, validation = validate_change(config, changed_files) + ok, validation = validate_change(config, changed_files, lane=plan.metadata.get("lane")) if not ok: raise RepoArchitectError(f'Validation failed.\n{validation}') @@ -871,7 +1346,28 @@ def apply_patch_plan(config: Config, plan: PatchPlan, state: Dict[str, Any]) -> pr_url = None pr_number = None if config.github_token and config.github_repo and git_has_remote_origin(config.git_root): - git_push_branch(config.git_root, branch) + # Pre-check: if remote branch already exists (from a prior run), generate a fresh name. + # Use a timestamp-based suffix so retries within the same run are also distinct. 
+ for retry_n in range(1, 4): + if not git_remote_branch_exists(config.git_root, branch): + break + branch = with_unique_branch_suffix( + safe_branch_name(f"{plan.stable_branch_hint}-retry{retry_n}") + ) + git_checkout_branch(config.git_root, branch) + try: + git_push_branch(config.git_root, branch) + except RepoArchitectError as push_exc: + # One retry on non-fast-forward / rejected push + err_lower = str(push_exc).lower() + if "non-fast-forward" in err_lower or "rejected" in err_lower or "failed to push" in err_lower: + branch = with_unique_branch_suffix( + safe_branch_name(f"{plan.stable_branch_hint}-retry{retry_n + 1}") + ) + git_checkout_branch(config.git_root, branch) + git_push_branch(config.git_root, branch) + else: + raise pushed = True pr = create_or_update_pull_request(config, branch, plan.pr_title, plan.pr_body) pr_url = pr.get('html_url') @@ -907,7 +1403,6 @@ def apply_patch_plan(config: Config, plan: PatchPlan, state: Dict[str, Any]) -> def workflow_yaml(secret_env_names: Sequence[str], cron: str, github_model: Optional[str]) -> str: extra_env = "".join(f" {name}: ${{{{ secrets.{name} }}}}\n" for name in secret_env_names) - model_default = github_model or "openai/gpt-4.1" return f"""name: repo-architect on: @@ -922,10 +1417,11 @@ def workflow_yaml(secret_env_names: Sequence[str], cron: str, github_model: Opti - analyze - report - mutate + - campaign github_model: - description: 'GitHub Models model id' - required: true - default: '{model_default}' + description: 'GitHub Models model id (overrides preferred model)' + required: false + default: '' type: string report_path: description: 'Primary report path' @@ -941,6 +1437,16 @@ def workflow_yaml(secret_env_names: Sequence[str], cron: str, github_model: Opti - '1' - '2' - '3' + max_slices: + description: 'Campaign max slices (campaign mode only)' + required: false + default: '3' + type: string + lanes: + description: 'Comma-separated lane order (mutate and campaign modes)' + required: false + default: 
'parse_errors,import_cycles,entrypoint_consolidation,hygiene,report' + type: string schedule: - cron: '{cron}' @@ -972,22 +1478,38 @@ def workflow_yaml(secret_env_names: Sequence[str], cron: str, github_model: Opti git config user.name "github-actions[bot]" git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + - name: Ensure artifact directories exist + run: | + mkdir -p .agent docs/repo_architect + - name: Run repo architect env: GITHUB_TOKEN: ${{{{ github.token }}}} GITHUB_REPO: ${{{{ github.repository }}}} GITHUB_BASE_BRANCH: ${{{{ github.event.repository.default_branch }}}} + REPO_ARCHITECT_BRANCH_SUFFIX: ${{{{ github.run_id }}}}-${{{{ github.run_attempt }}}} + REPO_ARCHITECT_PREFERRED_MODEL: openai/gpt-5.4 + REPO_ARCHITECT_FALLBACK_MODEL: openai/gpt-4.1 {extra_env} run: | MODE="${{{{ github.event.inputs.mode }}}}" MODEL="${{{{ github.event.inputs.github_model }}}}" REPORT_PATH="${{{{ github.event.inputs.report_path }}}}" MUTATION_BUDGET="${{{{ github.event.inputs.mutation_budget }}}}" + MAX_SLICES="${{{{ github.event.inputs.max_slices }}}}" + LANES="${{{{ github.event.inputs.lanes }}}}" if [ -z "$MODE" ]; then MODE="report"; fi - if [ -z "$MODEL" ]; then MODEL="{model_default}"; fi if [ -z "$REPORT_PATH" ]; then REPORT_PATH="{DEFAULT_REPORT_PATH.as_posix()}"; fi if [ -z "$MUTATION_BUDGET" ]; then MUTATION_BUDGET="1"; fi - export GITHUB_MODEL="$MODEL" - python repo_architect.py --allow-dirty --mode "$MODE" --report-path "$REPORT_PATH" --mutation-budget "$MUTATION_BUDGET" + if [ -z "$MAX_SLICES" ]; then MAX_SLICES="3"; fi + if [ -z "$LANES" ]; then LANES="parse_errors,import_cycles,entrypoint_consolidation,hygiene,report"; fi + EXTRA_ARGS="" + if [ -n "$MODEL" ]; then EXTRA_ARGS="$EXTRA_ARGS --github-model $MODEL"; fi + if [ "$MODE" = "campaign" ]; then + EXTRA_ARGS="$EXTRA_ARGS --max-slices $MAX_SLICES --lanes $LANES" + elif [ "$MODE" = "mutate" ]; then + EXTRA_ARGS="$EXTRA_ARGS --lanes $LANES" + fi + python repo_architect.py 
--allow-dirty --mode "$MODE" --report-path "$REPORT_PATH" --mutation-budget "$MUTATION_BUDGET" $EXTRA_ARGS - name: Upload repo architect artifacts if: always() @@ -1032,21 +1554,36 @@ def run_cycle(config: Config) -> Dict[str, Any]: result: Dict[str, Any] = { "status": "analyzed", "mode": config.mode, + "lane": "none", + "lanes_active": list(config.campaign_lanes) if config.campaign_lanes else list(MUTATION_LANE_ORDER), "architecture_score": analysis["architecture_score"], + "requested_model": model_meta.get("requested_model"), + "actual_model": model_meta.get("actual_model"), + "fallback_reason": model_meta.get("fallback_reason"), + "fallback_occurred": model_meta.get("fallback_occurred", False), + "no_safe_code_mutation_reason": None, + "branch": None, + "changed_files": [], + "validation": None, + "pull_request_url": None, + "artifact_files": artifact_files, "repo_root": str(config.git_root), "analysis_path": str(config.analysis_path), "graph_path": str(config.graph_path), "roadmap_path": str(config.roadmap_path), "roadmap": analysis["roadmap"], "github_models": model_meta, - "artifact_files": artifact_files, "metadata": {"architecture_score": analysis["architecture_score"], "model_meta": model_meta, "report_path": str(config.report_path)}, } - plan = build_patch_plan(config, analysis, model_meta, state) + plan, lane, no_reason = build_patch_plan(config, analysis, model_meta, state) + result["lane"] = lane + result["no_safe_code_mutation_reason"] = no_reason if plan is not None: apply_result = apply_patch_plan(config, plan, state) result.update(apply_result) + result["lane"] = lane + result["no_safe_code_mutation_reason"] = no_reason artifact_files.extend(sorted(plan.file_changes.keys())) else: if config.mode == "analyze": @@ -1065,6 +1602,7 @@ def run_cycle(config: Config) -> Dict[str, Any]: "ts": state["last_run_epoch"], "status": result["status"], "architecture_score": result["architecture_score"], + "lane": result.get("lane"), "branch": result.get("branch"), 
"pull_request_url": result.get("pull_request_url"), "mode": config.mode, @@ -1074,6 +1612,134 @@ def run_cycle(config: Config) -> Dict[str, Any]: return result +# ----------------------------- +# Campaign mode +# ----------------------------- + +def run_campaign( + config: Config, + lanes: Sequence[str], + max_slices: int, + stop_on_failure: bool, +) -> Dict[str, Any]: + """Execute up to *max_slices* mutation slices in lane-priority order. + + Steps: + 1. Refresh analysis and model enrichment. + 2. For each lane in *lanes* (up to max_slices), attempt one slice. + 3. Re-analyse after each applied mutation so later lanes see current state. + 4. Emit a campaign summary artifact under .agent/campaign_summary.json. + """ + ensure_agent_dir(config.agent_dir) + state = load_state(config) + analysis = build_analysis(config.git_root) + model_meta = enrich_with_github_models(config, analysis) + analysis["model_meta"] = model_meta + persist_analysis(config, analysis) + + slice_results: List[Dict[str, Any]] = [] + slices_applied = 0 + + for lane in lanes: + if slices_applied >= max_slices: + break + lane_config = dataclasses.replace(config, mode="mutate", campaign_lanes=(lane,)) + plan, selected_lane, no_reason = build_patch_plan(lane_config, analysis, model_meta, state) + if plan is None: + slice_results.append({"lane": lane, "status": "no_safe_mutation", "no_safe_code_mutation_reason": no_reason}) + continue + try: + apply_result = apply_patch_plan(lane_config, plan, state) + apply_result["lane"] = selected_lane + apply_result.setdefault("requested_model", model_meta.get("requested_model")) + apply_result.setdefault("actual_model", model_meta.get("actual_model")) + apply_result.setdefault("fallback_reason", model_meta.get("fallback_reason")) + slice_results.append(apply_result) + slices_applied += 1 + # Re-analyse so the next lane sees an up-to-date repo state + analysis = build_analysis(config.git_root) + model_meta = enrich_with_github_models(config, analysis) + 
analysis["model_meta"] = model_meta + except RepoArchitectError as exc: + slice_results.append({"lane": lane, "status": "failed", "error": str(exc)}) + if stop_on_failure: + break + + summary: Dict[str, Any] = { + "mode": "campaign", + "status": "campaign_complete", + "slices_attempted": len(slice_results), + "slices_applied": slices_applied, + "lanes_requested": list(lanes), + "lanes_executed": [r.get("lane") for r in slice_results], + "architecture_score": analysis["architecture_score"], + "requested_model": model_meta.get("requested_model"), + "actual_model": model_meta.get("actual_model"), + "fallback_reason": model_meta.get("fallback_reason"), + "results": slice_results, + } + + campaign_summary_path = config.agent_dir / "campaign_summary.json" + atomic_write_json(campaign_summary_path, summary) + + # Emit human-readable campaign report alongside the JSON artifact + campaign_report_path = DEFAULT_REPORT_DIR / "campaign_report.md" + atomic_write_text(config.git_root / campaign_report_path, _render_campaign_report(summary)) + + state["runs"] = int(state.get("runs", 0)) + 1 + state["last_run_epoch"] = int(time.time()) + state["last_outcome"] = "campaign_complete" + save_state(config, state) + return summary + + +def _render_campaign_report(summary: Dict[str, Any]) -> str: + """Render a human-readable markdown summary of a campaign run.""" + ts = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + lines = [ + "# repo-architect campaign report", + "", + f"Generated: {ts}", + "", + f"| Field | Value |", + f"|---|---|", + f"| Status | `{summary.get('status')}` |", + f"| Architecture score | {summary.get('architecture_score')} |", + f"| Slices attempted | {summary.get('slices_attempted')} |", + f"| Slices applied | {summary.get('slices_applied')} |", + f"| Lanes requested | {', '.join(summary.get('lanes_requested', []))} |", + f"| Model requested | `{summary.get('requested_model') or 'n/a'}` |", + f"| Model used | `{summary.get('actual_model') or 'n/a'}` 
|", + ] + if summary.get("fallback_reason"): + lines.append(f"| Fallback reason | {summary['fallback_reason']} |") + lines += ["", "## Slice results", ""] + for idx, r in enumerate(summary.get("results", []), 1): + status = r.get("status", "unknown") + lane = r.get("lane", "?") + branch = r.get("branch") + pr = r.get("pull_request_url") + err = r.get("error") or r.get("no_safe_code_mutation_reason") + lines.append(f"### Slice {idx}: `{lane}` — {status}") + lines.append("") + if branch: + lines.append(f"- Branch: `{branch}`") + if pr: + lines.append(f"- PR: {pr}") + changed = r.get("changed_files", []) + if changed: + shown = changed[:10] + truncated = len(changed) - len(shown) + display = ', '.join(f'`{f}`' for f in shown) + if truncated: + display += f" … and {truncated} more" + lines.append(f"- Changed: {display}") + if err: + lines.append(f"- Reason: {err}") + lines.append("") + return "\n".join(lines).rstrip() + "\n" + + # ----------------------------- # MCP server # ----------------------------- @@ -1146,6 +1812,28 @@ def run_mcp_server(config: Config) -> None: def build_config(args: argparse.Namespace) -> Config: git_root = discover_git_root() agent_dir = git_root / AGENT_DIRNAME + preferred = ( + args.preferred_model + or os.environ.get("REPO_ARCHITECT_PREFERRED_MODEL") + or DEFAULT_PREFERRED_MODEL + ) + fallback = ( + args.fallback_model + or os.environ.get("REPO_ARCHITECT_FALLBACK_MODEL") + or DEFAULT_FALLBACK_MODEL + ) + # Resolve lane order from --lane / --lanes / REPO_ARCHITECT_LANE / REPO_ARCHITECT_LANES env vars. + # --lane (singular) is a convenience shortcut; --lanes takes comma-separated list. + # Works for both mutate and campaign modes. 
+ lanes_raw = ( + args.lanes + or (args.lane if getattr(args, "lane", None) else None) + or os.environ.get("REPO_ARCHITECT_LANES") + or os.environ.get("REPO_ARCHITECT_LANE") + ) + campaign_lanes: Optional[Tuple[str, ...]] = None + if lanes_raw: + campaign_lanes = tuple(l.strip() for l in lanes_raw.split(",") if l.strip()) return Config( git_root=git_root, agent_dir=agent_dir, @@ -1168,12 +1856,15 @@ def build_config(args: argparse.Namespace) -> Config: report_path=git_root / args.report_path, mutation_budget=args.mutation_budget, configure_branch_protection=args.configure_branch_protection, + preferred_model=preferred, + fallback_model=fallback, + campaign_lanes=campaign_lanes, ) def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace: p = argparse.ArgumentParser(description="Single-file repo architect, PR bot, and MCP server.") - p.add_argument("--mode", choices=["analyze", "report", "mutate"], default="report") + p.add_argument("--mode", choices=["analyze", "report", "mutate", "campaign"], default="report") p.add_argument("--report-path", default=str(DEFAULT_REPORT_PATH)) p.add_argument("--mutation-budget", type=int, default=1) p.add_argument("--allow-dirty", action="store_true") @@ -1184,8 +1875,15 @@ def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace: p.add_argument("--workflow-cron", default="17 * * * *") p.add_argument("--workflow-secret-env", nargs="*", default=[]) p.add_argument("--configure-branch-protection", action="store_true") - p.add_argument("--github-model", default=None) + p.add_argument("--github-model", default=None, help="Override active model (backward compat)") + p.add_argument("--preferred-model", default=None, help="Preferred GitHub Models model id") + p.add_argument("--fallback-model", default=None, help="Fallback model if preferred is unavailable") p.add_argument("--log-json", action="store_true") + # Lane / campaign args + p.add_argument("--lane", default=None, help="Single lane to run in mutate 
mode (convenience alias for --lanes)") + p.add_argument("--max-slices", type=int, default=3, help="Campaign: max mutation slices to attempt") + p.add_argument("--lanes", default=None, help="Comma-separated lane order for mutate/campaign modes") + p.add_argument("--stop-on-failure", action="store_true", help="Campaign: stop on first slice failure") return p.parse_args(argv) @@ -1227,6 +1925,12 @@ def main(argv: Optional[Sequence[str]] = None) -> int: time.sleep(config.interval) return 0 + if config.mode == "campaign": + lanes_arg = list(config.campaign_lanes) if config.campaign_lanes else list(MUTATION_LANE_ORDER) + result = run_campaign(config, lanes_arg, args.max_slices, args.stop_on_failure) + print(json.dumps(result, indent=2, sort_keys=True)) + return 0 + result = run_cycle(config) print(json.dumps(result, indent=2, sort_keys=True)) return 0 diff --git a/tests/test_repo_architect.py b/tests/test_repo_architect.py new file mode 100644 index 0000000..a84e623 --- /dev/null +++ b/tests/test_repo_architect.py @@ -0,0 +1,729 @@ +"""Critical-path tests for repo_architect.py. 
+
+These tests verify:
+- Branch suffix generation (uniqueness, edge cases, length cap)
+- Model fallback selection behavior
+- Syntax validation of generated Python (ast.parse gate)
+- Campaign aggregation behavior
+- Output schema stability for key fields
+- Lane selection priority
+"""
+from __future__ import annotations
+
+import ast
+import dataclasses
+import json
+import os
+import pathlib
+import subprocess
+import sys
+import tempfile
+import unittest
+from typing import Any, Dict, List, Optional
+from unittest.mock import patch
+
+# Make repo_architect importable from repo root
+sys.path.insert(0, str(pathlib.Path(__file__).parent.parent))
+import repo_architect as ra # noqa: E402
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_git_root(tmp: str) -> pathlib.Path:
+    root = pathlib.Path(tmp)
+    subprocess.run(["git", "init", str(root)], capture_output=True, check=True)
+    subprocess.run(["git", "-C", str(root), "config", "user.email", "test@example.com"], capture_output=True)
+    subprocess.run(["git", "-C", str(root), "config", "user.name", "Test"], capture_output=True)
+    # Create an initial commit so HEAD exists and branch operations work
+    (root / "README.md").write_text("test repo\n", encoding="utf-8")
+    subprocess.run(["git", "-C", str(root), "add", "."], capture_output=True)
+    subprocess.run(["git", "-C", str(root), "commit", "-m", "init"], capture_output=True)
+    return root
+
+
+def _make_config(root: Optional[pathlib.Path] = None, mode: str = "analyze", **overrides: Any) -> ra.Config:
+    if root is None:
+        # mkdtemp (not TemporaryDirectory): the repo must outlive this helper
+        root = _make_git_root(tempfile.mkdtemp())
+    agent_dir = root / ".agent"
+    cfg = ra.Config(
+        git_root=root,
+        agent_dir=agent_dir,
+        state_path=agent_dir / "state.json",
+        analysis_path=agent_dir / "latest_analysis.json",
+        graph_path=agent_dir / "code_graph.json",
roadmap_path=agent_dir / "roadmap.json", + artifact_manifest_path=agent_dir / "artifacts_manifest.json", + workflow_path=root / ".github/workflows/repo-architect.yml", + step_summary_path=None, + github_token=None, + github_repo=None, + github_base_branch="main", + github_admin_token=None, + github_model=None, + allow_dirty=True, + mode=mode, + interval=3600, + log_json=False, + report_path=root / "docs/repo_architect/runtime_inventory.md", + mutation_budget=1, + configure_branch_protection=False, + preferred_model=None, + fallback_model=None, + ) + if overrides: + cfg = dataclasses.replace(cfg, **overrides) + return cfg + + +# --------------------------------------------------------------------------- +# 1. Branch suffix generation +# --------------------------------------------------------------------------- + +class TestWithUniqueBranchSuffix(unittest.TestCase): + """Verify branch suffix uniqueness, edge cases, and length capping.""" + + def setUp(self) -> None: + for k in ("REPO_ARCHITECT_BRANCH_SUFFIX", "GITHUB_RUN_ID", "GITHUB_RUN_ATTEMPT"): + os.environ.pop(k, None) + + def tearDown(self) -> None: + for k in ("REPO_ARCHITECT_BRANCH_SUFFIX", "GITHUB_RUN_ID", "GITHUB_RUN_ATTEMPT"): + os.environ.pop(k, None) + + def test_env_var_suffix_used(self) -> None: + os.environ["REPO_ARCHITECT_BRANCH_SUFFIX"] = "mytest" + result = ra.with_unique_branch_suffix("agent/report/foo") + self.assertTrue(result.endswith("-mytest"), result) + + def test_run_id_and_attempt_used_when_no_suffix_env(self) -> None: + os.environ["GITHUB_RUN_ID"] = "12345678" + os.environ["GITHUB_RUN_ATTEMPT"] = "2" + result = ra.with_unique_branch_suffix("agent/report/foo") + self.assertTrue(result.endswith("-12345678-2"), result) + + def test_timestamp_fallback_when_no_env(self) -> None: + result = ra.with_unique_branch_suffix("agent/report/foo") + # Should have a non-empty suffix; timestamp is 14 ASCII digits: YYYYmmddHHMMSS + self.assertIn("-", result) + suffix = result.rsplit("-", 1)[-1] + 
self.assertRegex(suffix, r"^\d{14}$", f"Expected 14-digit timestamp suffix, got: {suffix!r}") + + def test_invalid_chars_sanitised(self) -> None: + os.environ["REPO_ARCHITECT_BRANCH_SUFFIX"] = "hello/world!@#" + result = ra.with_unique_branch_suffix("agent/fix/thing") + for bad_char in "!@#": + self.assertNotIn(bad_char, result) + + def test_all_invalid_chars_falls_back_to_timestamp(self) -> None: + """If sanitisation produces an empty string, fall back to UTC timestamp.""" + os.environ["REPO_ARCHITECT_BRANCH_SUFFIX"] = "!!!" + result = ra.with_unique_branch_suffix("agent/fix/thing") + self.assertFalse(result.endswith("-"), f"Should not end with dash: {result!r}") + self.assertNotEqual(result, "agent/fix/thing-") + # Suffix should now be a timestamp + suffix = result[len("agent/fix/thing-"):] + self.assertTrue(suffix.isdigit() or len(suffix) >= 8, f"Expected timestamp suffix, got: {suffix!r}") + + def test_total_length_capped(self) -> None: + os.environ["REPO_ARCHITECT_BRANCH_SUFFIX"] = "x" * 200 + result = ra.with_unique_branch_suffix("agent/" + "y" * 100) + self.assertLessEqual(len(result), ra._MAX_BRANCH_NAME_LEN) + + def test_normal_case_stays_under_max_length(self) -> None: + os.environ["REPO_ARCHITECT_BRANCH_SUFFIX"] = "12345678-1" + result = ra.with_unique_branch_suffix("agent/report/runtime-inventory-packet") + self.assertLessEqual(len(result), ra._MAX_BRANCH_NAME_LEN) + self.assertTrue(result.endswith("-12345678-1")) + + def test_no_double_dash_from_stripped_suffix(self) -> None: + """Suffix that strips to something clean should not create '--'.""" + os.environ["REPO_ARCHITECT_BRANCH_SUFFIX"] = "-abc-" + result = ra.with_unique_branch_suffix("agent/fix/foo") + self.assertNotIn("--", result) + + +# --------------------------------------------------------------------------- +# 2. 
Model fallback selection behaviour +# --------------------------------------------------------------------------- + +class TestIsModelUnavailableError(unittest.TestCase): + def test_unknown_model(self) -> None: + self.assertTrue(ra._is_model_unavailable_error("unknown_model: gpt-5.4")) + + def test_model_not_found(self) -> None: + self.assertTrue(ra._is_model_unavailable_error("model_not_found")) + + def test_not_found(self) -> None: + self.assertTrue(ra._is_model_unavailable_error("not found")) + + def test_does_not_exist(self) -> None: + self.assertTrue(ra._is_model_unavailable_error("The requested model does not exist")) + + def test_invalid_model(self) -> None: + self.assertTrue(ra._is_model_unavailable_error("invalid model specified")) + + def test_transient_rate_limit(self) -> None: + self.assertFalse(ra._is_model_unavailable_error("rate limit exceeded")) + + def test_transient_timeout(self) -> None: + self.assertFalse(ra._is_model_unavailable_error("connection timed out")) + + def test_case_insensitive(self) -> None: + self.assertTrue(ra._is_model_unavailable_error("UNKNOWN_MODEL")) + self.assertTrue(ra._is_model_unavailable_error("Model_Not_Found")) + + +class TestCallModelsWithFallback(unittest.TestCase): + _MESSAGES = [{"role": "user", "content": "hi"}] + + def test_preferred_succeeds_no_fallback(self) -> None: + fake_resp = {"choices": [{"message": {"content": "ok"}}], "model": "openai/gpt-5.4"} + with patch.object(ra, "github_models_chat", return_value=fake_resp): + resp, req_model, reason, fell = ra.call_models_with_fallback_or_none( + "tok", "openai/gpt-5.4", "openai/gpt-4.1", self._MESSAGES + ) + self.assertEqual(resp, fake_resp) + self.assertEqual(req_model, "openai/gpt-5.4") + self.assertIsNone(reason) + self.assertFalse(fell) + + def test_fallback_triggered_on_model_unavailable(self) -> None: + fallback_resp = {"choices": [{"message": {"content": "ok"}}], "model": "openai/gpt-4.1"} + + def side_effect(token: str, model: str, messages: list) -> 
dict: + if model == "openai/gpt-5.4": + raise ra.RepoArchitectError("unknown_model: gpt-5.4 is not available") + return fallback_resp + + with patch.object(ra, "github_models_chat", side_effect=side_effect): + resp, req_model, reason, fell = ra.call_models_with_fallback_or_none( + "tok", "openai/gpt-5.4", "openai/gpt-4.1", self._MESSAGES + ) + self.assertEqual(resp, fallback_resp) + self.assertTrue(fell) + self.assertIsNotNone(reason) + self.assertIn("unknown_model", reason) + + def test_fallback_NOT_triggered_on_transient_error(self) -> None: + """Transient errors (rate limit, network) must NOT trigger model fallback.""" + calls: List[str] = [] + + def side_effect(token: str, model: str, messages: list) -> dict: + calls.append(model) + raise ra.RepoArchitectError("rate limit exceeded") + + with patch.object(ra, "github_models_chat", side_effect=side_effect): + resp, _req, reason, fell = ra.call_models_with_fallback_or_none( + "tok", "openai/gpt-5.4", "openai/gpt-4.1", self._MESSAGES + ) + # Must only have tried preferred; no fallback + self.assertEqual(calls, ["openai/gpt-5.4"]) + self.assertIsNone(resp) + self.assertFalse(fell) + + def test_both_models_fail_returns_none(self) -> None: + def side_effect(token: str, model: str, messages: list) -> dict: + raise ra.RepoArchitectError("unknown_model: all models gone") + + with patch.object(ra, "github_models_chat", side_effect=side_effect): + resp, req_model, reason, fell = ra.call_models_with_fallback_or_none( + "tok", "openai/gpt-5.4", "openai/gpt-4.1", self._MESSAGES + ) + self.assertIsNone(resp) + self.assertIsNotNone(reason) + self.assertIn("fallback also failed", reason) + self.assertTrue(fell) + + def test_no_fallback_when_fallback_is_none(self) -> None: + def side_effect(token: str, model: str, messages: list) -> dict: + raise ra.RepoArchitectError("unknown_model: preferred gone") + + with patch.object(ra, "github_models_chat", side_effect=side_effect): + resp, _req, reason, fell = 
ra.call_models_with_fallback_or_none( + "tok", "openai/gpt-5.4", None, self._MESSAGES + ) + self.assertIsNone(resp) + self.assertFalse(fell) + + +# --------------------------------------------------------------------------- +# 3. Syntax validation of generated Python (ast.parse gate) +# --------------------------------------------------------------------------- + +class TestSyntaxValidationGate(unittest.TestCase): + def test_valid_python_passes(self) -> None: + code = "def foo():\n return 42\n" + # Should not raise + ast.parse(code) + + def test_invalid_python_raises(self) -> None: + code = "def foo(\n return 42\n" + with self.assertRaises(SyntaxError): + ast.parse(code) + + def test_build_parse_errors_plan_rejects_still_broken_fix(self) -> None: + """If the model returns still-broken Python, build_parse_errors_plan must return None.""" + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + bad_py = root / "bad.py" + bad_py.write_text("def foo(\n return 42\n", encoding="utf-8") + + analysis: Dict[str, Any] = { + "parse_error_files": ["bad.py"], + "python_files": [{"path": "bad.py", "module": "bad", "parse_error": "SyntaxError"}], + } + config = _make_config(root, mode="mutate", github_token="fake-tok", + preferred_model="openai/gpt-5.4", fallback_model="openai/gpt-4.1") + + # Model returns still-broken Python in its response + still_broken_resp = { + "choices": [{"message": {"content": '{"files": {"bad.py": "def foo(\\n return 42\\n"}}'}}] + } + with patch.object(ra, "call_models_with_fallback_or_none", + return_value=(still_broken_resp, "openai/gpt-5.4", None, False)): + plan = ra.build_parse_errors_plan(config, analysis) + self.assertIsNone(plan, "Plan must be None when model fix is still broken") + + def test_build_parse_errors_plan_accepts_valid_fix(self) -> None: + """If the model returns valid Python, a PatchPlan must be returned.""" + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + bad_py = root / "bad.py" + 
bad_py.write_text("def foo(\n return 42\n", encoding="utf-8") + + analysis: Dict[str, Any] = { + "parse_error_files": ["bad.py"], + "python_files": [{"path": "bad.py", "module": "bad", "parse_error": "SyntaxError"}], + } + config = _make_config(root, mode="mutate", github_token="fake-tok", + preferred_model="openai/gpt-5.4", fallback_model="openai/gpt-4.1") + + fixed_content = "def foo():\n return 42\n" + good_resp = { + "choices": [{"message": {"content": json.dumps({"files": {"bad.py": fixed_content}})}}] + } + with patch.object(ra, "call_models_with_fallback_or_none", + return_value=(good_resp, "openai/gpt-5.4", None, False)): + plan = ra.build_parse_errors_plan(config, analysis) + self.assertIsNotNone(plan) + self.assertEqual(plan.file_changes["bad.py"], fixed_content) + + +# --------------------------------------------------------------------------- +# 4. Campaign aggregation behaviour +# --------------------------------------------------------------------------- + +class TestCampaignAggregation(unittest.TestCase): + def test_campaign_returns_correct_structure(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="campaign") + result = ra.run_campaign(config, ["hygiene", "report"], max_slices=2, stop_on_failure=False) + self.assertEqual(result["mode"], "campaign") + self.assertEqual(result["status"], "campaign_complete") + self.assertIn("slices_attempted", result) + self.assertIn("slices_applied", result) + self.assertIn("lanes_requested", result) + self.assertIn("lanes_executed", result) + self.assertIn("architecture_score", result) + self.assertIn("results", result) + self.assertIsInstance(result["results"], list) + + def test_campaign_summary_json_written(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="campaign") + ra.run_campaign(config, ["hygiene"], max_slices=1, stop_on_failure=False) + summary_path = 
config.agent_dir / "campaign_summary.json" + self.assertTrue(summary_path.exists(), "campaign_summary.json must be written") + data = json.loads(summary_path.read_text()) + self.assertEqual(data["mode"], "campaign") + self.assertIn("slices_applied", data) + + def test_campaign_markdown_report_written(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="campaign") + ra.run_campaign(config, ["hygiene"], max_slices=1, stop_on_failure=False) + report_path = root / ra.DEFAULT_REPORT_DIR / "campaign_report.md" + self.assertTrue(report_path.exists(), "campaign_report.md must be written") + content = report_path.read_text() + self.assertIn("repo-architect campaign report", content) + + def test_campaign_stop_on_failure_stops_early(self) -> None: + """When stop_on_failure=True and a slice fails, remaining lanes are skipped.""" + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="campaign", github_token="fake-tok", + preferred_model="openai/gpt-5.4", fallback_model="openai/gpt-4.1") + + def raise_on_apply(*args: Any, **kwargs: Any) -> None: + raise ra.RepoArchitectError("forced failure for test") + + with patch.object(ra, "apply_patch_plan", side_effect=raise_on_apply): + # Give the campaign a lane that will always produce a plan (hygiene with debug prints) + # We need actual debug prints in the repo for hygiene to fire + (root / "noisy.py").write_text("print('debug') # DEBUG\n", encoding="utf-8") + subprocess.run(["git", "-C", str(root), "add", "."], capture_output=True) + subprocess.run(["git", "-C", str(root), "commit", "-m", "add noisy file"], capture_output=True) + result = ra.run_campaign(config, ["hygiene", "report"], max_slices=3, stop_on_failure=True) + + # With stop_on_failure=True and a forced apply failure, should have stopped + # (if hygiene plan was made, it would fail on apply_patch_plan) + self.assertEqual(result["status"], 
"campaign_complete") + + def test_campaign_max_slices_respected(self) -> None: + """Campaign must not execute more than max_slices applied slices.""" + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + # Add multiple debug print files to give hygiene work to do + for i in range(5): + (root / f"noisy{i}.py").write_text(f"print('x{i}') # DEBUG\n", encoding="utf-8") + subprocess.run(["git", "-C", str(root), "add", "."], capture_output=True) + subprocess.run(["git", "-C", str(root), "commit", "-m", "add noisy files"], capture_output=True) + config = _make_config(root, mode="campaign") + result = ra.run_campaign(config, list(ra.MUTATION_LANE_ORDER), max_slices=1, stop_on_failure=False) + self.assertLessEqual(result["slices_applied"], 1) + + +# --------------------------------------------------------------------------- +# 5. Output schema stability for key fields +# --------------------------------------------------------------------------- + +REQUIRED_OUTPUT_FIELDS = frozenset({ + "status", "mode", "lane", "architecture_score", + "requested_model", "actual_model", "fallback_reason", "fallback_occurred", + "no_safe_code_mutation_reason", "branch", "changed_files", + "validation", "pull_request_url", "artifact_files", +}) + + +class TestOutputSchemaStability(unittest.TestCase): + def _run_and_check(self, mode: str) -> Dict[str, Any]: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode=mode) + result = ra.run_cycle(config) + missing = REQUIRED_OUTPUT_FIELDS - set(result.keys()) + self.assertEqual(missing, set(), f"Missing output fields in {mode!r} mode: {missing}") + return result + + def test_analyze_mode_output_contract(self) -> None: + result = self._run_and_check("analyze") + self.assertEqual(result["status"], "analysis_only") + self.assertEqual(result["lane"], "none") + + def test_report_mode_output_contract(self) -> None: + result = self._run_and_check("report") + 
self.assertIn(result["status"], ("mutated", "no_meaningful_delta", "no_safe_mutation_available")) + + def test_mutate_mode_output_contract(self) -> None: + result = self._run_and_check("mutate") + self.assertIn(result["status"], ("mutated", "no_meaningful_delta", "no_safe_mutation_available")) + + def test_campaign_mode_output_contract(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="campaign") + result = ra.run_campaign(config, list(ra.MUTATION_LANE_ORDER), max_slices=2, stop_on_failure=False) + campaign_fields = {"status", "mode", "slices_attempted", "slices_applied", + "lanes_requested", "lanes_executed", "architecture_score", + "requested_model", "actual_model", "fallback_reason", "results"} + missing = campaign_fields - set(result.keys()) + self.assertEqual(missing, set(), f"Missing campaign output fields: {missing}") + + +# --------------------------------------------------------------------------- +# 6. Lane selection priority +# --------------------------------------------------------------------------- + +class TestLanePriority(unittest.TestCase): + def _analysis(self, parse_errors: bool = False, cycles: bool = False, + debug_prints: bool = False, many_eps: bool = False) -> Dict[str, Any]: + infos = [] + if debug_prints: + infos.append({"path": "foo.py", "debug_print_lines": [5], + "module": "foo", "parse_error": None}) + else: + infos.append({"path": "foo.py", "debug_print_lines": [], + "module": "foo", "parse_error": None}) + ep_clusters: Dict[str, List[str]] = {} + ep_paths: List[str] = [] + if many_eps: + # Above _ENTRYPOINT_CONSOLIDATION_THRESHOLD backend server entrypoints + ep_paths = [f"backend/server{i}.py" for i in range(6)] + ep_clusters["backend_servers"] = ep_paths + return { + "parse_error_files": ["foo.py"] if parse_errors else [], + "cycles": [["a", "b", "a"]] if cycles else [], + "python_files": infos, + "roadmap": [], + "entrypoint_paths": ep_paths, + 
"entrypoint_clusters": ep_clusters, + "architecture_score": 80, + "score_factors": {}, + "local_import_graph": {}, + "debug_print_candidates": ["foo.py"] if debug_prints else [], + } + + def test_analyze_mode_returns_none(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="analyze") + plan, lane, reason = ra.build_patch_plan(config, self._analysis(), {}, {}) + self.assertIsNone(plan) + self.assertEqual(lane, "none") + + def test_hygiene_selected_when_debug_prints_exist(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + # Create a real file for hygiene lane to read + foo = root / "foo.py" + foo.write_text("print('hi') # DEBUG\n", encoding="utf-8") + config = _make_config(root, mode="mutate") + analysis = self._analysis(debug_prints=True) + plan, lane, _ = ra.build_patch_plan(config, analysis, {}, {}) + self.assertEqual(lane, "hygiene") + self.assertIsNotNone(plan) + + def test_parse_errors_attempted_before_hygiene(self) -> None: + """parse_errors lane must be attempted before hygiene even without a token. + + Without a github_token, parse_errors plan returns None (model unavailable). + The lane falls through to hygiene which *does* fire (debug prints exist). 
+ """ + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + foo = root / "foo.py" + foo.write_text("print('hi') # DEBUG\n", encoding="utf-8") + # No github_token → parse_errors plan returns None → falls through to hygiene + config = _make_config(root, mode="mutate") + analysis = self._analysis(parse_errors=True, debug_prints=True) + plan, lane, _ = ra.build_patch_plan(config, analysis, {}, {}) + # parse_errors was attempted first (skipped without token), hygiene fires next + self.assertEqual(lane, "hygiene", f"Expected hygiene (parse_errors skipped without token), got: {lane!r}") + + def test_report_lane_is_fallback(self) -> None: + """In mutate mode with no actionable higher lane, report is the fallback.""" + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + config = _make_config(root, mode="mutate") + analysis = self._analysis() # no parse errors, no cycles, no debug prints, no many eps + plan, lane, _ = ra.build_patch_plan(config, analysis, {}, {"last_report_hash": None}) + self.assertEqual(lane, "report") + + def test_report_suppressed_when_parse_errors_and_code_fix_attempted(self) -> None: + """If parse_errors exist and code fix was attempted (but failed), report is suppressed.""" + with tempfile.TemporaryDirectory() as tmp: + root = _make_git_root(tmp) + # Has token so parse_errors lane is attempted + config = _make_config(root, mode="mutate", github_token="fake-tok", + preferred_model="openai/gpt-5.4", fallback_model="openai/gpt-4.1") + analysis = self._analysis(parse_errors=True) + + # Model returns None → parse_errors plan fails → report should be suppressed + with patch.object(ra, "call_models_with_fallback_or_none", + return_value=(None, "openai/gpt-5.4", "model unavailable", False)): + plan, lane, reason = ra.build_patch_plan(config, analysis, {}, {"last_report_hash": None}) + + # parse_errors attempted, failed → report suppressed → no plan + self.assertIsNone(plan) + self.assertIsNotNone(reason) + 
self.assertIn("report", reason.lower()) + + +# --------------------------------------------------------------------------- +# 7. extract_json_from_model_text +# --------------------------------------------------------------------------- + +class TestExtractJsonFromModelText(unittest.TestCase): + def test_bare_json(self) -> None: + result = ra.extract_json_from_model_text('{"files": {"foo.py": "content"}}') + self.assertEqual(result, {"files": {"foo.py": "content"}}) + + def test_fenced_json(self) -> None: + text = '```json\n{"files": {"bar.py": "x"}}\n```' + result = ra.extract_json_from_model_text(text) + self.assertEqual(result, {"files": {"bar.py": "x"}}) + + def test_embedded_json_in_prose(self) -> None: + text = 'Here is the fix: {"files": {"baz.py": "y"}} done' + result = ra.extract_json_from_model_text(text) + self.assertEqual(result, {"files": {"baz.py": "y"}}) + + def test_invalid_raises(self) -> None: + with self.assertRaises(ra.RepoArchitectError): + ra.extract_json_from_model_text("no json here at all") + + +# --------------------------------------------------------------------------- +# 8. 
# ---------------------------------------------------------------------------
# 8. Entrypoint consolidation lane
# ---------------------------------------------------------------------------

class TestEntrypointConsolidationLane(unittest.TestCase):
    """build_entrypoint_consolidation_plan gating: threshold, token, syntax gate."""

    def test_below_threshold_returns_none(self) -> None:
        """build_entrypoint_consolidation_plan returns None when below threshold."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            config = _make_config(root, mode="mutate", github_token="tok",
                                  preferred_model="openai/gpt-5.4")
            analysis = {
                "entrypoint_clusters": {"backend_servers": ["a.py", "b.py"]},  # only 2, below threshold
                "python_files": [],
            }
            plan = ra.build_entrypoint_consolidation_plan(config, analysis)
            self.assertIsNone(plan)

    def test_no_token_returns_none(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            config = _make_config(root, mode="mutate")  # no token
            analysis = {
                "entrypoint_clusters": {"backend_servers": [f"server{i}.py" for i in range(6)]},
                "python_files": [],
            }
            plan = ra.build_entrypoint_consolidation_plan(config, analysis)
            self.assertIsNone(plan)

    def test_model_response_with_invalid_python_returns_none(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            for i in range(6):
                (root / f"server{i}.py").write_text(f"# server {i}\npass\n", encoding="utf-8")
            config = _make_config(root, mode="mutate", github_token="tok",
                                  preferred_model="openai/gpt-5.4")
            analysis = {
                "entrypoint_clusters": {"backend_servers": [f"server{i}.py" for i in range(6)]},
                "python_files": [],
            }
            broken_resp = {
                "choices": [{"message": {"content": '{"files": {"server0.py": "def broken("}}'}}]
            }
            with patch.object(ra, "call_models_with_fallback_or_none",
                              return_value=(broken_resp, "openai/gpt-5.4", None, False)):
                plan = ra.build_entrypoint_consolidation_plan(config, analysis)
            self.assertIsNone(plan)
# ---------------------------------------------------------------------------
# 9. Lane scoping for mutate mode
# ---------------------------------------------------------------------------

class TestLaneScopingMutateMode(unittest.TestCase):
    """Verify --lane / --lanes / env vars scope lane selection in mutate mode."""

    def test_lanes_flag_scopes_mutate_to_single_lane(self) -> None:
        """--lanes hygiene should restrict mutate to only the hygiene lane."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            bad = root / "bad.py"
            bad.write_text("print('debug') # DEBUG\n", encoding="utf-8")
            config = _make_config(root, mode="mutate", campaign_lanes=("hygiene",))
            analysis = ra.build_analysis(root)
            plan, lane, _ = ra.build_patch_plan(config, analysis, {}, {})
            self.assertIsNotNone(plan)
            self.assertEqual(lane, "hygiene")

    def test_lanes_flag_excludes_hygiene(self) -> None:
        """--lanes report should NOT see hygiene changes even if debug prints exist."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            bad = root / "bad.py"
            bad.write_text("print('debug') # DEBUG\n", encoding="utf-8")
            config = _make_config(root, mode="mutate", campaign_lanes=("report",))
            analysis = ra.build_analysis(root)
            plan, lane, _ = ra.build_patch_plan(config, analysis, {}, {})
            # Only 'report' was in the active lanes, so hygiene was never considered.
            # lane=='report' proves hygiene was excluded from lane selection.
            self.assertEqual(lane, "report")

    def test_lane_singular_flag_parsed(self) -> None:
        """--lane hygiene should produce campaign_lanes=('hygiene',) in Config."""
        args = ra.parse_args(["--mode", "mutate", "--lane", "hygiene"])
        self.assertEqual(args.lane, "hygiene")

    def test_build_config_reads_lanes_env(self) -> None:
        """REPO_ARCHITECT_LANES env var should populate campaign_lanes."""
        env = {
            "REPO_ARCHITECT_LANES": "parse_errors,hygiene",
        }
        with patch.dict(os.environ, env, clear=False):
            args = ra.parse_args(["--mode", "mutate"])
            config = ra.build_config(args)
            self.assertEqual(config.campaign_lanes, ("parse_errors", "hygiene"))

    def test_cli_lanes_overrides_env(self) -> None:
        """CLI --lanes should take precedence over REPO_ARCHITECT_LANES env var."""
        env = {"REPO_ARCHITECT_LANES": "report"}
        with patch.dict(os.environ, env, clear=False):
            args = ra.parse_args(["--mode", "mutate", "--lanes", "hygiene"])
            config = ra.build_config(args)
            self.assertEqual(config.campaign_lanes, ("hygiene",))
# ---------------------------------------------------------------------------
# 10. Enhanced validation (import smoke)
# ---------------------------------------------------------------------------

class TestValidateChange(unittest.TestCase):
    """Verify validate_change includes lane-aware behaviour."""

    def test_py_compile_passes_for_valid_python(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            (root / "ok.py").write_text("x = 1\n", encoding="utf-8")
            config = _make_config(root, mode="mutate")
            ok, msg = ra.validate_change(config, ["ok.py"])
            self.assertTrue(ok)
            self.assertIn("py_compile", msg.lower())

    def test_py_compile_fails_for_invalid_python(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            (root / "bad.py").write_text("def broken(\n", encoding="utf-8")
            config = _make_config(root, mode="mutate")
            ok, msg = ra.validate_change(config, ["bad.py"])
            self.assertFalse(ok)

    def test_import_smoke_runs_for_import_cycles_lane(self) -> None:
        """When lane=import_cycles, validate_change should run import smoke."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            # A valid file that won't import cleanly as a module from this dir
            (root / "cycle_fix.py").write_text("x = 1\n", encoding="utf-8")
            config = _make_config(root, mode="mutate")
            ok, msg = ra.validate_change(config, ["cycle_fix.py"], lane="import_cycles")
            # Should still pass (import smoke is warn-only)
            self.assertTrue(ok)

    def test_non_python_files_always_pass(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            config = _make_config(root, mode="mutate")
            ok, msg = ra.validate_change(config, ["README.md"])
            self.assertTrue(ok)

    def test_output_contract_includes_lanes_active(self) -> None:
        """run_cycle result should include lanes_active field."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            config = _make_config(root, mode="analyze")
            result = ra.run_cycle(config)
            self.assertIn("lanes_active", result)
            self.assertIsInstance(result["lanes_active"], list)
            self.assertEqual(result["lanes_active"], list(ra.MUTATION_LANE_ORDER))

    def test_output_contract_lanes_active_respects_config(self) -> None:
        """lanes_active should reflect config.campaign_lanes when set."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_git_root(tmp)
            config = _make_config(root, mode="mutate", campaign_lanes=("hygiene",))
            result = ra.run_cycle(config)
            self.assertEqual(result["lanes_active"], ["hygiene"])


if __name__ == "__main__":
    unittest.main()