|
43 | 43 | _DENY_RE = re.compile(r"deny\(?\d*\)?\s+([\w\-\*]+)\s+(.*)", re.IGNORECASE) |
44 | 44 |
|
45 | 45 |
|
46 | | -def _load_policy_rule_index() -> dict[str, str]: |
47 | | - """Load compiled_policy.json and build a reverse index: value -> rule_id. |
48 | | -
|
49 | | - This allows us to cross-reference sandbox violations back to Rule IDs. |
50 | | - """ |
51 | | - index: dict[str, str] = {} |
| 46 | +def _load_policy_rule_index() -> dict: |
| 47 | + """Load compiled_policy.json to allow cross-referencing sandbox violations.""" |
52 | 48 | try: |
53 | 49 | with open(POLICY_PATH) as f: |
54 | | - policy = json.load(f) |
55 | | - |
56 | | - for rule_id, path in policy.get("blocked_paths", {}).get("macos", {}).items(): |
57 | | - index[path] = rule_id |
58 | | - |
59 | | - for rule_id, name in policy.get("blocked_executables", {}).items(): |
60 | | - index[name] = rule_id |
61 | | - |
62 | | - for rule_id, directive in policy.get("deny_rules", {}).get("macos", {}).items(): |
63 | | - if isinstance(directive, str): |
64 | | - index[directive] = rule_id |
65 | | - |
| 50 | + return json.load(f) |
66 | 51 | except (FileNotFoundError, json.JSONDecodeError) as exc: |
67 | 52 | logger.warning("Could not load policy for rule index: %s", exc) |
68 | | - |
69 | | - return index |
| 53 | + return {} |
70 | 54 |
|
71 | 55 |
|
72 | 56 | def tail_sandbox_log(): |
@@ -109,12 +93,32 @@ def tail_sandbox_log(): |
109 | 93 | action = match.group(1) # e.g. "file-read-data" |
110 | 94 | target = match.group(2).strip() # e.g. "/Users/leo/.ssh/id_rsa" |
111 | 95 |
|
112 | | - # Try to find a matching Rule ID |
113 | | - rule_id = "UNKNOWN" |
114 | | - for value, rid in rule_index.items(): |
115 | | - if value in target: |
| 96 | + # Try to find a matching Rule ID based on strict policy boundaries |
| 97 | + rule_id = None |
| 98 | + |
| 99 | + # 1. Check paths (target must be inside the directory) |
| 100 | + for rid, path in rule_index.get("blocked_paths", {}).get("macos", {}).items(): |
| 101 | + if target.startswith(path): |
116 | 102 | rule_id = rid |
117 | 103 | break |
| 104 | + |
| 105 | + # 2. Check executables (target must be the exact file or end with /file) |
| 106 | + if not rule_id: |
| 107 | + for rid, name in rule_index.get("blocked_executables", {}).items(): |
| 108 | + if target == name or target.endswith("/" + name): |
| 109 | + rule_id = rid |
| 110 | + break |
| 111 | + |
| 112 | + # 3. Check custom deny rules (loose match for fallback custom macros inside strings) |
| 113 | + if not rule_id: |
| 114 | + for rid, directive in rule_index.get("deny_rules", {}).get("macos", {}).items(): |
| 115 | + if isinstance(directive, str) and target in directive: |
| 116 | + rule_id = rid |
| 117 | + break |
| 118 | + |
| 119 | + # Discard system-wide sandbox noise that isn't ours |
| 120 | + if not rule_id: |
| 121 | + continue |
118 | 122 |
|
119 | 123 | # Log the blocked event in the format expected by the dashboard |
120 | 124 | logger.warning("BLOCKED [%s] action=%s target=%s", rule_id, action, target) |
|
0 commit comments