-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathpolicy_compose.py
More file actions
171 lines (134 loc) · 5.75 KB
/
policy_compose.py
File metadata and controls
171 lines (134 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Policy Inheritance and Composition
Utilities for merging, inheriting, and overriding GovernancePolicy instances.
"""
from __future__ import annotations
from dataclasses import asdict, fields
from typing import Any
from .base import GovernancePolicy
# Numeric limit fields: compose_policies() keeps the smaller of the two
# values via min(), i.e. the tighter limit wins.
_MIN_FIELDS = {"max_tokens", "max_tool_calls", "max_concurrent", "timeout_seconds"}
# Flag fields: compose_policies() combines them with ``or``, so a truthy
# value on either side wins.
_BOOL_RESTRICTIVE = {"require_human_approval", "log_all_calls"}
# List fields with special merge semantics (used by both compose_policies()
# and PolicyHierarchy.chain()):
# - union fields: order-preserving, de-duplicated concatenation of both lists
_UNION_LIST_FIELDS = {"blocked_patterns"}
# - intersect fields: intersection when both lists are non-empty; otherwise
#   whichever list is non-empty is kept
_INTERSECT_LIST_FIELDS = {"allowed_tools"}
def override_policy(base: GovernancePolicy, **kwargs: Any) -> GovernancePolicy:
    """Build a new policy from *base* with selected fields replaced.

    Args:
        base: Policy whose field values serve as the defaults.
        **kwargs: Field values that take precedence over those of *base*.

    Returns:
        A new ``GovernancePolicy`` constructed from the merged field values.
        ``asdict`` snapshots *base* into a fresh dict, so *base* itself is
        left untouched.
    """
    # Later entries win in a dict display, so kwargs shadow the base fields.
    return GovernancePolicy(**{**asdict(base), **kwargs})
def compose_policies(*policies: GovernancePolicy) -> GovernancePolicy:
    """Fold several policies into one, left to right, keeping the stricter value.

    Per-field rules applied at each pairwise merge step:

    - ``name``: the two names joined with " + ".
    - ``max_tokens``, ``max_tool_calls``, ``max_concurrent``,
      ``timeout_seconds``: the smaller value.
    - ``require_human_approval``, ``log_all_calls``: truthy if either side
      is truthy.
    - ``blocked_patterns``: order-preserving union without duplicates.
    - ``allowed_tools``: intersection when both lists are non-empty; if only
      one side has entries, that list is kept.
    - ``version``: the greater value under ``>`` (for strings this is a
      lexicographic comparison, so ``"1.10"`` sorts below ``"1.9"``).
    - ``confidence_threshold``: the larger value.
    - ``drift_threshold``, ``checkpoint_frequency``,
      ``backpressure_threshold``: the smaller value.

    Args:
        *policies: One or more policies to merge, in order.

    Returns:
        A new ``GovernancePolicy``. With a single input this is a plain copy
        (its name is left unchanged).

    Raises:
        ValueError: If no policies are given.
    """
    if not policies:
        raise ValueError("compose_policies requires at least one policy")

    head, *tail = policies
    if not tail:
        return override_policy(head)

    # (field, chooser) pairs for the threshold-style fields: the chooser picks
    # whichever of the two values is kept.
    threshold_rules = (
        ("confidence_threshold", max),
        ("drift_threshold", min),
        ("checkpoint_frequency", min),
        ("backpressure_threshold", min),
    )

    merged = asdict(head)
    for nxt in tail:
        incoming = asdict(nxt)

        # Composite name grows by one segment per merged policy.
        merged["name"] = merged["name"] + " + " + incoming["name"]

        # Numeric limits: tighter (smaller) limit wins.
        merged.update(
            {key: min(merged[key], incoming[key]) for key in _MIN_FIELDS}
        )

        # Flags: once truthy, stays as is; otherwise adopt the incoming value.
        for key in _BOOL_RESTRICTIVE:
            if not merged[key]:
                merged[key] = incoming[key]

        # Union lists: append unseen items, preserving first-seen order.
        for key in _UNION_LIST_FIELDS:
            seen = list(merged[key])
            for entry in incoming[key]:
                if entry in seen:
                    continue
                seen.append(entry)
            merged[key] = seen

        # Intersect lists (allowed_tools).
        # NOTE(review): a disjoint intersection leaves an empty list here, and
        # a later policy's non-empty list then replaces it — confirm that is
        # the intended meaning of an empty allow-list.
        for key in _INTERSECT_LIST_FIELDS:
            ours, theirs = merged[key], incoming[key]
            if not theirs:
                continue  # incoming has no entries; keep ours untouched
            if ours:
                merged[key] = [tool for tool in ours if tool in theirs]
            else:
                merged[key] = list(theirs)

        # Version: keep the greater one (current value wins ties).
        merged["version"] = max(merged["version"], incoming["version"])

        # Safety thresholds, checkpoint frequency and backpressure threshold.
        for key, choose in threshold_rules:
            merged[key] = choose(merged[key], incoming[key])

    return GovernancePolicy(**merged)
class PolicyHierarchy:
    """A node in a tree of inherited policies.

    Each node holds one *base* policy. Derived policies (and child nodes)
    are produced by copying that base and applying overrides; the base
    itself is never reassigned after construction.
    """

    def __init__(self, base: GovernancePolicy) -> None:
        """Wrap *base* as the policy of this node."""
        self._base = base

    @property
    def policy(self) -> GovernancePolicy:
        """The policy held by this node."""
        return self._base

    def extend(self, **overrides: Any) -> GovernancePolicy:
        """Return a copy of this node's policy with *overrides* applied."""
        return override_policy(self._base, **overrides)

    def child(self, name: str, **overrides: Any) -> PolicyHierarchy:
        """Create a child node whose policy derives from this node's policy.

        Args:
            name: Name given to the child's policy.
            **overrides: Additional field overrides for the child's policy.

        Returns:
            A new ``PolicyHierarchy`` wrapping the derived policy.
        """
        return PolicyHierarchy(self.extend(name=name, **overrides))

    def chain(self, *policies: GovernancePolicy) -> GovernancePolicy:
        """Layer *policies* over the base, in the order given.

        For every field of ``GovernancePolicy`` except ``name``:

        - fields in ``_UNION_LIST_FIELDS`` accumulate an order-preserving,
          de-duplicated union;
        - fields in ``_INTERSECT_LIST_FIELDS`` are intersected when both the
          accumulated and the incoming list are non-empty; if only the
          incoming list has entries it is adopted, and if it is empty the
          accumulated list is kept;
        - every other field takes the value of the latest policy.

        ``name`` becomes the base name and all layered names joined
        with " + ".

        Args:
            *policies: Policies to apply on top of the base; may be empty.

        Returns:
            A new ``GovernancePolicy``. With no arguments this is a plain
            copy of the base.
        """
        if not policies:
            return override_policy(self._base)

        merged = asdict(self._base)
        for layer in policies:
            incoming = asdict(layer)
            for spec in fields(GovernancePolicy):
                key = spec.name
                value = incoming[key]

                if key == "name":
                    # The composite name is assembled once, after the loop.
                    continue

                if key in _UNION_LIST_FIELDS:
                    union = list(merged[key])
                    for entry in value:
                        if entry in union:
                            continue
                        union.append(entry)
                    merged[key] = union
                    continue

                if key in _INTERSECT_LIST_FIELDS:
                    current = merged[key]
                    if current and value:
                        merged[key] = [tool for tool in current if tool in value]
                    elif value:
                        merged[key] = list(value)
                    # An empty incoming list leaves the accumulated one as is.
                    continue

                # Plain field: the later policy simply overwrites.
                merged[key] = value

        # Build composite name from the base followed by every layer.
        merged["name"] = " + ".join(
            [self._base.name, *(layer.name for layer in policies)]
        )
        return GovernancePolicy(**merged)