|
| 1 | +"""Claude Code ``PreToolUse`` hook. |
| 2 | +
|
| 3 | +Installed as a JSON-based hook in ``~/.claude/settings.json``, this |
| 4 | +script reads the pending tool-use event from stdin, looks up the tool |
| 5 | +in the current attestation for the workspace, and permits or denies |
| 6 | +the call based on the active policy. |
| 7 | +
|
| 8 | +Hook wire format (PreToolUse): |
| 9 | +
|
| 10 | + { |
| 11 | + "tool_name": "<string>", |
| 12 | + "tool_input": {...}, |
| 13 | + "session_id": "<string>", |
| 14 | + "transcript_path": "<path>", |
| 15 | + "cwd": "<path>" |
| 16 | + } |
| 17 | +
|
| 18 | +The hook writes a JSON response to stdout with a ``decision`` field |
| 19 | +("allow" | "deny") and a short reason. A non-zero exit code converts to |
| 20 | +a denial with Claude Code's default behaviour. |
| 21 | +""" |
| 22 | + |
| 23 | +from __future__ import annotations |
| 24 | + |
| 25 | +import json |
| 26 | +import sys |
| 27 | +from pathlib import Path |
| 28 | + |
| 29 | +from mcp_governance_kit.attest import Attestation |
| 30 | +from mcp_governance_kit.breakpoints import Severity, b4_privilege |
| 31 | +from mcp_governance_kit.policy import Policy |
| 32 | + |
| 33 | + |
| 34 | +def _load_context(cwd: Path) -> tuple[Attestation | None, Policy | None]: |
| 35 | + att_path = cwd / ".mcp-governance" / "attestation.json" |
| 36 | + policy_path = cwd / ".mcp-governance" / "policy.yaml" |
| 37 | + attestation = ( |
| 38 | + Attestation.model_validate_json(att_path.read_text(encoding="utf-8")) |
| 39 | + if att_path.exists() |
| 40 | + else None |
| 41 | + ) |
| 42 | + policy = Policy.load(policy_path) if policy_path.exists() else None |
| 43 | + return attestation, policy |
| 44 | + |
| 45 | + |
| 46 | +def main() -> int: |
| 47 | + try: |
| 48 | + event = json.load(sys.stdin) |
| 49 | + except json.JSONDecodeError: |
| 50 | + print(json.dumps({"decision": "allow", "reason": "no event"})) |
| 51 | + return 0 |
| 52 | + |
| 53 | + cwd = Path(event.get("cwd") or ".") |
| 54 | + attestation, policy = _load_context(cwd) |
| 55 | + tool_name = event.get("tool_name", "") |
| 56 | + |
| 57 | + if attestation is None or policy is None: |
| 58 | + print(json.dumps({"decision": "allow", "reason": "no attestation in workspace"})) |
| 59 | + return 0 |
| 60 | + |
| 61 | + bound = {t.name for t in attestation.tools} |
| 62 | + if tool_name and tool_name not in bound and tool_name.split("__")[-1] not in bound: |
| 63 | + print( |
| 64 | + json.dumps( |
| 65 | + { |
| 66 | + "decision": "deny", |
| 67 | + "reason": ( |
| 68 | + f"tool '{tool_name}' not present in current attestation " |
| 69 | + f"({attestation.attestation_id}); re-attest with mcp-gov attest" |
| 70 | + ), |
| 71 | + } |
| 72 | + ) |
| 73 | + ) |
| 74 | + return 2 |
| 75 | + |
| 76 | + # Re-run the privilege check with the attestation's stored TCS value. |
| 77 | + priv = b4_privilege( |
| 78 | + attestation, |
| 79 | + max_tcs=policy.max_tcs, |
| 80 | + warn_tcs=policy.warn_tcs, |
| 81 | + require_approval_for_execute=policy.require_approval_for_execute, |
| 82 | + execute_approved=policy.execute_approved, |
| 83 | + ) |
| 84 | + if priv.severity is Severity.BLOCK: |
| 85 | + print( |
| 86 | + json.dumps( |
| 87 | + { |
| 88 | + "decision": "deny", |
| 89 | + "reason": f"B4 blocked: {priv.summary}", |
| 90 | + } |
| 91 | + ) |
| 92 | + ) |
| 93 | + return 2 |
| 94 | + |
| 95 | + print(json.dumps({"decision": "allow", "reason": "within policy"})) |
| 96 | + return 0 |
| 97 | + |
| 98 | + |
| 99 | +if __name__ == "__main__": # pragma: no cover |
| 100 | + sys.exit(main()) |
0 commit comments