Skip to content

Commit 7a1d4cd

Browse files
committed
feat(governance): add breaking-changes detector for capabilities
G1 — tools/detect_breaking_changes.py compares capability YAMLs between two git refs and flags: removed inputs/outputs, new required inputs, and type changes.
1 parent ced8b33 commit 7a1d4cd

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed

tools/detect_breaking_changes.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#!/usr/bin/env python3
2+
"""G1 — Detect breaking changes in capabilities between two git refs.
3+
4+
Compares each capability YAML between ``--base`` (default: HEAD~1) and the
5+
working tree. A change is **breaking** if it:
6+
7+
1. Removes a previously required output field.
8+
2. Removes a previously declared input field.
9+
3. Adds a new *required* input field (callers unprepared).
10+
4. Changes the type of an existing input or output field.
11+
12+
Usage:
13+
python tools/detect_breaking_changes.py [--base HEAD~1]
14+
15+
Exit codes:
16+
0 — no breaking changes
17+
1 — breaking changes detected
18+
2 — runtime error (e.g. git not available)
19+
"""
20+
from __future__ import annotations
21+
22+
import argparse
23+
import subprocess
24+
import sys
25+
from pathlib import Path
26+
from typing import Any, Dict, List
27+
28+
import yaml
29+
30+
REPO_ROOT = Path(__file__).resolve().parent.parent
31+
32+
33+
def _git_show(ref: str, path: str) -> str | None:
34+
"""Return file content at a given git ref, or None if it didn't exist."""
35+
try:
36+
result = subprocess.run(
37+
["git", "show", f"{ref}:{path}"],
38+
capture_output=True,
39+
text=True,
40+
cwd=str(REPO_ROOT),
41+
timeout=10,
42+
)
43+
if result.returncode == 0:
44+
return result.stdout
45+
except (FileNotFoundError, subprocess.TimeoutExpired):
46+
pass
47+
return None
48+
49+
50+
def _load_yaml_str(text: str) -> Dict[str, Any]:
51+
return yaml.safe_load(text) or {}
52+
53+
54+
def _detect_for_capability(
55+
cap_id: str, old: Dict[str, Any], new: Dict[str, Any]
56+
) -> List[str]:
57+
"""Return list of breaking-change descriptions."""
58+
breaks: List[str] = []
59+
60+
old_inputs = old.get("inputs", {}) or {}
61+
new_inputs = new.get("inputs", {}) or {}
62+
old_outputs = old.get("outputs", {}) or {}
63+
new_outputs = new.get("outputs", {}) or {}
64+
65+
# 1. Removed output fields
66+
for name in old_outputs:
67+
if name not in new_outputs:
68+
breaks.append(f"[{cap_id}] output '{name}' removed")
69+
70+
# 2. Removed input fields
71+
for name in old_inputs:
72+
if name not in new_inputs:
73+
breaks.append(f"[{cap_id}] input '{name}' removed")
74+
75+
# 3. New required inputs
76+
for name, spec in new_inputs.items():
77+
if name not in old_inputs:
78+
if isinstance(spec, dict) and spec.get("required"):
79+
breaks.append(f"[{cap_id}] new required input '{name}' added")
80+
81+
# 4. Type changes
82+
for name in old_inputs:
83+
if name in new_inputs:
84+
old_type = old_inputs[name].get("type") if isinstance(old_inputs[name], dict) else None
85+
new_type = new_inputs[name].get("type") if isinstance(new_inputs[name], dict) else None
86+
if old_type and new_type and old_type != new_type:
87+
breaks.append(f"[{cap_id}] input '{name}' type changed: {old_type}{new_type}")
88+
89+
for name in old_outputs:
90+
if name in new_outputs:
91+
old_type = old_outputs[name].get("type") if isinstance(old_outputs[name], dict) else None
92+
new_type = new_outputs[name].get("type") if isinstance(new_outputs[name], dict) else None
93+
if old_type and new_type and old_type != new_type:
94+
breaks.append(f"[{cap_id}] output '{name}' type changed: {old_type}{new_type}")
95+
96+
return breaks
97+
98+
99+
def main() -> int:
100+
parser = argparse.ArgumentParser(description="Detect breaking capability changes")
101+
parser.add_argument("--base", default="HEAD~1", help="Git ref to compare against (default: HEAD~1)")
102+
args = parser.parse_args()
103+
104+
cap_dir = REPO_ROOT / "capabilities"
105+
if not cap_dir.exists():
106+
print("No capabilities/ directory found.", file=sys.stderr)
107+
return 2
108+
109+
all_breaks: List[str] = []
110+
111+
for path in sorted(cap_dir.glob("*.yaml")):
112+
if path.name.startswith("_"):
113+
continue
114+
115+
new_data = _load_yaml_str(path.read_text(encoding="utf-8-sig"))
116+
cap_id = new_data.get("id", path.stem)
117+
118+
rel_path = path.relative_to(REPO_ROOT).as_posix()
119+
old_text = _git_show(args.base, rel_path)
120+
if old_text is None:
121+
continue # new capability — not a breaking change
122+
123+
old_data = _load_yaml_str(old_text)
124+
all_breaks.extend(_detect_for_capability(cap_id, old_data, new_data))
125+
126+
if all_breaks:
127+
print(f"BREAKING CHANGES DETECTED ({len(all_breaks)}):\n")
128+
for b in all_breaks:
129+
print(f" - {b}")
130+
return 1
131+
132+
print("No breaking changes detected.")
133+
return 0
134+
135+
136+
if __name__ == "__main__":
137+
sys.exit(main())

0 commit comments

Comments
 (0)