This repository was archived by the owner on Nov 29, 2025. It is now read-only.

Commit 39fe4d0

Add validation specialist agent as tool: implement a 7-gate validation checklist for security findings to mitigate false positives.
1 parent 1c12370 commit 39fe4d0

File tree

2 files changed: +203 -1 lines changed


src/modules/operation_plugins/README.md
Lines changed: 9 additions & 1 deletion

@@ -40,6 +40,14 @@ operation_plugins/
 
 ## Component Functions
 
+### Specialist Agents (General Module)
+The `general` module currently ships with a `validation_specialist` tool that spins up its own Strands `Agent` to run the seven-gate validation checklist before a finding is accepted. The tool lives under `tools/validation_specialist.py` and follows a repeatable pattern:
+
+- `_create_specialist_model()` pulls the same provider/model configuration used by the main agent.
+- The `@tool` entry point builds a Strands `Agent` with a focused system prompt plus the minimal tool set (`shell`, `http_request`, etc.) required for validation.
+
+You can add additional specialists (e.g., SQLi, XSS, SSRF) by copying this file, adjusting the prompt/available tools, and registering the new tool name in `module.yaml`. The runtime orchestration automatically exposes any `tools/*.py` entry that uses this pattern.
+
 ### module.yaml
 Defines module metadata and capabilities:
 
@@ -377,4 +385,4 @@ if loader.validate_module("custom_module"):
 
 **Module Loading:** `src/modules/prompts/factory.py:ModulePromptLoader`
 **Agent Integration:** `src/modules/agents/cyber_autoagent.py:create_agent`
-**Report Generation:** `src/modules/tools/report_generator.py`
+**Report Generation:** `src/modules/tools/report_generator.py`
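
To make the copy-and-adjust workflow described in the README hunk above concrete, here is a minimal sketch of what an additional specialist could look like. It is hypothetical: the `ssrf_specialist` name, prompt text, import path, and tool choice are illustrative and not part of this commit; it assumes `http_request` is available from `strands_tools` and reuses the `_create_specialist_model()` helper from `validation_specialist.py` (shown in full below).

"""SSRF specialist - hypothetical sketch following the validation_specialist pattern."""

from strands import Agent, tool
from strands_tools import http_request, shell  # assumed minimal tool set for SSRF checks

# Illustrative import path; in practice the helper could simply be copied into this file.
from .validation_specialist import _create_specialist_model

SSRF_METHODOLOGY = """<ssrf_specialist>
<role>Evidence validator for SSRF claims</role>
<mandate>Verify the target server itself fetched an attacker-controlled URL; reject findings
where the response merely echoes the submitted payload.</mandate>
</ssrf_specialist>"""


@tool
def ssrf_specialist(finding_description: str, artifact_paths: list[str]) -> dict:
    """Validate SSRF findings with a focused prompt and minimal tool set."""
    validator = Agent(
        model=_create_specialist_model(),
        system_prompt=SSRF_METHODOLOGY,
        tools=[http_request, shell],
    )
    task = f"Validate this SSRF claim against the artifacts:\n{finding_description}\nArtifacts: {artifact_paths}"
    return {"raw_result": str(validator(task))}

As the README notes, the new tool name (`ssrf_specialist` in this sketch) would also need to be registered in `module.yaml`; the exact schema for that entry is not shown in this commit.
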
tools/validation_specialist.py
Lines changed: 194 additions & 0 deletions

@@ -0,0 +1,194 @@
"""Validation specialist - Rigorous false positive prevention."""

import json
import logging

from strands import Agent, tool
from modules.config.manager import ConfigManager

logger = logging.getLogger(__name__)


VALIDATION_METHODOLOGY = """<validation_specialist>
<role>Evidence validator - Apply scientific method to security claims</role>

<mandate>
Prevent false positives via 7-gate validation checklist. Each gate must PASS for verified status.
</mandate>

<seven_gates>
<gate id="1" name="Baseline Artifact">
- Requirement: Pre-exploit state captured in artifact
- Test: Artifact file exists and readable
- Failure: No baseline comparison possible
</gate>

<gate id="2" name="Exploit Artifact">
- Requirement: Post-exploit state captured in artifact
- Test: Artifact file exists and readable
- Failure: No exploit evidence
</gate>

<gate id="3" name="Content Differential">
- Requirement: Semantic content change (not just metadata)
- Test: diff baseline vs exploit shows meaningful change
- Failure: Status/size change only = Same layer, different variation
</gate>

<gate id="4" name="Causality">
- Requirement: YOUR change caused THEIR response change
- Test: Repeat exploit→same outcome | Omit exploit→different outcome
- Failure: Non-reproducible or correlation without causation
</gate>

<gate id="5" name="Data Extraction">
- Requirement: If claiming "extracted X", X must be IN response
- Test: grep exploit_artifact for claimed data (e.g. session_token, admin hash)
- Failure: Claimed data NOT in response (was in payload sent, not received)
</gate>

<gate id="6" name="Layer Discrimination">
- Requirement: Backend reached, not just different defensive layer
- Test: Response contains application content (not WAF/CDN/challenge page)
- Failure: Same error type (both Cloudflare, both WAF) = Same layer
</gate>

<gate id="7" name="Authorization">
- Requirement: Bypass claims need 2xx + protected data
- Test: 40x = Authorization blocking (NOT bypassed) | 2xx + data = Bypass
- Failure: "Got 403" ≠ "Bypassed auth" (403 = blocked)
</gate>
</seven_gates>

<decision_logic>
ALL gates PASS:
- validation_status: "verified"
- confidence: No penalty
- severity_max: As claimed (CRITICAL/HIGH)

ANY gate FAILS:
- validation_status: "hypothesis"
- confidence: Deduct 15% per failed gate
- severity_max: Cap at MEDIUM (too weak for HIGH/CRITICAL)

Layer confusion (defensive layer mistaken for backend):
- severity_max: Cap at INFO
</decision_logic>

<output_format>
Return JSON only:
{
  "validation_status": "verified" | "hypothesis" | "error",
  "confidence": 0-100,
  "severity_max": "CRITICAL" | "HIGH" | "MEDIUM" | "LOW" | "INFO",
  "failed_gates": [1, 2, 3, 4, 5, 6, 7],
  "evidence_summary": "Detailed findings from artifact inspection",
  "recommendation": "Specific next action"
}
</output_format>

<execution_protocol>
1. Use editor to read each artifact in artifact_paths
2. Apply gates systematically (don't skip any)
3. For extraction claims: grep artifact for specific data
4. For bypass claims: diff artifacts for semantic content change
5. Document which gates passed/failed with evidence
6. Return JSON result only
</execution_protocol>
</validation_specialist>"""


def _create_specialist_model():
    """Create model for specialist - reuse main LLM/provider when swarm override is unavailable."""
    import os

    from modules.agents.cyber_autoagent import create_bedrock_model, create_local_model, create_litellm_model
    from modules.config.manager import ConfigManager

    provider = os.getenv("CYBER_AGENT_PROVIDER", "bedrock")
    config_manager = ConfigManager()
    swarm_model_id = config_manager.get_swarm_model_id()
    region = os.getenv("AWS_REGION", config_manager.getenv("AWS_REGION", "us-east-1"))

    def _build(model_id: str):
        if provider == "bedrock":
            return create_bedrock_model(model_id, region, provider)
        if provider == "ollama":
            return create_local_model(model_id, provider)
        if provider == "litellm":
            return create_litellm_model(model_id, region, provider)
        raise ValueError(f"Unknown provider: {provider}")

    try:
        return _build(swarm_model_id)
    except Exception as exc:  # fall back to main LLM if swarm override is misconfigured
        primary_model = config_manager.get_llm_config(provider).model_id
        logger.warning(
            "Specialist model '%s' unavailable for provider '%s' (%s). Falling back to main model '%s'.",
            swarm_model_id,
            provider,
            exc,
            primary_model,
        )
        return _build(primary_model)


@tool
def validation_specialist(
    finding_description: str,
    artifact_paths: list[str],
    claimed_severity: str = "HIGH",
) -> dict:
    """Validate HIGH/CRITICAL findings via rigorous 7-gate checklist."""
    try:
        from strands_tools import editor, shell

        validator = Agent(
            model=_create_specialist_model(),
            system_prompt=VALIDATION_METHODOLOGY,
            tools=[editor, shell],
        )

        task = f"""Validate security finding:

CLAIMED FINDING:
{finding_description}

CLAIMED SEVERITY: {claimed_severity}

ARTIFACTS:
{json.dumps(artifact_paths, indent=2)}

Execute 7-gate validation checklist. Return JSON only."""

        result = validator(task)
        result_text = str(result)

        # Parse JSON from response
        if "{" in result_text and "}" in result_text:
            json_start = result_text.find("{")
            json_end = result_text.rfind("}") + 1
            json_str = result_text[json_start:json_end]
            return json.loads(json_str)

        # Fallback if no JSON found
        return {
            "validation_status": "hypothesis",
            "confidence": 40,
            "severity_max": "MEDIUM",
            "failed_gates": list(range(1, 8)),
            "evidence_summary": "Could not parse validation results",
            "recommendation": "Manually review artifacts",
        }

    except Exception as e:
        logger.error(f"Validation specialist error: {e}")
        return {
            "validation_status": "error",
            "confidence": 0,
            "severity_max": "INFO",
            "failed_gates": [],
            "evidence_summary": f"Validation error: {str(e)}",
            "recommendation": "Fix specialist configuration",
        }
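
For orientation, a hedged usage sketch follows. It shows how the orchestrating agent (or a quick manual test) might invoke the tool and what a partially failed validation could return. The finding text, artifact paths, and returned values are illustrative assumptions rather than outputs of this commit, and it assumes the strands `@tool` wrapper keeps the function directly callable.

# Hypothetical invocation; the finding text and artifact paths are placeholders.
result = validation_specialist(
    finding_description="SQL injection on /login extracts the admin password hash",
    artifact_paths=[
        "artifacts/login_baseline.txt",  # gate 1: pre-exploit state
        "artifacts/login_exploit.txt",   # gate 2: post-exploit state
    ],
    claimed_severity="CRITICAL",
)

# Per the decision logic in VALIDATION_METHODOLOGY, two failed gates would cut
# confidence by 2 x 15% (e.g. 100 -> 70 if starting from full confidence) and
# cap severity at MEDIUM. An illustrative result dict might look like:
# {
#     "validation_status": "hypothesis",
#     "confidence": 70,
#     "severity_max": "MEDIUM",
#     "failed_gates": [5, 7],
#     "evidence_summary": "Claimed hash not present in exploit artifact; bypass attempt returned 403",
#     "recommendation": "Capture a 2xx response containing the extracted hash before re-raising severity",
# }
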
