Skip to content

Commit c4a3c7a

Browse files
committed
feat: add catalog semantic query tool (Gap 6 — cognitive/safety queries over catalog)
1 parent 7d2f00f commit c4a3c7a

File tree

1 file changed

+364
-0
lines changed

1 file changed

+364
-0
lines changed

tools/catalog_query.py

Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Semantic query tool over the capability catalog.
4+
5+
Exploits cognitive_hints, safety, and properties to answer questions like:
6+
- What capabilities produce Risk?
7+
- What consumes Option?
8+
- What roles are covered?
9+
- What slots are covered vs uncovered?
10+
- What capabilities are compatible (produces → consumes)?
11+
12+
Usage:
13+
python tools/catalog_query.py produces Risk
14+
python tools/catalog_query.py consumes Option
15+
python tools/catalog_query.py role analyze
16+
python tools/catalog_query.py compatible analysis.risk.extract
17+
python tools/catalog_query.py coverage
18+
python tools/catalog_query.py safety
19+
python tools/catalog_query.py chain Risk Decision
20+
"""
21+
from __future__ import annotations
22+
23+
import json
24+
import sys
25+
from pathlib import Path
26+
from typing import Any
27+
28+
29+
def _default_base() -> Path:
30+
return Path(__file__).resolve().parent.parent
31+
32+
33+
def _load_catalog(base: Path) -> list[dict[str, Any]]:
34+
path = base / "catalog" / "capabilities.json"
35+
with path.open("r", encoding="utf-8") as f:
36+
return json.load(f)
37+
38+
39+
def _load_cognitive_types(base: Path) -> dict[str, Any]:
40+
path = base / "vocabulary" / "cognitive_types.yaml"
41+
if not path.exists():
42+
return {}
43+
import yaml
44+
with path.open("r", encoding="utf-8") as f:
45+
return yaml.safe_load(f) or {}
46+
47+
48+
# ── Index builders ──────────────────────────────────────────────
49+
50+
51+
def _build_produces_index(caps: list[dict]) -> dict[str, list[str]]:
52+
"""type_name → [capability_ids that produce it]"""
53+
index: dict[str, list[str]] = {}
54+
for c in caps:
55+
hints = c.get("cognitive_hints")
56+
if not hints:
57+
continue
58+
produces = hints.get("produces")
59+
if not isinstance(produces, dict):
60+
continue
61+
seen_types: set[str] = set()
62+
for field_spec in produces.values():
63+
if isinstance(field_spec, dict):
64+
t = field_spec.get("type")
65+
if isinstance(t, str) and t not in seen_types:
66+
seen_types.add(t)
67+
index.setdefault(t, []).append(c["id"])
68+
return index
69+
70+
71+
def _build_consumes_index(caps: list[dict]) -> dict[str, list[str]]:
72+
"""type_name → [capability_ids that consume it]"""
73+
index: dict[str, list[str]] = {}
74+
for c in caps:
75+
hints = c.get("cognitive_hints")
76+
if not hints:
77+
continue
78+
consumes = hints.get("consumes")
79+
if not isinstance(consumes, list):
80+
continue
81+
for t in consumes:
82+
if isinstance(t, str):
83+
index.setdefault(t, []).append(c["id"])
84+
return index
85+
86+
87+
def _build_role_index(caps: list[dict]) -> dict[str, list[str]]:
88+
"""role → [capability_ids]"""
89+
index: dict[str, list[str]] = {}
90+
for c in caps:
91+
hints = c.get("cognitive_hints")
92+
if not hints:
93+
continue
94+
role = hints.get("role")
95+
if isinstance(role, str):
96+
index.setdefault(role, []).append(c["id"])
97+
elif isinstance(role, list):
98+
for r in role:
99+
index.setdefault(r, []).append(c["id"])
100+
return index
101+
102+
103+
# ── Query commands ──────────────────────────────────────────────
104+
105+
106+
def cmd_produces(caps: list[dict], type_name: str) -> None:
107+
"""Which capabilities produce a given cognitive type?"""
108+
idx = _build_produces_index(caps)
109+
matches = idx.get(type_name, [])
110+
if not matches:
111+
print(f"No capabilities produce '{type_name}'.")
112+
all_types = sorted(idx.keys())
113+
if all_types:
114+
print(f"Available types: {', '.join(all_types)}")
115+
return
116+
print(f"Capabilities that produce '{type_name}':")
117+
for cid in sorted(matches):
118+
print(f" {cid}")
119+
120+
121+
def cmd_consumes(caps: list[dict], type_name: str) -> None:
122+
"""Which capabilities consume a given cognitive type?"""
123+
idx = _build_consumes_index(caps)
124+
matches = idx.get(type_name, [])
125+
if not matches:
126+
print(f"No capabilities consume '{type_name}'.")
127+
all_types = sorted(idx.keys())
128+
if all_types:
129+
print(f"Available types: {', '.join(all_types)}")
130+
return
131+
print(f"Capabilities that consume '{type_name}':")
132+
for cid in sorted(matches):
133+
print(f" {cid}")
134+
135+
136+
def cmd_role(caps: list[dict], role_name: str) -> None:
137+
"""Which capabilities belong to a given cognitive role?"""
138+
idx = _build_role_index(caps)
139+
matches = idx.get(role_name, [])
140+
if not matches:
141+
print(f"No capabilities with role '{role_name}'.")
142+
all_roles = sorted(idx.keys())
143+
if all_roles:
144+
print(f"Available roles: {', '.join(all_roles)}")
145+
return
146+
print(f"Capabilities with role '{role_name}':")
147+
for cid in sorted(matches):
148+
print(f" {cid}")
149+
150+
151+
def cmd_compatible(caps: list[dict], capability_id: str) -> None:
152+
"""What capabilities can consume what this one produces (downstream)?"""
153+
cap = next((c for c in caps if c["id"] == capability_id), None)
154+
if not cap:
155+
print(f"Capability '{capability_id}' not found.")
156+
return
157+
hints = cap.get("cognitive_hints")
158+
if not hints or not hints.get("produces"):
159+
print(f"Capability '{capability_id}' has no produces declaration.")
160+
return
161+
produced_types: set[str] = set()
162+
for field_spec in hints["produces"].values():
163+
if isinstance(field_spec, dict) and isinstance(field_spec.get("type"), str):
164+
produced_types.add(field_spec["type"])
165+
166+
consumes_idx = _build_consumes_index(caps)
167+
print(f"'{capability_id}' produces: {', '.join(sorted(produced_types))}")
168+
print()
169+
found = False
170+
for t in sorted(produced_types):
171+
consumers = consumes_idx.get(t, [])
172+
consumers = [c for c in consumers if c != capability_id]
173+
if consumers:
174+
found = True
175+
print(f" {t} → consumed by:")
176+
for cid in sorted(consumers):
177+
print(f" {cid}")
178+
if not found:
179+
print(" No downstream consumers found.")
180+
181+
182+
def cmd_coverage(caps: list[dict], cognitive_types: dict) -> None:
183+
"""Which cognitive types and roles are covered vs uncovered?"""
184+
produces_idx = _build_produces_index(caps)
185+
consumes_idx = _build_consumes_index(caps)
186+
role_idx = _build_role_index(caps)
187+
188+
all_types = sorted(set(
189+
list(cognitive_types.get("types", {}).keys())
190+
))
191+
all_roles = sorted(cognitive_types.get("roles", []))
192+
193+
total_caps = len(caps)
194+
annotated = sum(1 for c in caps if c.get("cognitive_hints"))
195+
196+
print(f"Annotation coverage: {annotated}/{total_caps} capabilities")
197+
print()
198+
199+
print("Type coverage (produced / consumed):")
200+
for t in all_types:
201+
p = len(produces_idx.get(t, []))
202+
c = len(consumes_idx.get(t, []))
203+
marker = "✓" if p > 0 else "✗"
204+
print(f" {marker} {t:20s} produced={p} consumed={c}")
205+
206+
uncovered = [t for t in all_types if t not in produces_idx and t not in consumes_idx]
207+
if uncovered:
208+
print(f"\n Uncovered types: {', '.join(uncovered)}")
209+
210+
print()
211+
print("Role coverage:")
212+
for r in all_roles:
213+
count = len(role_idx.get(r, []))
214+
marker = "✓" if count > 0 else "✗"
215+
print(f" {marker} {r:15s} capabilities={count}")
216+
217+
empty_roles = [r for r in all_roles if r not in role_idx]
218+
if empty_roles:
219+
print(f"\n Uncovered roles: {', '.join(empty_roles)}")
220+
221+
222+
def cmd_safety(caps: list[dict]) -> None:
223+
"""Which capabilities have safety blocks and what do they require?"""
224+
with_safety = [c for c in caps if c.get("safety")]
225+
side_effect_caps = [
226+
c for c in caps
227+
if isinstance(c.get("properties"), dict) and c["properties"].get("side_effects")
228+
]
229+
missing_safety = [
230+
c for c in side_effect_caps if not c.get("safety")
231+
]
232+
233+
print(f"Safety coverage: {len(with_safety)} annotated / "
234+
f"{len(side_effect_caps)} side-effecting capabilities")
235+
if missing_safety:
236+
print(f"\n ⚠ Missing safety (side_effects=true but no safety block):")
237+
for c in missing_safety:
238+
print(f" {c['id']}")
239+
240+
if with_safety:
241+
print()
242+
for c in with_safety:
243+
s = c["safety"]
244+
parts = [f"trust={s.get('trust_level', '?')}"]
245+
if s.get("requires_confirmation"):
246+
parts.append("confirm=yes")
247+
if s.get("reversible") is True:
248+
parts.append("reversible")
249+
if s.get("allowed_targets"):
250+
parts.append(f"targets={s['allowed_targets']}")
251+
pre = s.get("mandatory_pre_gates", [])
252+
post = s.get("mandatory_post_gates", [])
253+
if pre:
254+
parts.append(f"pre_gates={len(pre)}")
255+
if post:
256+
parts.append(f"post_gates={len(post)}")
257+
print(f" {c['id']:40s} {', '.join(parts)}")
258+
259+
260+
def cmd_chain(caps: list[dict], source_type: str, target_type: str) -> None:
261+
"""Find capability chains from source_type to target_type (1-2 hops)."""
262+
produces_idx = _build_produces_index(caps)
263+
consumes_idx = _build_consumes_index(caps)
264+
265+
# Direct: capabilities that consume source_type AND produce target_type
266+
source_consumers = set(consumes_idx.get(source_type, []))
267+
target_producers = set(produces_idx.get(target_type, []))
268+
direct = sorted(source_consumers & target_producers)
269+
270+
print(f"Chain: {source_type} → ... → {target_type}")
271+
print()
272+
273+
if direct:
274+
print(f" Direct (1-hop):")
275+
for cid in direct:
276+
print(f" {cid}")
277+
278+
# 2-hop: source_type → Cap A (produces X) → Cap B (consumes X, produces target_type)
279+
two_hop: list[tuple[str, str, str]] = []
280+
for cap_a_id in consumes_idx.get(source_type, []):
281+
cap_a = next((c for c in caps if c["id"] == cap_a_id), None)
282+
if not cap_a:
283+
continue
284+
a_hints = cap_a.get("cognitive_hints", {})
285+
a_produces = a_hints.get("produces", {})
286+
for field_spec in a_produces.values():
287+
if not isinstance(field_spec, dict):
288+
continue
289+
mid_type = field_spec.get("type")
290+
if not isinstance(mid_type, str):
291+
continue
292+
for cap_b_id in consumes_idx.get(mid_type, []):
293+
if cap_b_id == cap_a_id:
294+
continue
295+
if cap_b_id in target_producers:
296+
two_hop.append((cap_a_id, mid_type, cap_b_id))
297+
298+
seen: set[tuple[str, str]] = set()
299+
unique_hops: list[tuple[str, str, str]] = []
300+
for a, mid, b in two_hop:
301+
key = (a, b)
302+
if key not in seen:
303+
seen.add(key)
304+
unique_hops.append((a, mid, b))
305+
306+
if unique_hops:
307+
print(f"\n 2-hop paths:")
308+
for a, mid, b in sorted(unique_hops):
309+
print(f" {a} →[{mid}]→ {b}")
310+
311+
if not direct and not unique_hops:
312+
print(" No chains found.")
313+
314+
315+
# ── Main ────────────────────────────────────────────────────────
316+
317+
318+
USAGE = """\
319+
Usage: python tools/catalog_query.py <command> [args...]
320+
321+
Commands:
322+
produces <Type> Capabilities that produce a cognitive type
323+
consumes <Type> Capabilities that consume a cognitive type
324+
role <role> Capabilities with a given cognitive role
325+
compatible <capability_id> Downstream capabilities compatible with outputs
326+
coverage Type and role coverage report
327+
safety Safety block coverage and details
328+
chain <SourceType> <TargetType> Find capability chains (1-2 hops)
329+
"""
330+
331+
332+
def main() -> None:
333+
base = _default_base()
334+
caps = _load_catalog(base)
335+
336+
args = sys.argv[1:]
337+
if not args:
338+
print(USAGE)
339+
sys.exit(1)
340+
341+
command = args[0]
342+
343+
if command == "produces" and len(args) >= 2:
344+
cmd_produces(caps, args[1])
345+
elif command == "consumes" and len(args) >= 2:
346+
cmd_consumes(caps, args[1])
347+
elif command == "role" and len(args) >= 2:
348+
cmd_role(caps, args[1])
349+
elif command == "compatible" and len(args) >= 2:
350+
cmd_compatible(caps, args[1])
351+
elif command == "coverage":
352+
ct = _load_cognitive_types(base)
353+
cmd_coverage(caps, ct)
354+
elif command == "safety":
355+
cmd_safety(caps)
356+
elif command == "chain" and len(args) >= 3:
357+
cmd_chain(caps, args[1], args[2])
358+
else:
359+
print(USAGE)
360+
sys.exit(1)
361+
362+
363+
if __name__ == "__main__":
364+
main()

0 commit comments

Comments
 (0)