|
| 1 | +"""B1 — change management. |
| 2 | +
|
| 3 | +Compares two attestations of the same host. Any added, removed, or |
| 4 | +reclassified tool is a change that would, under traditional change |
| 5 | +management, generate a change record. The check is WARN if any change |
| 6 | +is present and the ``changes_approved`` flag is not set; BLOCK if |
| 7 | +``require_approval`` is enabled in the policy context and the flag is |
| 8 | +not set. PASS otherwise. |
| 9 | +""" |
| 10 | + |
| 11 | +from __future__ import annotations |
| 12 | + |
| 13 | +from mcp_governance_kit.attest.schema import Attestation, ToolRecord |
| 14 | +from mcp_governance_kit.breakpoints.base import CheckResult, Severity |
| 15 | + |
| 16 | + |
| 17 | +def _tool_key(t: ToolRecord) -> str: |
| 18 | + return f"{t.name}@{t.server.identity}" |
| 19 | + |
| 20 | + |
| 21 | +def b1_change( |
| 22 | + previous: Attestation | None, |
| 23 | + current: Attestation, |
| 24 | + *, |
| 25 | + changes_approved: bool = False, |
| 26 | + require_approval: bool = False, |
| 27 | +) -> CheckResult: |
| 28 | + """Compare two attestations and surface the change set. |
| 29 | +
|
| 30 | + If ``previous`` is ``None`` the check is considered INFO (first-time |
| 31 | + attestation, no change to compare against). |
| 32 | + """ |
| 33 | + if previous is None: |
| 34 | + return CheckResult( |
| 35 | + check_id="B1", |
| 36 | + title="Change management", |
| 37 | + severity=Severity.INFO, |
| 38 | + summary="No prior attestation supplied; change set cannot be computed.", |
| 39 | + ) |
| 40 | + |
| 41 | + prev_by_key = {_tool_key(t): t for t in previous.tools} |
| 42 | + curr_by_key = {_tool_key(t): t for t in current.tools} |
| 43 | + |
| 44 | + added = sorted(set(curr_by_key) - set(prev_by_key)) |
| 45 | + removed = sorted(set(prev_by_key) - set(curr_by_key)) |
| 46 | + reclassified: list[str] = [] |
| 47 | + for key in set(prev_by_key) & set(curr_by_key): |
| 48 | + p, c = prev_by_key[key], curr_by_key[key] |
| 49 | + if (p.reach, p.action, p.server.third_party) != (c.reach, c.action, c.server.third_party): |
| 50 | + reclassified.append(key) |
| 51 | + reclassified.sort() |
| 52 | + |
| 53 | + total_changes = len(added) + len(removed) + len(reclassified) |
| 54 | + tcs_delta = current.tcs.value - previous.tcs.value |
| 55 | + |
| 56 | + if total_changes == 0: |
| 57 | + return CheckResult( |
| 58 | + check_id="B1", |
| 59 | + title="Change management", |
| 60 | + severity=Severity.PASS, |
| 61 | + summary="Tool graph unchanged since previous attestation.", |
| 62 | + evidence={"tcs_delta": tcs_delta}, |
| 63 | + ) |
| 64 | + |
| 65 | + details = ( |
| 66 | + [f"+ {k}" for k in added] + [f"- {k}" for k in removed] + [f"~ {k}" for k in reclassified] |
| 67 | + ) |
| 68 | + |
| 69 | + if changes_approved: |
| 70 | + sev = Severity.INFO |
| 71 | + msg = f"{total_changes} approved change(s) since previous attestation." |
| 72 | + elif require_approval: |
| 73 | + sev = Severity.BLOCK |
| 74 | + msg = f"{total_changes} unapproved change(s) detected. Policy requires approval for B1." |
| 75 | + else: |
| 76 | + sev = Severity.WARN |
| 77 | + msg = f"{total_changes} change(s) detected without change record." |
| 78 | + |
| 79 | + return CheckResult( |
| 80 | + check_id="B1", |
| 81 | + title="Change management", |
| 82 | + severity=sev, |
| 83 | + summary=msg, |
| 84 | + details=details, |
| 85 | + evidence={ |
| 86 | + "added": len(added), |
| 87 | + "removed": len(removed), |
| 88 | + "reclassified": len(reclassified), |
| 89 | + "tcs_delta": tcs_delta, |
| 90 | + }, |
| 91 | + ) |
0 commit comments