-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenvelope.py
More file actions
314 lines (261 loc) · 10.9 KB
/
envelope.py
File metadata and controls
314 lines (261 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
"""IR envelope: bundles a Policy DAG with coordinate index and hash boundaries.
The envelope is the top-level serializable structure that carries a complete
policy analysis result. It is assembled from a ``Policy`` DAG plus
caller-provided blob metadata (section offsets, hashes, op-table indices).
``pawl.contract`` must NOT import from ``pawl.structure``, ``pawl.reverse``,
or ``pawl.forward``. All blob-level data arrives as arguments.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Mapping
from pawl.contract.coordinates import (
CoordinateIndex,
HashBoundaries,
SectionHashes,
build_coordinate_index,
)
from pawl.contract.policy_dag import Policy
# Schema version stamped into ``IREnvelope.ir_version`` by default; bump when
# the envelope's serialized shape changes.
IR_ENVELOPE_VERSION = "0.1.0"
@dataclass(frozen=True)
class IREnvelope:
    """Serializable top-level container pairing a Policy DAG with blob metadata.

    Attributes:
        ir_version: Schema version string of this envelope format.
        world_id: Identifies the policy universe (profile name, bundle ID, etc.).
        policy_ir: The Policy DAG payload.
        evidence_ir: Reserved slot for a future evidence layer (``None`` today).
        coordinates: Node-to-blob origin mappings.
        hashes: Blob section hashes at three granularities.
    """

    ir_version: str
    world_id: str
    policy_ir: Policy
    evidence_ir: object | None
    coordinates: CoordinateIndex
    hashes: HashBoundaries

    def policy_view(self) -> dict[str, object]:
        """Return only the Policy IR layer as a dict."""
        return self.policy_ir.as_dict()

    def evidence_view(self) -> object | None:
        """Return only the Evidence IR layer (None when no blob)."""
        return self.evidence_ir

    def as_dict(self) -> dict[str, object]:
        """Serialize the whole envelope to a plain dict of builtin types."""
        payload: dict[str, object] = {
            "ir_version": self.ir_version,
            "world_id": self.world_id,
        }
        payload["policy_ir"] = self.policy_ir.as_dict()
        payload["evidence_ir"] = self.evidence_ir
        payload["coordinates"] = self.coordinates.as_dict()
        payload["hashes"] = self.hashes.as_dict()
        return payload
@dataclass(frozen=True)
class StructuralGateResult:
    """Outcome payload of the Track D.2 structural gate validation.

    Attributes:
        passed: True only when every individual check succeeded.
        checks: Per-check boolean results keyed by check name.
        errors: Human-readable failure details (empty when passed).
    """

    passed: bool
    checks: Mapping[str, bool]
    errors: tuple[str, ...]

    def as_dict(self) -> dict[str, object]:
        """Serialize to a plain dict using only builtin container types."""
        result: dict[str, object] = {}
        result["passed"] = bool(self.passed)
        result["checks"] = dict(self.checks)
        result["errors"] = list(self.errors)
        return result
# ---------------------------------------------------------------------------
# Extract helpers (DAG → metadata)
# ---------------------------------------------------------------------------
def extract_node_origins(policy: Policy) -> dict[str, tuple[int, ...]]:
    """Collect node-stream origin offsets from every node of a Policy DAG.

    Scans each operation's nodes for a ``provenance`` attribute carrying
    non-empty ``origin_offsets``. Result keys take the form
    ``"{op}:{node_id}"``; when the same key appears more than once (shared
    nodes), the offset tuples are concatenated in encounter order.

    Returns:
        Mapping of node key to origin-offset tuple; empty when no node
        carries provenance with offsets.
    """
    collected: dict[str, tuple[int, ...]] = {}
    for op_name, dag in policy.operations.items():
        for node_id, node in dag.nodes.items():
            prov = getattr(node, "provenance", None)
            # Skip nodes without provenance or with empty offset lists.
            if prov is None or not prov.origin_offsets:
                continue
            key = f"{op_name}:{node_id}"
            collected[key] = collected.get(key, ()) + tuple(prov.origin_offsets)
    return collected
def extract_subgraph_hashes(policy: Policy) -> dict[str, str]:
    """Collect per-operation structural hashes from Policy metadata.

    Looks up ``metadata["structural_hash_entry"]`` on each ``OperationDAG``;
    operations whose metadata is falsy or lacks the entry are skipped.
    Requires a normalized Policy (caller responsibility).
    """
    entry_key = "structural_hash_entry"
    return {
        op_name: str(dag.metadata[entry_key])
        for op_name, dag in policy.operations.items()
        if dag.metadata and entry_key in dag.metadata
    }
# ---------------------------------------------------------------------------
# Assembly
# ---------------------------------------------------------------------------
def _all_node_keys(policy: Policy) -> dict[str, tuple[int, ...]]:
    """Enumerate every node in the Policy as ``{op}:{node_id}`` → ``()``."""
    return {
        f"{op_name}:{node_id}": ()
        for op_name, dag in policy.operations.items()
        for node_id in dag.nodes
    }
def validate_structural_gate_d2(envelope: IREnvelope) -> StructuralGateResult:
    """
    Validate Track D.2 structural gate requirements for a blob-backed envelope.

    D.2 requires:
    - all hash boundaries populated (profile, sections, subgraph hashes)
    - each Policy node has coordinate linkage or explicit ``absence_reason``

    Returns:
        StructuralGateResult whose ``checks`` map records each individual
        predicate and whose ``errors`` lists a detail string for every
        failure, in evaluation order; ``passed`` is True only when every
        check holds.
    """
    errors: list[str] = []
    # Universe of "{op}:{node_id}" keys and operation names the gate must cover.
    expected_node_keys = set(_all_node_keys(envelope.policy_ir).keys())
    expected_ops = set(envelope.policy_ir.operations.keys())
    hashes = envelope.hashes

    # --- Hash boundary checks ----------------------------------------------
    hash_profile_populated = hashes.profile_hash is not None
    hash_sections_populated = hashes.section_hashes is not None
    hash_subgraphs_populated = bool(hashes.subgraph_hashes)
    # Every operation must have a structural hash entry.
    hash_subgraphs_cover_ops = expected_ops.issubset(set(hashes.subgraph_hashes.keys()))
    if not hash_profile_populated:
        errors.append("hashes.profile_hash missing")
    if not hash_sections_populated:
        errors.append("hashes.section_hashes missing")
    if not hash_subgraphs_populated:
        errors.append("hashes.subgraph_hashes empty")
    if not hash_subgraphs_cover_ops:
        missing_ops = sorted(expected_ops - set(hashes.subgraph_hashes.keys()))
        errors.append(f"hashes.subgraph_hashes missing operations: {missing_ops}")

    # --- Coordinate coverage checks ----------------------------------------
    coords = envelope.coordinates
    coord_nodes_cover_policy = expected_node_keys.issubset(set(coords.nodes.keys()))
    coord_op_table_cover_policy = expected_ops.issubset(set(coords.op_table.keys()))
    # Assume-true flags; the per-node loop below flips them on first failure.
    coord_nodes_have_link_or_reason = True
    coord_byte_offset_alignment = True
    if not coord_nodes_cover_policy:
        missing_nodes = sorted(expected_node_keys - set(coords.nodes.keys()))
        errors.append(f"coordinates.nodes missing policy nodes: {missing_nodes}")
    if not coord_op_table_cover_policy:
        missing_ops = sorted(expected_ops - set(coords.op_table.keys()))
        errors.append(f"coordinates.op_table missing operations: {missing_ops}")
    # Per node: require either real offset linkage or an explicit
    # absence_reason, and offsets must align 1:1 with byte offsets.
    for key in sorted(expected_node_keys):
        coord = coords.nodes.get(key)
        if coord is None:
            # Missing entirely — already reported via coord_nodes_cover_policy,
            # so no extra error line here.
            coord_nodes_have_link_or_reason = False
            continue
        has_offsets = bool(coord.node_indices)
        has_reason = coord.absence_reason is not None
        if not has_offsets and not has_reason:
            coord_nodes_have_link_or_reason = False
            errors.append(f"coordinates.nodes[{key}] missing both offsets and absence_reason")
        if has_offsets and len(coord.byte_offsets) != len(coord.node_indices):
            coord_byte_offset_alignment = False
            errors.append(
                f"coordinates.nodes[{key}] byte_offsets/node_indices length mismatch"
            )

    checks = {
        "hash_profile_populated": hash_profile_populated,
        "hash_sections_populated": hash_sections_populated,
        "hash_subgraphs_populated": hash_subgraphs_populated,
        "hash_subgraphs_cover_ops": hash_subgraphs_cover_ops,
        "coord_nodes_cover_policy": coord_nodes_cover_policy,
        "coord_op_table_cover_policy": coord_op_table_cover_policy,
        "coord_nodes_have_link_or_reason": coord_nodes_have_link_or_reason,
        "coord_byte_offset_alignment": coord_byte_offset_alignment,
    }
    passed = all(checks.values())
    return StructuralGateResult(
        passed=passed,
        checks=checks,
        errors=tuple(errors),
    )
def assert_structural_gate_d2(envelope: IREnvelope) -> None:
    """Raise ``ValueError`` when Track D.2 structural gate checks fail."""
    result = validate_structural_gate_d2(envelope)
    if not result.passed:
        # Empty-errors fallback keeps the exception message informative.
        detail = "; ".join(result.errors) or "unknown structural gate failure"
        raise ValueError(f"Track D.2 structural gate failed: {detail}")
def assemble_ir_envelope(
    *,
    policy: Policy,
    world_id: str,
    profile_hash: str | None = None,
    section_hashes: SectionHashes | None = None,
    op_table_indices: Mapping[str, int] | None = None,
    nodes_start_offset: int = 0,
    ir_version: str = IR_ENVELOPE_VERSION,
    evidence_ir: object | None = None,
) -> IREnvelope:
    """Assemble an IR envelope from a Policy DAG and optional blob metadata.

    Two modes:

    **Blob-backed** (``profile_hash`` + ``section_hashes`` +
    ``op_table_indices`` all provided): builds a full ``CoordinateIndex``
    via ``build_coordinate_index()`` with real byte offsets and populates
    ``HashBoundaries`` from caller-provided hashes.

    **Source-only** (any blob argument missing): every node receives
    ``absence_reason="source_evaluate_no_blob"`` and ``HashBoundaries``
    carries only the extracted subgraph hashes.
    """
    node_origins = extract_node_origins(policy)
    subgraph_hashes = extract_subgraph_hashes(policy)
    # Start from the full node-key universe so nodes without provenance
    # still appear in the coordinate index (with absence reasons).
    all_keys = _all_node_keys(policy)

    blob_backed = not (
        profile_hash is None
        or section_hashes is None
        or op_table_indices is None
    )
    if blob_backed:
        # Overlay real origin offsets where provenance supplied them.
        all_keys.update(node_origins)
        coordinates = build_coordinate_index(
            node_origins=all_keys,
            op_table_indices=op_table_indices,
            nodes_start_offset=nodes_start_offset,
        )
        hashes = HashBoundaries(
            profile_hash=profile_hash,
            section_hashes=section_hashes,
            subgraph_hashes=subgraph_hashes,
        )
    else:
        # Source-only: mark every node as absent from the blob.
        coordinates = build_coordinate_index(
            node_origins=all_keys,
            op_table_indices={},
            nodes_start_offset=0,
            absence_reasons={
                key: "source_evaluate_no_blob" for key in all_keys
            },
        )
        hashes = HashBoundaries(subgraph_hashes=subgraph_hashes)

    return IREnvelope(
        ir_version=ir_version,
        world_id=world_id,
        policy_ir=policy,
        evidence_ir=evidence_ir,
        coordinates=coordinates,
        hashes=hashes,
    )
# Explicit public API of this module.
__all__ = [
    "IR_ENVELOPE_VERSION",
    "IREnvelope",
    "StructuralGateResult",
    "assemble_ir_envelope",
    "validate_structural_gate_d2",
    "assert_structural_gate_d2",
    "extract_node_origins",
    "extract_subgraph_hashes",
]