-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfive_point_harness.py
More file actions
1435 lines (1274 loc) · 57.4 KB
/
five_point_harness.py
File metadata and controls
1435 lines (1274 loc) · 57.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Five-Point Harness for profile-level roundtrip validation.

Compares an original compiled blob against a roundtripped blob using five
structured checks that progressively degrade from byte-level to semantic
equivalence. Each check provides a different lens on whether the roundtrip
preserved the profile's meaning.

Checks:
    1. **sbpl_eq**: Canonical SBPL text comparison.
    2. **ir_eq**: Per-operation subgraph hash comparison from
       normalized Policy DAGs.
    3. **op_table_eq**: Per-slot assignment_class comparison, ignoring
       default and implicit slots.
    4. **graph_eq**: For explicit slots, comparison of the reachable
       subgraphs. The comparison is semantic (decision paths) rather than
       strictly structural, so filter reordering and tree restructuring
       are tolerated.
    5. **runtime_eq**: Dual-oracle runtime comparison:
       - PolicyWitness: sandbox_check based queries
       - Sandboxed binary: actual file ops with secret marker
       (None if unavailable).

In addition to the five checks above, the published ``checks`` mapping
carries ``canonical_eq`` and ``semantic_eq`` (both taken from the
normalize-layer source/reversed comparison) and, when that comparison
supplies it, ``compiler_loss_adjusted_eq``.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from integration.ir.profile.ir_builder import (
build_profile_ir,
)
from pawl.contract.envelope import extract_subgraph_hashes
from pawl.forward import decode_policy_cd_from_compiled_blob
from integration.ir.profile.learn_contextual_implicits import (
extract_explicit_ops_from_sbpl,
)
from integration.ir.profile.runtime_compare import (
RuntimeCompareResult,
compare_runtime,
generate_mf_probe_plan,
generate_probe_plan,
)
from runtime.oracle.profile import run_binary_oracle
from pawl.normalize.reversed import compare_source_and_reversed
from pawl.normalize.baseline.compiler_model import predict_transforms
from pawl.normalize.baseline.predicates import is_baseline_operation
from pawl.reverse.param_reconstruct import reconstruct_sbpl_params
from runtime.core.unavailability import EXECUTION_ERROR, ProbeUnavailable
# FIXME(2026-02-12): Temporary timeout wrapper to prevent hangs on certain fixtures.
# Remove once the root cause is identified and fixed. See investigation notes in PLAN.md.
# Tracking: p1_regex_06_complex, p1_regex_12_named_capture_unsupported hang in FPH.
#
# Default number of seconds five_point_harness() waits for its worker thread
# when the caller passes timeout=None.
_FPH_TIMEOUT_SECONDS = 240  # keep per-casefile harness timeout conservative
_NON_EXPLICIT_ASSIGNMENT_CLASSES = frozenset({
"default",
"implicit_baseline",
"implicit_nonterm_guard",
"contextual_implicit",
"contextual_implicit_regex",
"contextual_implicit_filter",
"contextual_implicit_mf",
"bare_opposite_terminal",
"shared_entrypoint",
"unknown",
})
@dataclass
class ProfileFivePointResult:
    """Outcome of one profile five-point harness run.

    The harness hands back the raw comparison vector together with the
    evidence that distinguishes the two profiles. Any flat status view is a
    downstream derivation, not part of the harness contract.
    """

    # Check name -> True (equal), False (diverged) or None (check did not run,
    # e.g. decode-only or timed-out runs leave every entry as None).
    checks: dict[str, bool | None]
    # Free-form evidence keyed by topic (diffs, skip reasons, timeout info, ...).
    distinguishing: dict[str, Any]
    # Profile IR of the original blob; {} when the harness timed out.
    ir_a: dict
    # Profile IR of the roundtrip blob; {} when no roundtrip comparison ran.
    ir_b: dict
    # Hash of the original blob; "" when it could not be computed.
    blob_hash_a: str
    # Hash of the roundtrip blob; "" when there is no roundtrip blob.
    blob_hash_b: str
@dataclass(frozen=True)
class _CompileDecodeSubgraphBridge:
"""Contract-level subgraph identity used by the profile harness `ir_eq` check."""
subgraph_hashes: dict[str, str]
def _build_compile_decode_subgraph_bridge(
    blob_path: Path,
    *,
    source_path: Path,
    search_paths: list[Path] | None,
    repo_root: Path,
) -> _CompileDecodeSubgraphBridge:
    """Turn one compiled blob into its per-operation subgraph hashes.

    Slot classification for the profile-harness checks belongs to
    `build_profile_ir()`. `ir_eq` is the single comparison that deliberately
    steps over into the normalized Policy DAG lane, because subgraph identity
    is defined there and not in `pawl.profile.ir.v1`.

    The blob at *blob_path* is read from disk, decoded with
    `decode_policy_cd_from_compiled_blob`, and the hashes are extracted from
    the decoded result's `policy_normalized` attribute. A missing
    *search_paths* is passed on as an empty tuple.
    """
    blob_bytes = blob_path.read_bytes()
    decoded = decode_policy_cd_from_compiled_blob(
        blob_bytes,
        source_path,
        search_paths=search_paths or (),
        repo_root=repo_root,
    )
    hashes = extract_subgraph_hashes(decoded.policy_normalized)
    return _CompileDecodeSubgraphBridge(subgraph_hashes=hashes)
def five_point_harness(
    source_sbpl: str,
    original_blob_path: Path,
    reversed_sbpl: str | None = None,
    roundtrip_blob_path: Path | None = None,
    *,
    repo_root: Path,
    disconnected_filters: list[dict] | None = None,
    param_bindings: dict[str, str] | None = None,
    search_paths: list[Path] | None = None,
    source_path: Path | None = None,
    timeout: int | None = None,
    node_sharing_sidecar: dict[str, Any] | None = None,
) -> ProfileFivePointResult:
    """Run the five-point harness comparing original and roundtripped profiles.

    The comparison runs in a daemon worker thread; the caller waits at most
    *timeout* seconds for it. On timeout the worker is left running in the
    background and a timeout-shaped result is returned.

    Parameters
    ----------
    source_sbpl:
        Original SBPL source text (for extracting explicit ops).
    original_blob_path:
        Path to the original compiled blob.
    reversed_sbpl:
        SBPL text produced by the reverse engine. Optional for decode-only mode.
    roundtrip_blob_path:
        Path to the blob compiled from *reversed_sbpl*. Optional for decode-only
        mode (e.g., message-filter profiles that can't be recompiled).
    repo_root:
        Repository root.
    disconnected_filters:
        Disconnected filter information for predicate reconstruction.
    param_bindings:
        Mapping of param names to resolved values for source substitution.
    search_paths:
        Directories to search for imported profiles. When provided, import
        simulation filters out operations that came from imports (reducing
        false positives in canonical comparison).
    source_path:
        Path to the source file (for relative import resolution).
    timeout:
        Optional timeout in seconds. If None, uses _FPH_TIMEOUT_SECONDS default.
    node_sharing_sidecar:
        Optional node-sharing sidecar, forwarded unchanged to the
        implementation (which hands it to the normalize-layer comparison).

    Returns
    -------
    ProfileFivePointResult with the comparison checks and distinguishing
    evidence, or a decode-only/timeout-shaped result where the checks remain
    null and the distinguishing payload records why the comparison did not run.

    Raises
    ------
    Exception
        Any ``Exception`` raised by the implementation inside the worker
        thread is re-raised in the calling thread.
    """
    # FIXME(2026-02-12): Timeout wrapper using threading - remove once hang is fixed
    # Signal-based timeout doesn't work reliably in all contexts.
    import queue
    import threading

    effective_timeout = timeout if timeout is not None else _FPH_TIMEOUT_SECONDS
    result_queue: queue.Queue = queue.Queue()

    def _null_result(
        distinguishing: dict[str, Any],
        blob_hash_a: str = "",
    ) -> ProfileFivePointResult:
        # Shared shape for "comparison did not run": every check stays None.
        return ProfileFivePointResult(
            checks={
                "sbpl_eq": None,
                "canonical_eq": None,
                "ir_eq": None,
                "op_table_eq": None,
                "graph_eq": None,
                "runtime_eq": None,
            },
            distinguishing=distinguishing,
            ir_a={},
            ir_b={},
            blob_hash_a=blob_hash_a,
            blob_hash_b="",
        )

    def _run_impl() -> None:
        # Worker body: ship either the result or the exception to the caller.
        try:
            result = _five_point_harness_impl(
                source_sbpl=source_sbpl,
                original_blob_path=original_blob_path,
                reversed_sbpl=reversed_sbpl,
                roundtrip_blob_path=roundtrip_blob_path,
                repo_root=repo_root,
                disconnected_filters=disconnected_filters,
                param_bindings=param_bindings,
                search_paths=search_paths,
                source_path=source_path,
                node_sharing_sidecar=node_sharing_sidecar,
            )
        except Exception as e:
            result_queue.put(("error", e))
        else:
            result_queue.put(("ok", result))

    thread = threading.Thread(target=_run_impl, daemon=True)
    thread.start()
    thread.join(timeout=effective_timeout)

    if thread.is_alive():
        # Thread still running = timeout
        # Note: daemon thread will be killed when main process exits
        blob_hash = ""
        try:
            from tools import doctor

            blob_hash = doctor.sha256(original_blob_path)
        except Exception:
            # Deliberate best-effort: the hash is only diagnostic, so a
            # failure here must not mask the timeout result.
            pass
        return _null_result(
            {
                "timeout": True,
                "timeout_seconds": effective_timeout,
            },
            blob_hash,
        )

    # Thread completed - get result. Only the queue read is guarded, so that an
    # exception re-raised from the worker (even a queue.Empty) is never
    # mistaken for an empty result queue.
    try:
        status, payload = result_queue.get_nowait()
    except queue.Empty:
        # Shouldn't happen, but handle it
        return _null_result({"error": "result queue empty"})

    if status == "ok":
        return payload
    raise payload  # Re-raise exception from the worker
def _five_point_harness_impl(
    source_sbpl: str,
    original_blob_path: Path,
    reversed_sbpl: str | None = None,
    roundtrip_blob_path: Path | None = None,
    *,
    repo_root: Path,
    disconnected_filters: list[dict] | None = None,
    param_bindings: dict[str, str] | None = None,
    search_paths: list[Path] | None = None,
    source_path: Path | None = None,
    node_sharing_sidecar: dict[str, Any] | None = None,
) -> ProfileFivePointResult:
    """Implementation of five_point_harness (wrapped with timeout above).

    Parameters have the same meaning as on ``five_point_harness`` (there is
    no ``timeout`` here; the wrapper owns it).

    Flow:
      1. Build the profile IR of the original blob.
      2. Return early with a decode-only result (all listed checks ``None``)
         when the IR carries the 0x4000 profile-type flag or when
         ``roundtrip_blob_path`` is ``None``.
      3. Otherwise build the roundtrip IR and fill ``checks`` with
         ``sbpl_eq``, ``ir_eq``, ``op_table_eq``, ``graph_eq``,
         ``canonical_eq``, ``semantic_eq`` and ``runtime_eq``, plus
         ``compiler_loss_adjusted_eq`` when the normalize result contains it.

    Supporting evidence for failed, skipped or adjusted checks is collected
    in ``distinguishing``.
    """
    # Load sb_ops vocabulary for wildcard expansion.
    ops_vocab_path = (
        repo_root
        / "integration"
        / "carton"
        / "contract"
        / "bundle"
        / "relationships"
        / "mappings"
        / "vocab"
        / "ops.json"
    )
    # Stays None when the vocabulary file is absent.
    sb_ops: list[str] | None = None
    if ops_vocab_path.exists():
        sb_ops = [e["name"] for e in json.loads(ops_vocab_path.read_text())["ops"]]
    source_explicit_ops = extract_explicit_ops_from_sbpl(source_sbpl, sb_ops=sb_ops)
    ir_a = build_profile_ir(
        original_blob_path,
        repo_root=repo_root,
        source_explicit_ops=source_explicit_ops,
    )
    # Decode-only mode: MF profiles (0x4000) cannot be roundtripped because
    # sandbox_apply requires the private entitlement com.apple.private.security.message-filter.
    # Also triggered if roundtrip_blob_path is not provided.
    is_mf_profile = bool(ir_a.get("profile_type_flags", 0) & 0x4000)
    decode_only = is_mf_profile or roundtrip_blob_path is None
    if decode_only:
        # Return decode-only result: IR is valid but no roundtrip comparison possible.
        decode_reason = "mf_profile_requires_private_entitlement" if is_mf_profile else "no_roundtrip_blob"
        return ProfileFivePointResult(
            checks={
                "sbpl_eq": None,
                "canonical_eq": None,
                "ir_eq": None,
                "op_table_eq": None,
                "graph_eq": None,
                "runtime_eq": None,
            },
            distinguishing={
                "decode_only": True,
                "decode_reason": decode_reason,
                "mf_profile": is_mf_profile,
                "profile_type_flags": ir_a.get("profile_type_flags", 0),
            },
            ir_a=ir_a,
            ir_b={},  # No roundtrip IR
            blob_hash_a=ir_a["blob_sha256"],
            blob_hash_b="",  # No roundtrip blob
        )
    # Build roundtrip IR for comparison
    ir_b = build_profile_ir(
        roundtrip_blob_path,
        repo_root=repo_root,
        source_explicit_ops=source_explicit_ops,
    )
    # Ensure reversed_sbpl is available for comparison
    if reversed_sbpl is None:
        reversed_sbpl = ""
    checks: dict[str, bool | None] = {}
    distinguishing: dict[str, Any] = {}
    # child op -> promoted parent op; empty when no promotion was detected.
    promotion_remap = _detect_promotion_remap(ir_a, ir_b, source_explicit_ops)
    # ── Check 1: SBPL text equality ──────────────────────────────────────
    # Canonical form, IR-informed filtering.
    # Filter both texts to remove rules for ops the IR classifies as
    # profile-type baseline. This avoids false negatives from the renderer
    # emitting baseline operations that weren't in the source.
    #
    # If param_bindings are provided, reconstruct param references in the
    # reversed SBPL before comparison. This handles cases where the compiler
    # resolved (param "X") to a literal value at compile time.
    # Assignment classes that are NOT source-declared. Operations with these
    # classes are excluded from sbpl_eq text comparison entirely — the renderer
    # may emit rules for them, but they have no source-side counterpart to match.
    _implicit_classes = {
        "implicit_baseline", "implicit_nonterm_guard",
        "contextual_implicit", "contextual_implicit_regex",
        "contextual_implicit_filter", "contextual_implicit_mf",
        "bare_opposite_terminal", "default", "shared_entrypoint",
    }
    _implicit_ops_a = {
        s["op_name"] for s in ir_a["op_table"]["slots"]
        if s["assignment_class"] in _implicit_classes
    }
    # Shared-entrypoint ops: the compiler wired these to the same decision-graph
    # node as an explicit sibling, so the reverser emits the sibling's predicates
    # on them. They're already excluded from sbpl_eq above (implicit class), but
    # canonical_eq/semantic_eq need the list for predicate-level filtering —
    # the normalize layer doesn't have the IR, so we pass the list explicitly.
    _shared_entrypoint_ops_a = {
        s["op_name"] for s in ir_a["op_table"]["slots"]
        if s.get("assignment_class") == "shared_entrypoint"
    }
    # "unknown" slots are treated as implicit only when the op is a known
    # baseline operation.
    _implicit_ops_a |= {
        s["op_name"] for s in ir_a["op_table"]["slots"]
        if s["assignment_class"] == "unknown"
        and is_baseline_operation(s["op_name"])
    }
    if promotion_remap:
        # Promoted parents are the semantic representative of source-declared
        # child operations; keep them in the sbpl_eq comparison.
        _implicit_ops_a.difference_update(promotion_remap.values())
    normalized_source = _normalize_sbpl(source_sbpl)
    if promotion_remap:
        normalized_source = _remap_sbpl_operations(normalized_source, promotion_remap)
    # Reconstruct param references in reversed SBPL if bindings provided
    reversed_sbpl_for_compare = reversed_sbpl
    if param_bindings:
        reversed_sbpl_for_compare = reconstruct_sbpl_params(reversed_sbpl, param_bindings)
    # Source-aware reconciliation (NORMALIZE-COVERAGE-PLAN Phase 1)
    from integration.ir.profile.sbpl_reconcile import reconcile_sbpl_for_compare
    # _reconcile_info is not used further in this function.
    reversed_sbpl_for_compare, _reconcile_info = reconcile_sbpl_for_compare(
        reversed_sbpl_for_compare,
        source_sbpl,
        param_bindings=param_bindings,
        search_paths=search_paths,
    )
    normalized_reversed = _normalize_sbpl(reversed_sbpl_for_compare)
    filtered_source = _filter_sbpl_by_ir(normalized_source, _implicit_ops_a)
    filtered_reversed = _filter_sbpl_by_ir(normalized_reversed, _implicit_ops_a)
    checks["sbpl_eq"] = filtered_source == filtered_reversed
    # The helper may replace the verdict; its second return value reports
    # whether an import-filtered recomputation was applied.
    checks["sbpl_eq"], sbpl_eq_import_filtered = _maybe_recompute_sbpl_eq_with_import_filter(
        current_eq=checks["sbpl_eq"],
        normalized_source=normalized_source,
        normalized_reversed=normalized_reversed,
        implicit_ops=_implicit_ops_a,
        source_sbpl=source_sbpl,
        search_paths=search_paths,
        source_path=source_path,
    )
    if sbpl_eq_import_filtered:
        distinguishing["sbpl_eq_import_filtered"] = True
    # ── Check 2: Per-operation subgraph hash equality ────────────────────
    # Decode both blobs through the forward pipeline to get normalized
    # Policy DAGs, extract per-operation subgraph hashes, and compare.
    if source_path is not None:
        try:
            bridge_a = _build_compile_decode_subgraph_bridge(
                original_blob_path,
                source_path=source_path,
                search_paths=search_paths,
                repo_root=repo_root,
            )
            bridge_b = _build_compile_decode_subgraph_bridge(
                roundtrip_blob_path,
                source_path=source_path,
                search_paths=search_paths,
                repo_root=repo_root,
            )
            sg_a = bridge_a.subgraph_hashes
            sg_b = bridge_b.subgraph_hashes
            # Only operations present on both sides are compared; ops that
            # exist on one side only surface through the op counts below.
            shared_ops = set(sg_a) & set(sg_b)
            divergent_ops = [op for op in sorted(shared_ops) if sg_a[op] != sg_b[op]]
            checks["ir_eq"] = len(divergent_ops) == 0
            if not checks["ir_eq"]:
                distinguishing["ir_divergent_ops"] = divergent_ops[:10]
                distinguishing["ir_op_count_a"] = len(sg_a)
                distinguishing["ir_op_count_b"] = len(sg_b)
        except Exception as exc:
            # Any decode/hash failure is reported as a failed check, with the
            # error text kept as evidence.
            checks["ir_eq"] = False
            distinguishing["ir_eq_error"] = str(exc)
    else:
        checks["ir_eq"] = None
        distinguishing["ir_eq_skipped"] = "no_source_path"
    # ── Check 3: Op-table per-slot comparison (explicit + unknown only) ──
    op_diff = _compare_op_tables(ir_a, ir_b, promotion_remap=promotion_remap)
    checks["op_table_eq"] = len(op_diff) == 0
    if op_diff:
        distinguishing["op_table_diffs"] = op_diff[:10]
    # Surface unknown-slot counts for progress tracking.
    # NOTE(review): placement of the unknown-count statements outside the
    # `if op_diff:` branch was inferred — confirm against upstream.
    unknown_a = sum(1 for s in ir_a["op_table"]["slots"] if s["assignment_class"] == "unknown")
    unknown_b = sum(1 for s in ir_b["op_table"]["slots"] if s["assignment_class"] == "unknown")
    if unknown_a or unknown_b:
        distinguishing["unknown_count_a"] = unknown_a
        distinguishing["unknown_count_b"] = unknown_b
    # ── Check 4: Graph equality for explicit and unknown slots ───────────
    # Use semantic comparison (decision paths) rather than structural comparison
    # to tolerate filter reordering and tree restructuring.
    # Exclusion set is computed across BOTH IRs so that ops non-explicit
    # in either IR are excluded from both sides (cross-IR alignment).
    _graph_excl = _graph_exclude_ops(ir_a, ir_b)
    _has_imports = search_paths is not None
    graph_ok, graph_diverging = _compare_explicit_graphs_semantic(
        _normalize_ir_for_graph_compare(ir_a, promotion_remap=promotion_remap, source_side=True, exclude_ops=_graph_excl),
        _normalize_ir_for_graph_compare(ir_b, promotion_remap=promotion_remap, source_side=False, exclude_ops=_graph_excl),
    )
    checks["graph_eq"] = graph_ok
    # ── Check 5: Canonical policy equality ───────────────────────────────
    # Deferred to after checks 2-4 because the REDUNDANT (C09) normalize
    # pass needs structural gate results (ir_eq, op_table_eq, graph_eq)
    # as context for its cluster-B guardrail.
    #
    # Uses the pawl.normalize module to:
    # - Normalize predicates (character class ordering, whitespace)
    # - Filter message-filter-only predicates from wrong scope
    # - Reconstruct predicates from disconnected filter nodes
    # - Substitute param references with bound values
    redundant_context = {
        "source_explicit_ops": sorted(source_explicit_ops),
        "source_slots": ir_a["op_table"]["slots"],
        "reversed_slots": ir_b["op_table"]["slots"],
        "blob_equal": ir_a["blob_sha256"] == ir_b["blob_sha256"],
        # `is True` collapses a skipped (None) check to False.
        "ir_eq": checks.get("ir_eq") is True,
        "op_table_eq": checks.get("op_table_eq") is True,
        "graph_eq": checks.get("graph_eq") is True,
    }
    canonical_result = compare_source_and_reversed(
        source_sbpl,
        reversed_sbpl,
        disconnected_filters=disconnected_filters or [],
        param_bindings=param_bindings,
        semantic=True,  # Also compute semantic equivalence
        filter_imports=search_paths is not None,
        filter_baseline=True,  # Filter known baseline predicates from imports
        search_paths=search_paths,
        source_path=source_path,
        augment_entitlements=True,  # Inject entitlement blocks from source into reversed
        filter_shared_predicates=True,  # Filter compiler-shared predicates (see _shared_entrypoint_ops_a)
        shared_operations=sorted(_shared_entrypoint_ops_a),  # IR-derived; normalize layer has no IR access
        filter_redundant_rules=True,  # C09 REDUNDANT filtering lives in normalize, not FPH.
        redundant_context=redundant_context,
        node_sharing_sidecar=node_sharing_sidecar,
    )
    checks["canonical_eq"] = canonical_result["equivalent"]
    # ── Check 6: Semantic equivalence ────────────────────────────────────
    # Ignores require-any/require-all grouping. Passes when source and
    # reversed have the same leaf predicates even if grouping differs.
    checks["semantic_eq"] = canonical_result.get("semantic_equivalent", False)
    # graph_eq may be promoted to True based on the import context and the
    # normalize-layer verdicts; the diverging ops are kept as evidence.
    if _should_promote_graph_eq_for_import_context(
        has_imports=_has_imports,
        graph_ok=graph_ok,
        canonical_eq=checks["canonical_eq"],
        semantic_eq=checks["semantic_eq"],
    ):
        checks["graph_eq"] = True
        distinguishing["graph_eq_promoted_from_normalize"] = {
            "reason": "import_context_normalized_equivalent",
            "diverging_ops": graph_diverging[:10],
        }
    elif not graph_ok:
        distinguishing["graph_diverging_ops"] = graph_diverging[:10]
    if not checks["canonical_eq"]:
        distinguishing["canonical_diff"] = canonical_result.get("diff")
        # Include semantic diff if structural failed but semantic might have passed
        if checks["semantic_eq"] and canonical_result.get("semantic_diff"):
            distinguishing["semantic_diff"] = canonical_result.get("semantic_diff")
        # NOTE(review): nesting of the has_reconstruction / predicted_transforms
        # statements under the canonical_eq failure branch was inferred —
        # confirm against upstream.
        if canonical_result.get("has_reconstruction"):
            distinguishing["has_reconstruction"] = True
        distinguishing["predicted_transforms"] = [
            t.name for t in predict_transforms(source_sbpl)
        ]
    # Include import filtering metadata if available
    if canonical_result.get("import_filtered_count"):
        distinguishing["import_filtered_count"] = canonical_result["import_filtered_count"]
    if canonical_result.get("unresolved_imports"):
        distinguishing["unresolved_imports"] = canonical_result["unresolved_imports"]
    # ── Compiler-loss adjusted eq ────────────────────────────────────────
    # When the node-sharing sidecar is provided, compiler_loss_adjusted_eq
    # is True when ALL remaining diffs are explained by compiler optimization.
    # This reclassifies sub-group A failures from "reverser defect" to
    # "compiler loss" (documented and attributed, not unknown).
    if "compiler_loss_adjusted_eq" in canonical_result:
        checks["compiler_loss_adjusted_eq"] = canonical_result["compiler_loss_adjusted_eq"]
    if canonical_result.get("node_sharing_info"):
        distinguishing["node_sharing_info"] = canonical_result["node_sharing_info"]
    if canonical_result.get("shared_predicate_info"):
        distinguishing["shared_predicate_info"] = canonical_result["shared_predicate_info"]
    if canonical_result.get("redundant_info"):
        distinguishing["redundant_info"] = canonical_result["redundant_info"]
        # The REDUNDANT normalize pass can designate a bounded set of source ops
        # to omit from sbpl_eq text comparison. This keeps sbpl_eq aligned with
        # canonical/semantic reconciliation without hardcoding category rules in FPH.
        redundant_ops = set(
            canonical_result["redundant_info"].get("sbpl_filter_operations", [])
        )
        if redundant_ops:
            # Overrides the sbpl_eq verdict computed in Check 1.
            checks["sbpl_eq"] = (
                _filter_sbpl_by_ir(normalized_source, _implicit_ops_a | redundant_ops)
                == _filter_sbpl_by_ir(normalized_reversed, _implicit_ops_a | redundant_ops)
            )
            distinguishing["sbpl_eq_redundant_filtered"] = sorted(redundant_ops)
    # ── Check 7: Runtime equality ────────────────────────────────────────
    # Dual-oracle comparison.
    # Oracle 1: PolicyWitness (sandbox_check based)
    # Oracle 2: Sandboxed binary (actual file ops with secret marker)
    #
    # Both must pass for runtime_eq=True. If either fails or mismatches,
    # runtime_eq=False. If both are unavailable, runtime_eq=None.
    runtime_result: dict[str, Any] = {
        "policy_witness": None,
        "sandboxed_binary": None,
        "combined": None,
    }
    # NOTE(review): profiles with the 0x4000 flag already returned in the
    # decode-only branch above, so is_mf appears to always be False here —
    # confirm whether the MF probe-plan path is still meant to be reachable.
    is_mf = bool(ir_a.get("profile_type_flags", 0) & 0x4000)
    # Oracle 1: PolicyWitness
    pw_result: RuntimeCompareResult | None = None
    try:
        if is_mf:
            probe_plan = generate_mf_probe_plan(ir_a)
        else:
            probe_plan = generate_probe_plan(ir_a)
        runner_requirements: dict[str, Any] | None = None
        if is_mf:
            runner_requirements = {"requires_xpc": True}
        pw_result = compare_runtime(
            source_sbpl, reversed_sbpl, probe_plan,
            runner_requirements=runner_requirements,
            param_bindings=param_bindings,
        )
        runtime_result["policy_witness"] = {
            "eq": pw_result.equal,
            "status": pw_result.status,
        }
        if pw_result.diagnostics:
            runtime_result["policy_witness"]["diagnostics"] = pw_result.diagnostics
        if pw_result.unavailable:
            runtime_result["policy_witness"]["unavailable"] = pw_result.unavailable
        if pw_result.equal is not True and pw_result.first_divergence:
            runtime_result["policy_witness"]["first_divergence"] = pw_result.first_divergence
    except Exception as exc:
        # An oracle failure is recorded as "no verdict" rather than raised.
        runtime_result["policy_witness"] = {"eq": None, "error": str(exc)}
    # Oracle 2: Sandboxed binary (for file operations)
    # Requires SBPL files on disk - create temp files from SBPL text.
    sb_result: dict[str, Any] | None = None
    try:
        import tempfile
        import shutil
        # Create temp directory for SBPL files and workspace
        workspace = Path(tempfile.mkdtemp(prefix="pawl_dual_oracle_"))
        try:
            source_sbpl_path = workspace / "source.sb"
            roundtrip_sbpl_path = workspace / "roundtrip.sb"
            # Pass raw SBPL with (param ...) references intact; the runner
            # uses sandbox_init_with_parameters to resolve them natively,
            # which also resolves params inside (import ...) targets.
            # For the roundtrip, param_reconstruct may have converted literal
            # paths back to (param ...) — same bindings resolve them.
            source_sbpl_path.write_text(source_sbpl)
            roundtrip_sbpl_path.write_text(reversed_sbpl)
            sb_result = run_binary_oracle(
                source_sbpl_path,
                roundtrip_sbpl_path,
                ir_a,
                workspace=workspace / "probes",
                param_bindings=param_bindings,
            )
            runtime_result["sandboxed_binary"] = {
                "eq": sb_result.get("eq"),
                "probes": sb_result.get("probes", 0),
            }
            if sb_result.get("skipped"):
                runtime_result["sandboxed_binary"]["skipped"] = sb_result["skipped"]
            if sb_result.get("mismatches"):
                runtime_result["sandboxed_binary"]["mismatches"] = sb_result["mismatches"][:5]
        finally:
            # Always remove the temp workspace, even when the oracle raised.
            shutil.rmtree(workspace, ignore_errors=True)
    except Exception as exc:
        reason = f"sandboxed_binary_error: {exc}"
        runtime_result["sandboxed_binary"] = {
            "eq": None,
            "error": str(exc),
            "unavailable": [
                ProbeUnavailable(
                    category=EXECUTION_ERROR,
                    reason=reason,
                    layer="execution",
                ).to_json()
            ],
        }
    # Combine results: both oracles must pass (or be skipped/unavailable)
    pw_eq = runtime_result["policy_witness"].get("eq") if runtime_result["policy_witness"] else None
    sb_eq = runtime_result["sandboxed_binary"].get("eq") if runtime_result["sandboxed_binary"] else None
    sb_skipped = runtime_result["sandboxed_binary"].get("skipped") if runtime_result["sandboxed_binary"] else None
    if pw_eq is True and (sb_eq is True or sb_skipped):
        runtime_result["combined"] = True
        checks["runtime_eq"] = True
        distinguishing["runtime_eq"] = runtime_result
    elif pw_eq is False or sb_eq is False:
        runtime_result["combined"] = False
        checks["runtime_eq"] = False
        distinguishing["runtime_eq"] = runtime_result
    elif pw_eq is None and sb_eq is None:
        runtime_result["combined"] = None
        checks["runtime_eq"] = None
    else:
        # One oracle passed/skipped, other unavailable - partial success
        runtime_result["combined"] = pw_eq if pw_eq is not None else sb_eq
        checks["runtime_eq"] = runtime_result["combined"]
        if runtime_result["combined"] is not True:
            distinguishing["runtime_eq"] = runtime_result
    # NOTE(review): this block is reconstructed as running after the whole
    # if/elif chain (so PolicyWitness unavailability is surfaced in every
    # case) — confirm the nesting against upstream.
    if (
        isinstance(runtime_result.get("policy_witness"), dict)
        and runtime_result["policy_witness"].get("unavailable")
    ):
        distinguishing["runtime_eq"] = runtime_result
    return ProfileFivePointResult(
        checks=checks,
        distinguishing=distinguishing,
        ir_a=ir_a,
        ir_b=ir_b,
        blob_hash_a=ir_a["blob_sha256"],
        blob_hash_b=ir_b["blob_sha256"],
    )
_PROMOTION_EQUIVALENTS: dict[str, str] = {
# 3c-B: mach promotion.
"mach-lookup": "mach-kernel-endpoint",
# 3c-E: file/ipc child -> wildcard promotions.
"file-read-data": "file-read*",
"ipc-posix-shm-read-data": "ipc-posix-shm*",
"ipc-posix-shm-write-data": "ipc-posix-shm*",
# 3c-C partial: iokit user-client widening.
"iokit-open-user-client": "iokit-open*",
}
def _slot_lookup(ir: dict[str, Any]) -> dict[str, dict[str, Any]]:
"""Build op_name -> slot record lookup from a profile IR."""
slots = ir.get("op_table", {}).get("slots", [])
return {
str(s["op_name"]): s
for s in slots
if isinstance(s, dict) and "op_name" in s
}
def _detect_promotion_remap(
    ir_a: dict[str, Any],
    ir_b: dict[str, Any],
    source_explicit_ops: set[str],
) -> dict[str, str]:
    """Find source child ops that must be compared under their promoted parent.

    The compiler can promote a child operation declared in the source (for
    example ``file-read-data``) to its wildcard parent (``file-read*``) and
    route the child's predicates through the parent's decision node. The
    reversed SBPL then names the parent, so the source text has to be
    remapped before comparison.

    A ``child -> parent`` pair from ``_PROMOTION_EQUIVALENTS`` is reported
    when all of the following hold:

    - the source explicitly declares the child and does not declare the
      parent;
    - both IRs contain a slot for the parent;
    - in the original IR (*ir_a*) the parent's class is something other than
      ``None``/``default``/``explicit`` (typically ``unknown``);
    - in the roundtrip IR (*ir_b*) the parent is actively wired
      (``explicit`` or ``shared_entrypoint``);
    - if the roundtrip child is itself ``explicit``, it shares the parent's
      ``node_offset``; a child with its own node is an independent
      declaration and is skipped.

    Node offsets are never compared across the two blobs.

    Returns the detected mapping (possibly empty).
    """
    original_slots = _slot_lookup(ir_a)
    roundtrip_slots = _slot_lookup(ir_b)

    # Classes that indicate the roundtrip parent is actively wired.
    wired_classes = {"explicit", "shared_entrypoint"}
    # Original-side parent classes that rule a promotion out.
    inactive_original_classes = (None, "default", "explicit")

    detected: dict[str, str] = {}
    for child_op, parent_op in _PROMOTION_EQUIVALENTS.items():
        declares_child_only = (
            child_op in source_explicit_ops
            and parent_op not in source_explicit_ops
        )
        if not declares_child_only:
            continue

        parent_a = original_slots.get(parent_op)
        parent_b = roundtrip_slots.get(parent_op)
        if not (parent_a and parent_b):
            continue
        if parent_a.get("assignment_class") in inactive_original_classes:
            continue
        if parent_b.get("assignment_class") not in wired_classes:
            continue

        # Offsets are only compared *within* the roundtrip blob: original and
        # roundtrip layouts differ, so cross-blob offset identity is
        # meaningless. A roundtrip child that is explicit but sits on the
        # parent's node got its class from wildcard coverage, which is fine.
        child_b = roundtrip_slots.get(child_op)
        child_is_explicit = (
            bool(child_b) and child_b.get("assignment_class") == "explicit"
        )
        if child_is_explicit and child_b.get("node_offset") != parent_b.get("node_offset"):
            continue  # child has its own node — not promoted

        detected[child_op] = parent_op
    return detected
def _remap_sbpl_operations(normalized: str, remap: dict[str, str]) -> str:
"""Rewrite allow/deny operation names in normalized SBPL using *remap*."""
if not remap:
return normalized
rewritten: list[str] = []
for line in normalized.splitlines():
stripped = line.strip()
if not stripped.startswith("(") or not stripped.endswith(")"):
rewritten.append(line)
continue
children = _extract_sexp_children(stripped[1:-1])
if len(children) < 2 or children[0] not in {"allow", "deny"}:
rewritten.append(line)
continue
op = children[1]
replacement = remap.get(op)
if replacement:
children[1] = replacement
rewritten.append("(" + " ".join(children) + ")")
continue
rewritten.append(line)
return "\n".join(rewritten)
def _normalized_slots_for_compare(
    ir: dict[str, Any],
    *,
    promotion_remap: dict[str, str] | None,
    source_side: bool,
) -> dict[str, dict[str, Any]]:
    """Build an op_name -> slot mapping adjusted for promotion-aware comparison.

    The mapping starts as a shallow per-slot copy of ``_slot_lookup(ir)``, so
    the slot dicts held by *ir* are never mutated. Without a (non-empty)
    *promotion_remap* that copy is returned unchanged.

    Source side (``source_side`` truthy): for every ``child -> parent`` pair
    whose child slot exists, the child is folded into the parent. The merged
    slot is a copy of the parent slot (or of the child slot when the parent
    is absent) with ``op_name`` set to the parent name and
    ``assignment_class`` forced to ``"explicit"``. Its ``node_offset`` is the
    parent's only when the child's class is ``"default"`` while the parent
    exists with a class other than ``"default"``; otherwise it is the
    child's. The child entry is then removed.

    Roundtrip side: the child entry is dropped when both child and parent
    slots exist and the parent's class is ``"explicit"`` or
    ``"shared_entrypoint"``. In that situation the child's explicitness stems
    from the parent's wildcard coverage rather than from an independent
    source declaration.

    Args:
        ir: IR dictionary handed to ``_slot_lookup``.
        promotion_remap: Child-op to parent-op mapping, or ``None``.
        source_side: Selects source-side merging versus roundtrip-side pruning.

    Returns:
        The normalized op_name -> slot dictionary.
    """
    by_name = {name: dict(slot) for name, slot in _slot_lookup(ir).items()}
    if not promotion_remap:
        return by_name

    if not source_side:
        # Parent classes that count as "actively wired" on the roundtrip side.
        wired_classes = {"explicit", "shared_entrypoint"}
        for child, parent in promotion_remap.items():
            parent_entry = by_name.get(parent)
            if by_name.get(child) is None or parent_entry is None:
                continue
            if parent_entry.get("assignment_class") in wired_classes:
                by_name.pop(child, None)
        return by_name

    for child, parent in promotion_remap.items():
        child_entry = by_name.get(child)
        if child_entry is None:
            continue
        parent_entry = by_name.get(parent)

        if parent_entry is None:
            # No parent to inherit from: the child supplies everything.
            merged = dict(child_entry)
            offset_donor = child_entry
        else:
            merged = dict(parent_entry)
            child_is_default = child_entry.get("assignment_class") == "default"
            parent_is_default = parent_entry.get("assignment_class") == "default"
            # Prefer the child's node unless the child is merely a default
            # slot and the parent carries a non-default assignment.
            if child_is_default and not parent_is_default:
                offset_donor = parent_entry
            else:
                offset_donor = child_entry

        merged.update(
            op_name=parent,
            assignment_class="explicit",
            node_offset=offset_donor.get("node_offset"),
        )
        by_name[parent] = merged
        by_name.pop(child, None)
    return by_name
def _normalize_ir_for_graph_compare(
    ir: dict[str, Any],
    *,
    promotion_remap: dict[str, str] | None,
    source_side: bool,
    exclude_ops: frozenset[str] | None = None,
) -> dict[str, Any]:
    """Produce a shallow IR copy whose op table holds only comparable slots.

    Slots are first normalized via ``_normalized_slots_for_compare``. A slot
    is then kept only if its ``assignment_class`` is not a member of
    ``_NON_EXPLICIT_ASSIGNMENT_CLASSES`` and its op name is not listed in
    *exclude_ops*. Non-explicit ops (default, implicit_*,
    contextual_implicit_*, bare_opposite_terminal, shared_entrypoint,
    unknown) are left out because they are compiler-managed operations whose
    decision graphs diverge structurally between original and roundtrip
    blobs without semantic significance.

    *exclude_ops* lets the caller align the exclusion set across the original
    and roundtrip IRs when one op carries different assignment classes in
    each (e.g. shared_entrypoint in the original but explicit in the
    roundtrip).

    Args:
        ir: IR dictionary; it is not modified.
        promotion_remap: Child-op to parent-op mapping forwarded to the slot
            normalizer, or ``None``.
        source_side: Forwarded to the slot normalizer.
        exclude_ops: Extra op names to drop, or ``None`` for no extra filter.

    Returns:
        A copy of *ir* whose ``op_table["slots"]`` is the list of surviving
        slots ordered by their integer ``slot`` value (missing values count
        as 0).
    """
    candidates = _normalized_slots_for_compare(
        ir,
        promotion_remap=promotion_remap,
        source_side=source_side,
    )

    kept: list[dict[str, Any]] = []
    for op_name, entry in candidates.items():
        if entry.get("assignment_class") in _NON_EXPLICIT_ASSIGNMENT_CLASSES:
            continue
        if exclude_ops is not None and op_name in exclude_ops:
            continue
        kept.append(entry)
    # list.sort is stable, so slots with equal keys keep their mapping order.
    kept.sort(key=lambda entry: int(entry.get("slot", 0)))

    # Copy only the two levels that are replaced; everything else is shared.
    return {**ir, "op_table": {**ir.get("op_table", {}), "slots": kept}}
def _graph_exclude_ops(ir_a: dict, ir_b: dict) -> frozenset[str]:
"""Compute ops to exclude from graph comparison across both IRs.
An op should be excluded if it is non-explicit in EITHER IR.
This prevents false divergences from cross-IR classification
asymmetry (e.g. shared_entrypoint in original, explicit in roundtrip).
"""
exclude: set[str] = set()
for ir in (ir_a, ir_b):
for s in ir.get("op_table", {}).get("slots", []):
if s.get("assignment_class") in _NON_EXPLICIT_ASSIGNMENT_CLASSES:
exclude.add(s["op_name"])
return frozenset(exclude)
def _should_promote_graph_eq_for_import_context(
*,
has_imports: bool,
graph_ok: bool,
canonical_eq: bool,
semantic_eq: bool,
) -> bool:
"""Decide whether import context justifies promoting `graph_eq`.
`graph_eq` is an IR-level surface. When import context is present, the
compiled original can contain import-inherited predicates that the
roundtrip graph does not reproduce structurally. We only promote the
surface after the stronger normalize-layer checks already proved both
structural and leaf-predicate equivalence.
"""
return has_imports and not graph_ok and canonical_eq and semantic_eq
def _maybe_recompute_sbpl_eq_with_import_filter(
*,
current_eq: bool,
normalized_source: str,
normalized_reversed: str,
implicit_ops: set[str],
source_sbpl: str,
search_paths: list[Path] | None,
source_path: Path | None,
) -> tuple[bool, bool]:
"""Retry `sbpl_eq` with imported operations filtered out.
Returns `(sbpl_eq, import_filtered)` where `import_filtered` is true only
when the import-aware retry resolved the mismatch.
"""
if current_eq or search_paths is None:
return current_eq, False
from pawl.normalize.baseline.imports import compute_import_expansion
from pawl.normalize.operations import get_equivalent_operations
import_expansion = compute_import_expansion(
source_sbpl,
search_paths=search_paths,
source_path=source_path,
)
if not import_expansion.imported_operations:
return current_eq, False
imported_equivs: set[str] = set()
for imported_op in import_expansion.imported_operations:
imported_equivs.update(get_equivalent_operations(imported_op))
filtered_source = _filter_sbpl_by_ir(
normalized_source, implicit_ops | imported_equivs,
)
filtered_reversed = _filter_sbpl_by_ir(
normalized_reversed, implicit_ops | imported_equivs,
)
recomputed_eq = filtered_source == filtered_reversed
return recomputed_eq, recomputed_eq
def _normalize_sbpl(text: str) -> str:
"""Normalize SBPL to canonical form for order-insensitive comparison.
- Strip comments (lines starting with ``;``)
- Normalize ``(version 1)`` header
- Collapse multi-line rules to single canonical lines
- Lowercase quoted string literals (compiler lowercases them)
- Sort ``require-all`` / ``require-any`` children (compiler may reorder)
- Sort top-level rules alphabetically
"""
# Strip comments and blank lines.
lines = []
for line in text.splitlines():
stripped = line.strip()
if not stripped or stripped.startswith(";"):
continue
lines.append(stripped)
# Join into one string, then split into top-level S-expressions.
joined = " ".join(lines)
# Extract top-level parenthesized forms.
rules: list[str] = []
depth = 0
start = None
for i, ch in enumerate(joined):
if ch == "(":
if depth == 0:
start = i
depth += 1
elif ch == ")":
depth -= 1
if depth == 0 and start is not None:
rule = joined[start : i + 1]
# Collapse internal whitespace to single spaces.
rule = " ".join(rule.split())
rules.append(rule)
start = None