-
Notifications
You must be signed in to change notification settings - Fork 214
Expand file tree
/
Copy path: analyzer.py
More file actions
280 lines (230 loc) · 10.1 KB
/
analyzer.py
File metadata and controls
280 lines (230 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
"""GraySwan Cygnal security analyzer for OpenHands SDK.
This module provides a security analyzer that uses GraySwan's Cygnal API
for AI safety monitoring. It analyzes agent actions and conversation history
to detect potential security risks.
"""
from __future__ import annotations
import json
import os
from collections.abc import Sequence
from typing import Any
import httpx
from pydantic import Field, PrivateAttr, SecretStr, model_validator
from openhands.sdk.event import ActionEvent, LLMConvertibleEvent
from openhands.sdk.logger import get_logger
from openhands.sdk.security.analyzer import SecurityAnalyzerBase
from openhands.sdk.security.grayswan.utils import convert_events_to_openai_messages
from openhands.sdk.security.risk import SecurityRisk
logger = get_logger(__name__)
class GraySwanAnalyzer(SecurityAnalyzerBase):
    """Security analyzer using GraySwan's Cygnal API for AI safety monitoring.

    This analyzer sends conversation history and pending actions to the GraySwan
    Cygnal API for security analysis. The API returns a violation score which is
    mapped to SecurityRisk levels.

    Environment Variables:
        GRAYSWAN_API_KEY: Required API key for GraySwan authentication
        GRAYSWAN_POLICY_ID: Optional policy ID for custom GraySwan policy

    Example:
        >>> from openhands.sdk.security.grayswan import GraySwanAnalyzer
        >>> analyzer = GraySwanAnalyzer()
        >>> risk = analyzer.security_risk(action_event)
    """

    history_limit: int = Field(
        default=20,
        description="Number of recent events to include as context",
    )
    max_message_chars: int = Field(
        default=30000,
        description="Max characters for conversation processing",
    )
    timeout: float = Field(
        default=30.0,
        description="Request timeout in seconds",
    )
    low_threshold: float = Field(
        default=0.3,
        description="Risk threshold for LOW classification (score <= threshold)",
    )
    medium_threshold: float = Field(
        default=0.7,
        description="Risk threshold for MEDIUM classification (score <= threshold)",
    )
    api_url: str = Field(
        default="https://api.grayswan.ai/cygnal/monitor",
        description="GraySwan Cygnal API endpoint",
    )
    api_key: SecretStr | None = Field(
        default=None,
        description="GraySwan API key (via GRAYSWAN_API_KEY env var)",
    )
    policy_id: str | None = Field(
        default=None,
        description="GraySwan policy ID (via GRAYSWAN_POLICY_ID env var)",
    )

    # Internal state - not serialized (using PrivateAttr for Pydantic)
    _client: httpx.Client | None = PrivateAttr(default=None)
    _events: list[LLMConvertibleEvent] = PrivateAttr(default_factory=list)

    @model_validator(mode="after")
    def validate_thresholds(self) -> GraySwanAnalyzer:
        """Validate that thresholds are properly ordered.

        Raises:
            ValueError: If low_threshold is not strictly below medium_threshold,
                which would make the LOW/MEDIUM bands overlap or invert.
        """
        if self.low_threshold >= self.medium_threshold:
            raise ValueError(
                f"low_threshold ({self.low_threshold}) must be less than "
                f"medium_threshold ({self.medium_threshold})"
            )
        return self

    def model_post_init(self, __context: Any) -> None:
        """Initialize the analyzer after model creation.

        Resolves the API key and policy ID, preferring environment variables
        over any values supplied at construction/deserialization time.
        """
        # ALWAYS prefer environment variable - this ensures Docker gets the correct key
        # even if serialization didn't work properly
        env_key = os.getenv("GRAYSWAN_API_KEY")
        if env_key:
            self.api_key = SecretStr(env_key)
            logger.info("Using GraySwan API key from environment")
        elif not self._has_api_key():
            logger.warning(
                "GRAYSWAN_API_KEY not set. GraySwanAnalyzer will return UNKNOWN risk."
            )

        # Always prefer environment variable for policy ID too
        env_policy = os.getenv("GRAYSWAN_POLICY_ID")
        if env_policy:
            self.policy_id = env_policy
            logger.info(f"Using GraySwan policy ID from environment: {self.policy_id}")
        elif not self.policy_id:
            # Fall back to the built-in default Cygnal policy.
            self.policy_id = "689ca4885af3538a39b2ba04"
            logger.info(f"Using default GraySwan policy ID: {self.policy_id}")

        logger.info(
            f"GraySwanAnalyzer initialized with history_limit={self.history_limit}, "
            f"timeout={self.timeout}s"
        )

    def _has_api_key(self) -> bool:
        """Return True when a non-empty API key is configured.

        Checks the secret's *value*, not just the SecretStr wrapper: a
        SecretStr("") is truthy, so `if not self.api_key` alone would let an
        empty key through and send a blank Bearer token to the API.
        """
        return bool(self.api_key and self.api_key.get_secret_value())

    def set_events(self, events: Sequence[LLMConvertibleEvent]) -> None:
        """Set the events for context when analyzing actions.

        Args:
            events: Sequence of events to use as context for security analysis
        """
        self._events = list(events)

    def _create_client(self) -> httpx.Client:
        """Create a new HTTP client instance with auth headers pre-configured."""
        api_key_value = self.api_key.get_secret_value() if self.api_key else ""
        return httpx.Client(
            timeout=self.timeout,
            headers={
                "Authorization": f"Bearer {api_key_value}",
                "Content-Type": "application/json",
            },
        )

    def _get_client(self) -> httpx.Client:
        """Get or create HTTP client, replacing it if it was closed."""
        # Split condition to avoid AttributeError when _client is None
        if self._client is None:
            self._client = self._create_client()
        elif self._client.is_closed:
            self._client = self._create_client()
        return self._client

    def _map_violation_to_risk(self, violation_score: float) -> SecurityRisk:
        """Map GraySwan violation score to SecurityRisk.

        Args:
            violation_score: Score from 0.0 to 1.0 indicating violation severity

        Returns:
            SecurityRisk level based on configured thresholds
        """
        if violation_score <= self.low_threshold:
            return SecurityRisk.LOW
        elif violation_score <= self.medium_threshold:
            return SecurityRisk.MEDIUM
        else:
            return SecurityRisk.HIGH

    def _call_grayswan_api(self, messages: list[dict[str, Any]]) -> SecurityRisk:
        """Call GraySwan API with formatted messages.

        Args:
            messages: List of messages in OpenAI format

        Returns:
            SecurityRisk level based on API response; UNKNOWN on any failure
            (missing key, HTTP error, timeout, malformed response body).
        """
        if not self._has_api_key():
            logger.warning("No API key configured, returning UNKNOWN risk")
            return SecurityRisk.UNKNOWN
        try:
            client = self._get_client()
            payload = {"messages": messages, "policy_id": self.policy_id}
            logger.debug(
                f"Sending request to GraySwan API with {len(messages)} messages "
                f"and policy_id: {self.policy_id}"
            )
            response = client.post(self.api_url, json=payload)
            if response.status_code == 200:
                try:
                    result = response.json()
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON from GraySwan API: {response.text}")
                    return SecurityRisk.UNKNOWN

                violation_score = result.get("violation")
                # Validate response structure: 'violation' must be a real number,
                # since it feeds threshold comparisons and a float format spec.
                # bool is excluded explicitly (it subclasses int in Python).
                if violation_score is None:
                    logger.error("GraySwan API response missing 'violation' field")
                    return SecurityRisk.UNKNOWN
                if isinstance(violation_score, bool) or not isinstance(
                    violation_score, (int, float)
                ):
                    logger.error(
                        "GraySwan API 'violation' field is not numeric: "
                        f"{violation_score!r}"
                    )
                    return SecurityRisk.UNKNOWN

                risk_level = self._map_violation_to_risk(violation_score)
                # Indirect prompt injection is auto-escalated to HIGH
                if result.get("ipi"):
                    risk_level = SecurityRisk.HIGH
                    logger.warning(
                        "Indirect prompt injection detected, escalating to HIGH risk"
                    )
                logger.info(
                    f"GraySwan risk assessment: {risk_level.name} "
                    f"(violation_score: {violation_score:.2f})"
                )
                return risk_level
            else:
                logger.error(
                    f"GraySwan API error {response.status_code}: {response.text}"
                )
                return SecurityRisk.UNKNOWN
        except httpx.TimeoutException:
            logger.error("GraySwan API request timed out")
            return SecurityRisk.UNKNOWN
        except Exception as e:
            # Boundary catch-all: analysis must degrade to UNKNOWN, never crash
            # the agent loop.
            logger.error(f"GraySwan security analysis failed: {e}")
            return SecurityRisk.UNKNOWN

    def security_risk(self, action: ActionEvent) -> SecurityRisk:
        """Analyze action for security risks using GraySwan API.

        This method converts the conversation history and the pending action
        to OpenAI message format and sends them to the GraySwan Cygnal API
        for security analysis.

        Args:
            action: The ActionEvent to analyze

        Returns:
            SecurityRisk level based on GraySwan analysis
        """
        logger.debug(
            f"Calling security_risk on GraySwanAnalyzer for action: {action.tool_name}"
        )
        if not self._has_api_key():
            logger.warning("No API key configured for GraySwan analysis")
            return SecurityRisk.UNKNOWN
        try:
            # Limit to recent history
            recent_events = self._events
            if len(recent_events) > self.history_limit:
                recent_events = recent_events[-self.history_limit :]

            # Convert events to OpenAI message format; the pending action is
            # appended last so the API judges it in context.
            events_to_process: list[LLMConvertibleEvent] = list(recent_events) + [
                action
            ]
            openai_messages = convert_events_to_openai_messages(events_to_process)
            if not openai_messages:
                logger.warning("No valid messages to analyze")
                return SecurityRisk.UNKNOWN
            logger.debug(
                f"Converted {len(events_to_process)} events into "
                f"{len(openai_messages)} OpenAI messages for GraySwan analysis"
            )
            return self._call_grayswan_api(openai_messages)
        except Exception as e:
            logger.error(f"GraySwan security analysis failed: {e}")
            return SecurityRisk.UNKNOWN

    def close(self) -> None:
        """Clean up resources (close the HTTP client if it is open)."""
        if self._client is not None and not self._client.is_closed:
            self._client.close()
        self._client = None