-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathir_builder.py
More file actions
767 lines (668 loc) · 28.7 KB
/
ir_builder.py
File metadata and controls
767 lines (668 loc) · 28.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
"""
Build a Profile IR from a compiled sandbox blob.
The Profile IR is a structured intermediate representation that classifies
each op-table slot's assignment, enabling semantic comparison between
original and roundtripped blobs even when byte-level equality fails.
"""
from __future__ import annotations
import hashlib
import io
import json
import struct
from pathlib import Path
from typing import Any
from pawl.reverse.api import load_reverse_context
from pawl.reverse.core import operation_node
from pawl.reverse.core.filters import Filters
from pawl.reverse.render.op_attribution import (
AttributionMode,
build_profile_op_table_view,
)
from pawl.reverse.render.sbpl import (
_is_contextual_implicit_op,
_is_implicit_nonterm_op,
_is_implicit_terminal_op,
_select_default_terminal_node,
)
from integration.ir import literal_pool
def _compute_vocab_hash(repo_root: Path) -> str | None:
"""Compute SHA-256 hash of the ops vocabulary file.
Returns None if the vocab file doesn't exist.
"""
vocab_path = (
repo_root
/ "integration"
/ "carton"
/ "contract"
/ "bundle"
/ "relationships"
/ "mappings"
/ "vocab"
/ "ops.json"
)
if not vocab_path.exists():
return None
return hashlib.sha256(vocab_path.read_bytes()).hexdigest()
def build_profile_ir(
    blob_path: Path | bytes,
    *,
    repo_root: Path,
    source_explicit_ops: set[str] | None = None,
) -> dict[str, Any]:
    """Build a Profile IR dict from a compiled sandbox blob.

    Parameters
    ----------
    blob_path:
        Path to the compiled ``.sb.bin`` file, or raw blob bytes.
    repo_root:
        Repository root for loading sb_ops and corpora.
    source_explicit_ops:
        Optional set of op names explicitly declared in the source SBPL.
        Used to classify slots as ``explicit`` vs ``implicit``.

    Returns
    -------
    dict conforming to ``pawl.profile.ir.v1`` schema.

    Raises
    ------
    ValueError
        If no default terminal node can be determined from the op table.
    """
    # Accept either raw bytes or a filesystem path for the blob.
    if isinstance(blob_path, (bytes, bytearray)):
        blob_bytes = bytes(blob_path)
    else:
        blob_bytes = blob_path.read_bytes()
    blob_sha256 = hashlib.sha256(blob_bytes).hexdigest()
    # Compute vocab hash for cross-host comparison.
    vocab_hash = _compute_vocab_hash(repo_root)
    # load_reverse_context accepts both Path and bytes.
    ctx = load_reverse_context(
        blob_bytes if isinstance(blob_path, (bytes, bytearray)) else blob_path,
        repo_root=repo_root,
    )
    sd = ctx.sandbox_data
    nodes = ctx.operation_nodes
    # Read raw op-table: sb_ops_count little-endian u16 values starting at
    # profiles_offset.
    f = io.BytesIO(blob_bytes)
    f.seek(sd.profiles_offset)
    raw_op_table = struct.unpack(
        "<%dH" % sd.sb_ops_count,
        f.read(2 * sd.sb_ops_count),
    )
    table_view = build_profile_op_table_view(
        raw_op_table=raw_op_table,
        node_count=len(nodes),
        sb_ops=sd.sb_ops,
        attribution_mode=AttributionMode.DIRECT_SLOT_TO_OP_ID,
        profile_type=sd.type,
        sb_ops_count=sd.sb_ops_count,
    )
    decoded = list(table_view.decoded_offsets)
    # Default terminal (selection heuristic lives in
    # _select_default_terminal_node); its action is the profile default.
    missing_nodes: set[int] = set()
    default_node = _select_default_terminal_node(
        decoded, nodes, missing_nodes=missing_nodes,
    )
    if not default_node or not default_node.terminal:
        raise ValueError("Cannot determine default terminal node")
    default_off = default_node.offset
    default_action = "allow" if default_node.terminal.is_allow() else "deny"
    profile_type = sd.type
    if source_explicit_ops is None:
        source_explicit_ops = set()
    # Build trigger context for refined classification.
    trigger_context = _build_trigger_context(sd, nodes, profile_type)
    # Classify each op-table slot.
    slots: list[dict[str, Any]] = []
    for slot in table_view.attributed.slots:
        idx = int(slot.slot_index)
        op_id = int(slot.op_id)
        op_name = str(slot.op_name)
        off = int(slot.decoded_node_offset)
        if off == default_off:
            # Slot points at the default terminal: no dedicated policy.
            assignment_class = "default"
        else:
            node = operation_node.find_operation_node_by_offset(
                nodes, off, missing_nodes,
            )
            assignment_class = _classify_slot(
                op_name, node, default_action, source_explicit_ops,
                profile_type=profile_type,
                trigger_context=trigger_context,
            )
        slots.append({
            "slot": idx,
            "op_id": op_id,
            "op_name": op_name,
            "node_offset": off,
            "assignment_class": assignment_class,
        })
    # Shared-entrypoint echoes: compiler can wire baseline/sibling operations
    # to the same entrypoint as an explicit source operation.
    explicit_offsets = {
        s["node_offset"] for s in slots if s["assignment_class"] == "explicit"
    }
    for slot in slots:
        if (
            slot["assignment_class"] == "unknown"
            and slot["node_offset"] in explicit_offsets
        ):
            slot["assignment_class"] = "shared_entrypoint"
    # Wildcard-coverage deduplication: when a child op shares the exact
    # same non-default offset as a wildcard parent, both map to the same
    # compiler decision graph. Reclassify the child to avoid spurious
    # class mismatches when source and roundtrip compilers handle wildcard
    # expansion differently.
    wildcard_offset_map: dict[str, int] = {}
    for s in slots:
        if s["op_name"].endswith("*") and s["assignment_class"] == "explicit":
            wildcard_offset_map[s["op_name"]] = s["node_offset"]
    if wildcard_offset_map:
        for s in slots:
            if (
                s["assignment_class"] == "explicit"
                and not s["op_name"].endswith("*")
            ):
                # Walk progressively shorter prefixes to find parent wildcard.
                parts = s["op_name"].split("-")
                for i in range(len(parts) - 1, 0, -1):
                    candidate = "-".join(parts[:i]) + "*"
                    wc_offset = wildcard_offset_map.get(candidate)
                    if wc_offset is not None and wc_offset == s["node_offset"]:
                        s["assignment_class"] = "shared_entrypoint"
                        break
    # Literal pool: parse records and compute digests.
    # Parse pool records BEFORE node loop so we can build offset->literal_id map.
    # Use base_addr from sandbox_data - it accounts for alignment padding.
    lit_start = sd.base_addr
    lit_bytes = blob_bytes[lit_start:] if lit_start < len(blob_bytes) else b""
    lit_sha256 = hashlib.sha256(lit_bytes).hexdigest() if lit_bytes else None
    # Parse pool records and compute digests.
    pool_records = literal_pool.parse_literal_pool_records(lit_bytes) if lit_bytes else []
    pool_records = literal_pool.classify_pool_records(pool_records, lit_bytes)
    content_digest, layout_digest = literal_pool.compute_pool_digests(pool_records) if pool_records else (None, None)
    # Build offset-to-literal_id lookup for LiteralRef bindings.
    # The filter_arg (argument_id) is an 8-byte-aligned index: byte_offset = arg_id * 8.
    # Map from byte offset (rel_offset) to literal_id for fast lookup.
    #
    # Note: The compiler aligns literal records to 8-byte boundaries, but the
    # pool parser may not capture all records due to padding. We try to match
    # parsed records first, then fall back to direct byte offset if needed.
    offset_to_lit_id: dict[int, int] = {}
    lit_id_to_record: dict[int, dict[str, Any]] = {}
    for rec in pool_records:
        if rec.get("class") == "len_prefixed" and rec.get("literal_id") is not None:
            offset_to_lit_id[rec["rel_offset"]] = rec["literal_id"]
            lit_id_to_record[rec["literal_id"]] = rec
    # For 8-byte aligned pools, also probe byte offsets directly.
    # Build a secondary lookup for offsets not in the parsed records.
    def probe_literal_at_offset(byte_offset: int) -> dict[str, Any] | None:
        """Probe literal pool at a specific byte offset.

        Reads a little-endian u16 length prefix followed by the payload;
        returns None when the offset is out of range, the length is zero,
        or the record would overrun the pool.
        """
        if byte_offset < 0 or byte_offset + 2 > len(lit_bytes):
            return None
        length = int.from_bytes(lit_bytes[byte_offset:byte_offset + 2], "little")
        if length == 0 or byte_offset + 2 + length > len(lit_bytes):
            return None
        payload = lit_bytes[byte_offset + 2:byte_offset + 2 + length]
        return {
            "kind": "literal",
            "lit_id": None,  # Not in parsed pool
            "rel_offset": byte_offset,
            "len_u16": length,
            "payload_sha256": hashlib.sha256(payload).hexdigest(),
        }
    # Node graph with LiteralRef evidence bindings.
    node_records: list[dict[str, Any]] = []
    unmatched_offsets: list[tuple[int, int, int]] = []  # (node_offset, filter_id, byte_offset)
    for n in nodes:
        rec: dict[str, Any] = {"offset": n.offset}
        if n.terminal and not n.is_non_terminal():
            rec["kind"] = "terminal"
            rec["action"] = "allow" if n.terminal.is_allow() else "deny"
        elif n.is_non_terminal():
            rec["kind"] = "non_terminal"
            rec["action"] = None
            nt = n.non_terminal
            rec["match_offset"] = nt.match.offset if nt and nt.match else None
            rec["unmatch_offset"] = nt.unmatch.offset if nt and nt.unmatch else None
            rec["filter_id"] = nt.filter_id if nt else None
            # Capture argument_id and attempt LiteralRef binding.
            # Only attempt for filters that use literal pool (string offset functions).
            arg_id = getattr(nt, "argument_id", None)
            if arg_id is not None:
                rec["argument_id"] = arg_id
                # Check what kind of argument this filter uses.
                uses_literal_pool = False
                uses_regex_table = False
                if Filters.exists(nt.filter_id):
                    filter_info = Filters.get(nt.filter_id)
                    arg_fn = filter_info.get("arg_process_fn", "")
                    # Filters that read from literal pool use these functions:
                    uses_literal_pool = arg_fn in {
                        "get_filter_arg_string_by_offset",
                        "get_filter_arg_string_by_offset_with_type",
                        "get_filter_arg_string_by_offset_no_skip",
                    }
                    # Filters that read from regex table:
                    uses_regex_table = arg_fn == "get_filter_arg_regex_by_id"
                if uses_regex_table:
                    # Emit TableRef for regex table
                    regex_idx = arg_id
                    rec["evidence_refs"] = [{
                        "kind": "table",
                        "table": "regex",
                        "index": regex_idx,
                        "raw_u16": arg_id,
                    }]
                elif uses_literal_pool:
                    byte_offset = arg_id * 8
                    if byte_offset in offset_to_lit_id:
                        # Found in parsed pool records
                        lit_id = offset_to_lit_id[byte_offset]
                        lit_rec = lit_id_to_record[lit_id]
                        rec["evidence_refs"] = [{
                            "kind": "literal",
                            "lit_id": lit_id,
                            "rel_offset": lit_rec["rel_offset"],
                            "len_u16": lit_rec["len_u16"],
                            "payload_sha256": lit_rec["payload_sha256"],
                        }]
                    else:
                        # Try probing directly at the byte offset (8-byte aligned pools)
                        probed = probe_literal_at_offset(byte_offset)
                        if probed:
                            rec["evidence_refs"] = [probed]
                        elif byte_offset > 0:
                            # Track unmatched for diagnostics
                            unmatched_offsets.append((n.offset, nt.filter_id, byte_offset))
        else:
            # Node decoded as neither terminal nor non-terminal.
            rec["kind"] = "unknown"
            rec["action"] = None
        node_records.append(rec)
    # Regex patterns from reverse context.
    regex_patterns: list[dict[str, Any]] = []
    if sd.regex_list:
        for idx, pattern_list in enumerate(sd.regex_list):
            if isinstance(pattern_list, list):
                # Check if it's a placeholder
                is_placeholder = (
                    len(pattern_list) == 1 and
                    isinstance(pattern_list[0], str) and
                    pattern_list[0].startswith("PAWL_UNPARSED_REGEX_")
                )
                regex_patterns.append({
                    "index": idx,
                    "patterns": pattern_list,
                    "status": "placeholder" if is_placeholder else "parsed",
                })
            elif isinstance(pattern_list, str):
                regex_patterns.append({
                    "index": idx,
                    "patterns": [pattern_list],
                    "status": "parsed",
                })
    # Extract capability predicates (entitlement filters).
    capability_predicates = _extract_capability_predicates(nodes, ctx)
    # Extract message-filter structure for 0x4000 profiles.
    opaque_regions = _extract_opaque_regions(profile_type, nodes)
    return {
        "schema_version": "pawl.profile.ir.v1",
        "blob_sha256": blob_sha256,
        "blob_length": len(blob_bytes),
        "default_decision": default_action,
        "profile_type_flags": profile_type,
        "vocab_hash": vocab_hash,
        "op_table": {
            "slot_count": len(table_view.attributed.slots),
            "decode": dict(table_view.op_table_decode),
            "attribution": dict(table_view.attributed.metadata),
            "slots": slots,
        },
        "node_graph": {
            "node_count": len(nodes),
            "stride": 8,
            "default_terminal_offset": default_off,
            "nodes": node_records,
            "evidence_ref_stats": {
                "nodes_with_literal_ref": sum(
                    1 for rec in node_records
                    if any(r.get("kind") == "literal" for r in rec.get("evidence_refs", []))
                ),
                "nodes_with_table_ref": sum(
                    1 for rec in node_records
                    if any(r.get("kind") == "table" for r in rec.get("evidence_refs", []))
                ),
                "unmatched_offset_count": len(unmatched_offsets),
                "unmatched_offsets": unmatched_offsets[:10] if unmatched_offsets else None,
            },
        },
        "literal_pool": {
            "raw_sha256": lit_sha256,
            "byte_length": len(lit_bytes),
            "record_count": len(pool_records),
            "records": pool_records,
            "content_digest": content_digest,
            "layout_digest": layout_digest,
        },
        "regex_patterns": {
            "count": sd.regex_count,
            "parsed_count": len([p for p in regex_patterns if p.get("status") == "parsed"]),
            "patterns": regex_patterns,
        },
        "capability_predicates": capability_predicates,
        "opaque_regions": opaque_regions,
    }
def _extract_opaque_regions(
profile_type: int,
nodes: list,
) -> dict[str, Any]:
"""Extract opaque region information, particularly message-filter structure.
For 0x4000 profiles, captures:
- MF index nodes (terminals with 0x13BF, 0x13C0, 0x13C2 markers)
- MF-related operations (iokit-open-user-client, mach-bootstrap)
"""
is_mf_profile = bool(profile_type & 0x4000)
# MF index markers (little-endian: 0xBF13 = IOKit, 0xC013/0xC213 = Mach)
MF_MARKERS = {0xBF13, 0xC013, 0xC213}
mf_index_nodes: list[dict[str, Any]] = []
mf_ops: set[str] = set()
for node in nodes:
# Check for MF index nodes (terminals with MF markers)
if hasattr(node, "message_filter_index") and node.message_filter_index:
mf_info = node.message_filter_index
mf_index_nodes.append({
"node_offset": node.offset,
"marker": mf_info.get("marker"),
"marker_hex": f"0x{mf_info.get('marker', 0):04x}",
"u16_2": mf_info.get("u16_2"),
})
# Map MF markers to operation families (little-endian format)
mf_marker_to_op = {
0xBF13: "iokit", # IOKit message filter
0xC013: "mach", # Mach message filter
0xC213: "mach", # Mach message filter variant
}
for mf_node in mf_index_nodes:
marker = mf_node.get("marker")
if marker in mf_marker_to_op:
mf_ops.add(mf_marker_to_op[marker])
# Build message_filter section
message_filter = None
if is_mf_profile or mf_index_nodes:
message_filter = {
"is_mf_profile": is_mf_profile,
"type_flag_0x4000": is_mf_profile,
"type_flag_0x1000": bool(profile_type & 0x1000), # Mach MF variant
"index_node_count": len(mf_index_nodes),
"index_nodes": mf_index_nodes if mf_index_nodes else None,
"mf_ops": sorted(mf_ops) if mf_ops else None,
"roundtrip_blocked": is_mf_profile, # 0x4000 profiles can't roundtrip
"blocker_reason": "requires_private_entitlement" if is_mf_profile else None,
}
return {
"message_filter": message_filter,
}
def _extract_capability_predicates(
nodes: list,
ctx,
) -> dict[str, Any]:
"""Extract entitlement and capability predicates from the node graph.
Scans for filter nodes with entitlement filter IDs:
- 0x57: require-entitlement (runtime boolean check)
- 0x58: entitlement-is-present (presence check)
Returns dict with keys: require_entitlement, entitlement_is_present.
"""
# Filter IDs for entitlement predicates.
FILTER_REQUIRE_ENTITLEMENT = 0x57
FILTER_ENTITLEMENT_IS_PRESENT = 0x58
require_entitlement: list[dict[str, Any]] = []
entitlement_is_present: list[dict[str, Any]] = []
for node in nodes:
if not node.is_non_terminal() or not node.non_terminal:
continue
nt = node.non_terminal
filter_id = getattr(nt, "filter_id", None)
if filter_id is None:
continue
# Try to extract the entitlement key from the filter argument.
entitlement_key = None
if hasattr(nt, "argument") and nt.argument:
# argument contains the entitlement key string
arg = nt.argument
if isinstance(arg, str):
# Strip surrounding quotes if present
entitlement_key = arg.strip('"')
elif hasattr(arg, "value"):
entitlement_key = str(arg.value).strip('"')
if filter_id == FILTER_REQUIRE_ENTITLEMENT:
require_entitlement.append({
"key": entitlement_key,
"node_offset": node.offset,
"filter_id": filter_id,
})
elif filter_id == FILTER_ENTITLEMENT_IS_PRESENT:
entitlement_is_present.append({
"key": entitlement_key,
"node_offset": node.offset,
"filter_id": filter_id,
})
return {
"require_entitlement": require_entitlement if require_entitlement else None,
"entitlement_is_present": entitlement_is_present if entitlement_is_present else None,
}
def _build_trigger_context(
sd,
nodes: list,
profile_type: int,
) -> dict[str, Any]:
"""Build trigger context for refined classification.
Analyzes the profile to detect what triggered contextual implicit promotions:
- has_regex: Profile contains regex patterns
- is_mf_profile: Profile has message-filter (0x4000) type flag
- has_nonterm_filters: Profile has non-terminal nodes with real filters
- filter_families: Set of operation families with filters (e.g., {"iokit", "mach"})
"""
# Check for regex patterns.
has_regex = getattr(sd, "regex_count", 0) > 0
# Check for message-filter profile type.
is_mf_profile = bool(profile_type & 0x4000)
# Scan nodes for non-terminal filters and extract filter families.
has_nonterm_filters = False
filter_families: set[str] = set()
# Filter IDs that indicate real content filters (not guards).
# Guards like 0x0e (target) don't trigger contextual implicits.
_GUARD_FILTER_IDS = {0x0e} # "target" filter
for node in nodes:
if node.is_non_terminal() and node.non_terminal:
nt = node.non_terminal
filter_id = getattr(nt, "filter_id", None)
if filter_id is not None and filter_id not in _GUARD_FILTER_IDS:
has_nonterm_filters = True
# Infer filter family from filter ID ranges.
# These ranges are approximate based on the filter vocabulary.
if 0x01 <= filter_id <= 0x20:
filter_families.add("file") # path filters
elif 0x80 <= filter_id <= 0x8F:
filter_families.add("file") # regex filters for paths
elif filter_id in {0x13, 0x14}:
filter_families.add("mach") # mach service filters
elif filter_id in {0x15, 0x16}:
filter_families.add("iokit") # iokit filters
return {
"has_regex": has_regex,
"is_mf_profile": is_mf_profile,
"has_nonterm_filters": has_nonterm_filters,
"filter_families": filter_families,
}
def _is_bare_opposite_terminal(
node: operation_node.OperationNode | None,
default_action: str,
*,
profile_type: int,
) -> bool:
"""Return True if *node* is a bare terminal with the opposite action.
Structural heuristic: any terminal-only op whose action is opposite the
default, with no modifiers and no MF index, is a compiler-promoted
implicit. Used for IR classification only (not renderer suppression).
"""
if profile_type != 0x0000:
return False
if not (node and node.terminal and not node.is_non_terminal()):
return False
opposite = "allow" if default_action == "deny" else "deny"
actual = "allow" if node.terminal.is_allow() else "deny"
if actual != opposite:
return False
if hasattr(node.terminal, "db_modifiers"):
modifiers = [k for k, v in node.terminal.db_modifiers.items() if len(v)]
if modifiers:
return False
if getattr(node, "message_filter_index", None):
return False
return True
def _classify_slot(
    op_name: str,
    node: operation_node.OperationNode | None,
    default_action: str,
    source_explicit_ops: set[str],
    *,
    profile_type: int,
    trigger_context: dict[str, Any] | None = None,
) -> str:
    """Classify a single op-table slot's assignment.

    Returns one of: ``explicit``, ``implicit_baseline``,
    ``implicit_nonterm_guard``, ``contextual_implicit``,
    ``contextual_implicit_regex``, ``contextual_implicit_filter``,
    ``contextual_implicit_mf``, ``shared_entrypoint``, ``unknown``.

    Parameters
    ----------
    trigger_context:
        Optional dict with keys:
        - has_regex: bool - profile has regex patterns
        - is_mf_profile: bool - profile type has 0x4000 flag
        - filter_families: set[str] - filter families present (e.g., "iokit", "mach")
    """
    # No resolvable node: nothing to classify against.
    if node is None:
        return "unknown"
    # 1. Explicit: op appears verbatim in the source SBPL.
    if op_name in source_explicit_ops:
        return "explicit"
    # 2/3. Baseline implicits (terminal first, then nonterm guard) share the
    # same predicate signature, so probe them in priority order.
    baseline_checks = (
        (_is_implicit_terminal_op, "implicit_baseline"),
        (_is_implicit_nonterm_op, "implicit_nonterm_guard"),
    )
    for predicate, label in baseline_checks:
        if predicate(op_name, node, default_action, profile_type=profile_type):
            return label
    # 4. Contextual implicit (terminal or nonterm), refined by trigger.
    if _is_contextual_implicit_op(op_name, node, default_action):
        return _refine_contextual_implicit(op_name, trigger_context)
    # 5. Structural fallback: bare opposite-terminal ops are compiler-
    # promoted implicits even if they don't appear in any corpus. This
    # is safe for classification (not for renderer suppression, where
    # explicit bare ops are indistinguishable from implicits).
    if _is_bare_opposite_terminal(
        node,
        default_action,
        profile_type=profile_type,
    ):
        return "implicit_baseline"
    return "unknown"
def _refine_contextual_implicit(
    op_name: str,
    trigger_context: dict[str, Any] | None,
) -> str:
    """Refine contextual_implicit into sub-categories based on trigger.

    Categories:
    - contextual_implicit_mf: Message-filter related ops or MF profiles
    - contextual_implicit_regex: Ops promoted when regex filters present
    - contextual_implicit_filter: Ops promoted by specific filter families
    - contextual_implicit: Fallback when trigger cannot be determined

    Priority order:
    1. MF-related (always takes precedence for MF profiles and default-message-filter)
    2. Regex-triggered (for ops that are known regex-promoted when regex is present)
    3. Filter-triggered (for other ops when filters are present)
    """
    if trigger_context is None:
        return "contextual_implicit"
    has_regex = bool(trigger_context.get("has_regex"))
    # 1. The dedicated MF op, or any 0x4000 profile, wins outright.
    if op_name == "default-message-filter" or trigger_context.get("is_mf_profile"):
        return "contextual_implicit_mf"
    # 2. Regex-triggered: checked BEFORE general filter classification.
    # These ops are specifically known to be promoted by regex presence,
    # independent of what filter families are present.
    regex_promoted = {
        "file-map-executable",
        "file-test-existence",
        "file-clone",
        "darwin-notification-post",
        "file*",  # wildcard form
    }
    if has_regex and op_name in regex_promoted:
        return "contextual_implicit_regex"
    # 3. Filter-triggered: either the profile carries filters of this op's
    # own family, or it has any non-terminal filters (generic promotion).
    families = trigger_context.get("filter_families", set())
    op_family = _get_op_family(op_name)
    same_family = bool(op_family) and op_family in families
    if same_family or trigger_context.get("has_nonterm_filters"):
        return "contextual_implicit_filter"
    # Fallback: remaining ops become regex-triggered when regex is present.
    if has_regex:
        return "contextual_implicit_regex"
    return "contextual_implicit"
def _get_op_family(op_name: str) -> str | None:
"""Extract the operation family from an op name.
Returns the family prefix (e.g., "iokit", "mach", "file") or None.
"""
families = ["iokit", "mach", "file", "network", "socket", "process", "signal"]
for fam in families:
if op_name.startswith(fam + "-") or op_name.startswith(fam + "*"):
return fam
return None
def ir_structural_hash(ir: dict) -> str:
    """Compute a hierarchical structural hash of the IR's evidence sections.

    Returns a composite hash built from three sub-hashes:
    ``{op_table_hash, graph_hash, literal_pool_hash}``.
    When ``ir_eq`` fails, comparing the sub-hashes pinpoints the divergent
    section.

    The sub-hashes are stored on the IR dict as ``_section_hashes`` for
    downstream diagnostics.
    """
    def _digest(parts: list[str]) -> str:
        # Sort so the hash is independent of slot/node ordering.
        return hashlib.sha256("\n".join(sorted(parts)).encode()).hexdigest()

    # Op-table hash: (op_name, assignment_class) pairs for non-default slots.
    op_table_hash = _digest([
        f"{s['op_name']}:{s['assignment_class']}"
        for s in ir["op_table"]["slots"]
        if s["assignment_class"] != "default"
    ])
    # Graph hash: (kind, action) tuples.
    graph_hash = _digest([
        f"node:{n['kind']}:{n.get('action')}"
        for n in ir["node_graph"]["nodes"]
    ])
    # Literal pool hash: prefer content_digest (semantic parity) over raw_sha256.
    pool = ir["literal_pool"]
    lit_content = pool.get("content_digest") or pool.get("raw_sha256", "none")
    literal_pool_hash = hashlib.sha256(f"lit:{lit_content}".encode()).hexdigest()
    # Store section hashes for diagnostics.
    ir["_section_hashes"] = {
        "op_table_hash": op_table_hash,
        "graph_hash": graph_hash,
        "literal_pool_hash": literal_pool_hash,
    }
    # Composite hash over the three section digests.
    composite = f"{op_table_hash}:{graph_hash}:{literal_pool_hash}"
    return hashlib.sha256(composite.encode()).hexdigest()