-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathdry_run.py
More file actions
124 lines (98 loc) · 3.52 KB
/
dry_run.py
File metadata and controls
124 lines (98 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Dry-run policy wrapper for governance policies.
Wraps any GovernancePolicy and runs enforcement in "dry run" mode,
recording what WOULD have happened without actually blocking execution.
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from .base import BaseIntegration, ExecutionContext
class DryRunDecision(Enum):
"""Decision that would have been made by the wrapped policy."""
ALLOW = "ALLOW"
DENY = "DENY"
WARN = "WARN"
@dataclass
class DryRunResult:
"""Result of a dry-run policy evaluation."""
action: str
decision: DryRunDecision
reason: Optional[str]
policy_name: str
timestamp: datetime = field(default_factory=datetime.now)
class DryRunCollector:
"""Accumulates dry-run results and provides summary reports."""
def __init__(self) -> None:
self._results: list[DryRunResult] = []
def add(self, result: DryRunResult) -> None:
self._results.append(result)
def get_results(self) -> list[DryRunResult]:
return list(self._results)
def summary(self) -> dict[str, Any]:
total = len(self._results)
counts = {d.value: 0 for d in DryRunDecision}
for r in self._results:
counts[r.decision.value] += 1
return {
"total": total,
"allowed": counts["ALLOW"],
"denied": counts["DENY"],
"warnings": counts["WARN"],
}
def clear(self) -> None:
self._results.clear()
class DryRunPolicy:
"""
Wraps a BaseIntegration to run policy checks in dry-run mode.
All policy evaluations are recorded but never block execution.
"""
def __init__(
self,
integration: BaseIntegration,
*,
policy_name: str = "default",
collector: Optional[DryRunCollector] = None,
) -> None:
self._integration = integration
self._policy_name = policy_name
self.collector = collector or DryRunCollector()
def evaluate(self, action: str, context: ExecutionContext, input_data: Any = None) -> DryRunResult:
"""
Evaluate a policy check in dry-run mode.
Runs the wrapped integration's pre_execute and records the decision
without blocking. Always returns a DryRunResult.
"""
allowed, reason = self._integration.pre_execute(context, input_data)
if allowed:
decision = DryRunDecision.ALLOW
else:
# Distinguish warnings from denials: threshold-based checks
# (confidence, drift) are warnings; hard blocks are denials.
decision = DryRunDecision.DENY
result = DryRunResult(
action=action,
decision=decision,
reason=reason,
policy_name=self._policy_name,
)
self.collector.add(result)
return result
def evaluate_warn(self, action: str, reason: str) -> DryRunResult:
"""Record a warning without running policy checks."""
result = DryRunResult(
action=action,
decision=DryRunDecision.WARN,
reason=reason,
policy_name=self._policy_name,
)
self.collector.add(result)
return result
def get_results(self) -> list[DryRunResult]:
return self.collector.get_results()
def summary(self) -> dict[str, Any]:
return self.collector.summary()
def clear(self) -> None:
self.collector.clear()