This document outlines security best practices for deploying and using the GenAI API Pentest Platform safely and responsibly.
# Set production environment
ENVIRONMENT=production
DEBUG=false
# Use strong, randomly generated secrets
# NOTE(review): the $(...) command substitutions below are only expanded when
# these lines are evaluated by a shell. A dotenv-style loader would presumably
# keep the literal text, so in that case generate the values first and paste
# the results in.
SECRET_KEY=$(python3 -c "import secrets; print(secrets.token_urlsafe(32))")
JWT_SECRET=$(python3 -c "import secrets; print(secrets.token_urlsafe(32))")
# Secure database connection
# (user, password and host are placeholders; ssl=require asks for an encrypted connection)
DATABASE_URL=postgresql+asyncpg://user:password@secure-db-host:5432/genai_pentest?ssl=require
# Enable security features
RATE_LIMIT_ENABLED=true
CORS_ORIGINS=https://your-secure-domain.com
# Force HTTPS in production
FORCE_HTTPS=true
SECURE_COOKIES=true
# SSL certificate paths
SSL_CERT_PATH=/etc/ssl/certs/your-cert.pem
SSL_KEY_PATH=/etc/ssl/private/your-key.pem
# Allow only necessary ports
# Default policy: drop all inbound traffic, permit all outbound traffic.
sudo ufw default deny incoming
sudo ufw default allow outgoing
sudo ufw allow 22/tcp # SSH (restrict to specific IPs)
sudo ufw allow 443/tcp # HTTPS only
sudo ufw enable
# Deny HTTP traffic in production
# NOTE(review): with 80/tcp denied, an HTTP-to-HTTPS redirect listener on port
# 80 cannot be reached from outside; choose either this rule or the redirect.
sudo ufw deny 80/tcp
# Rate limiting zone. limit_req_zone is only valid in the http context, so it
# must live outside the server block (in http { } or a conf.d include);
# nginx refuses to start if it is placed inside server { }.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

server {
    listen 443 ssl http2;
    server_name your-domain.com;

    # SSL Configuration
    ssl_certificate /etc/letsencrypt/live/your-domain.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/your-domain.com/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    # TLS 1.2 suites: forward-secret AEAD ciphers only. AES-256-GCM is paired
    # with SHA384 (there are no *-GCM-SHA512 suites, so listing only those
    # leaves OpenSSL with nothing to match). TLS 1.3 suites are negotiated
    # separately and are not affected by this list.
    ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
    ssl_prefer_server_ciphers off;

    # Security Headers
    add_header Strict-Transport-Security "max-age=63072000" always;
    add_header X-Frame-Options DENY always;
    add_header X-Content-Type-Options nosniff always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;
    add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'" always;

    # Rate Limiting: apply the "api" zone declared above to this server.
    limit_req zone=api burst=20 nodelay;

    location / {
        # Forward to the application listening on the loopback interface.
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 30s;
        proxy_send_timeout 30s;
        proxy_read_timeout 30s;
    }
}
# Redirect HTTP to HTTPS
# Answers every plain-HTTP request with a permanent (301) redirect to the same
# host and URI over https.
# NOTE(review): this listener is only reachable if the firewall allows 80/tcp.
server {
    listen 80;
    server_name your-domain.com;
    return 301 https://$server_name$request_uri;
}
-- Create dedicated user with limited privileges
-- NOTE(review): the role is also made OWNER of genai_pentest below. In
-- PostgreSQL an owner implicitly holds all privileges on the object, so the
-- GRANTs that follow do not narrow its rights on that database. Confirm this
-- is intended.
CREATE USER genai_user WITH PASSWORD 'strong-random-password';
CREATE DATABASE genai_pentest OWNER genai_user;
-- Grant minimal required permissions
-- NOTE(review): schema-level GRANTs apply to the database of the current
-- session; presumably these should be run while connected to genai_pentest.
GRANT CONNECT ON DATABASE genai_pentest TO genai_user;
GRANT USAGE ON SCHEMA public TO genai_user;
GRANT CREATE ON SCHEMA public TO genai_user;
-- Revoke dangerous permissions
-- NOTE(review): these only remove privileges granted directly to genai_user;
-- privileges inherited through PUBLIC are presumably unaffected. Verify.
REVOKE ALL ON DATABASE postgres FROM genai_user;
REVOKE ALL ON SCHEMA information_schema FROM genai_user;
# Use SSL for database connections
# The asyncpg driver takes its TLS mode from the "ssl" query parameter.
# "sslmode" is a libpq/psycopg option that asyncpg's connect() does not accept,
# and supplying both left two conflicting modes (require vs. verify-full).
# verify-full encrypts the connection and validates the server certificate
# and host name.
DATABASE_URL=postgresql+asyncpg://user:pass@host:5432/db?ssl=verify-full
# Connection pooling limits
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30
# Use environment variables
# The values below are placeholders, not real keys.
# NOTE(review): typing real keys into an interactive shell may leave them in
# the shell history; prefer loading them from a secrets manager.
export OPENAI_API_KEY="sk-your-secure-key"
export ANTHROPIC_API_KEY="sk-ant-your-secure-key"
# Or use a secrets management system
# AWS Secrets Manager
aws secretsmanager get-secret-value --secret-id prod/genai-pentest/openai-key
# HashiCorp Vault
vault kv get -field=api_key secret/genai-pentest/openai
# Azure Key Vault
az keyvault secret show --name openai-api-key --vault-name your-vault
# Implement key rotation
import boto3
from datetime import datetime, timedelta
def rotate_api_keys():
    """Rotate the stored OpenAI API key once it is more than 90 days old.

    Reads the secret's ``LastChangedDate`` from AWS Secrets Manager and, when
    it is older than 90 days, stores a key obtained from
    ``generate_new_api_key()`` under the same secret id. Does nothing when the
    key is still fresh.
    """
    secret_id = 'prod/genai-pentest/openai-key'
    secrets_client = boto3.client('secretsmanager')

    # Check key age
    response = secrets_client.describe_secret(SecretId=secret_id)
    last_changed = response['LastChangedDate']

    # The SDK returns a timezone-aware datetime; subtracting it from a naive
    # datetime.now() raises TypeError. Taking "now" in the same tzinfo keeps
    # the subtraction valid (and still works if the value is ever naive).
    now = datetime.now(last_changed.tzinfo)

    if now - last_changed > timedelta(days=90):
        # Rotate key
        new_key = generate_new_api_key()
        secrets_client.update_secret(
            SecretId=secret_id,
            SecretString=new_key,
        )
# Strong JWT configuration
# RS256 is an asymmetric algorithm: tokens are verified with a public key
# rather than a shared secret.
JWT_ALGORITHM = "RS256" # Use RS256 instead of HS256
# Token lifetimes; the units (minutes / days) are the ones given in the names.
JWT_ACCESS_TOKEN_EXPIRE_MINUTES = 15
JWT_REFRESH_TOKEN_EXPIRE_DAYS = 7
# Token validation
def validate_token(token: str) -> Optional[dict]:
    """Decode and validate an RS256-signed JWT.

    Args:
        token: The encoded JWT as received from the client.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 "Token expired" when the ``exp`` claim is in the
            past; 401 "Invalid token" for any other validation failure,
            including a missing ``exp`` or ``iat`` claim.
    """
    try:
        return jwt.decode(
            token,
            PUBLIC_KEY,
            # Pin the algorithm so a token cannot choose its own.
            algorithms=["RS256"],
            options={
                "verify_exp": True,
                "verify_iat": True,
                # verify_* only checks claims that are present; without
                # "require", a token that omits "exp" would never expire.
                "require": ["exp", "iat"],
            },
        )
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(401, "Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(401, "Invalid token") from exc
# Define user roles
class UserRole(Enum):
    """Roles a user can hold; passed to ``require_role`` to gate endpoints."""

    ADMIN = "admin"
    ANALYST = "analyst"
    VIEWER = "viewer"
# Role-based endpoint protection
@app.post("/api/scan/start")
@require_role([UserRole.ADMIN, UserRole.ANALYST])
async def start_scan(current_user: dict = Depends(get_current_user)):
    """Start a scan (example stub).

    Access is restricted to the ADMIN and ANALYST roles by ``require_role``;
    the authenticated user is injected through ``get_current_user``.
    """
    # Only admins and analysts can start scans
    pass
@app.get("/api/reports/{scan_id}")
@require_role([UserRole.ADMIN, UserRole.ANALYST, UserRole.VIEWER])
async def get_report(scan_id: str, current_user: dict = Depends(get_current_user)):
    """Return the report for ``scan_id`` (example stub).

    Every role defined in ``UserRole`` is listed, so any authenticated user
    holding a role may call this endpoint.
    """
    # All authenticated users can view reports
    pass
from pydantic import BaseModel, validator, HttpUrl
from typing import List, Optional
class ScanRequest(BaseModel):
    """Validated request body for starting a scan.

    Attributes are checked on construction: ``target_url`` must be a valid
    HTTP(S) URL, ``scan_mode`` must be one of the supported modes and every
    entry of ``providers`` must be a known provider name.
    """

    target_url: HttpUrl
    scan_mode: str
    providers: Optional[List[str]] = []

    @validator('scan_mode')
    def validate_scan_mode(cls, v):
        """Reject scan modes outside the supported set."""
        allowed_modes = {'basic', 'standard', 'comprehensive', 'ai-deep'}
        if v not in allowed_modes:
            raise ValueError(f'Invalid scan mode. Must be one of: {allowed_modes}')
        return v

    @validator('providers')
    def validate_providers(cls, v):
        """Reject provider names outside the supported set."""
        # The field is Optional, so an explicit null may reach this validator;
        # set(None) would fail with an unrelated "not iterable" error.
        if v is None:
            return v
        allowed_providers = {'openai', 'anthropic', 'google', 'openrouter', 'local'}
        invalid = set(v) - allowed_providers
        if invalid:
            raise ValueError(f'Invalid providers: {invalid}')
        return v
import mimetypes
import magic
from pathlib import Path
# File extensions accepted by secure_file_upload (compared lower-cased).
ALLOWED_EXTENSIONS = {'.json', '.yaml', '.yml'}
# MIME types accepted for the sniffed file content.
# NOTE(review): 'text/plain' is presumably listed because content sniffing
# often reports JSON/YAML as plain text. Confirm.
ALLOWED_MIME_TYPES = {
    'application/json',
    'application/x-yaml',
    'text/yaml',
    'text/plain'
}
# Upper bound on the upload size, in bytes.
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
async def secure_file_upload(file: UploadFile) -> Path:
    """Validate an uploaded spec file and store it under ``data/uploads``.

    Args:
        file: The incoming upload.

    Returns:
        Path of the stored file; the name is prefixed with a random UUID.

    Raises:
        HTTPException: 400 for a missing/invalid filename, a disallowed
            extension or a disallowed sniffed MIME type; 413 when the content
            exceeds ``MAX_FILE_SIZE``.
    """
    # Validate filename
    if not file.filename:
        raise HTTPException(400, "No filename provided")

    # Sanitize filename: keep only word characters, '-', '_' and '.', which
    # also removes path separators.
    safe_filename = re.sub(r'[^\w\-_\.]', '', file.filename)
    if not safe_filename:
        raise HTTPException(400, "Invalid filename")

    # Check file extension
    file_ext = Path(safe_filename).suffix.lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"File type not allowed: {file_ext}")

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without buffering an arbitrarily large body in memory first.
    content = await file.read(MAX_FILE_SIZE + 1)

    # Check file size
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(413, "File too large")

    # Validate MIME type from the content itself, not the client's header.
    mime_type = magic.from_buffer(content, mime=True)
    if mime_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(400, f"Invalid file type: {mime_type}")

    # Generate secure path. parents=True because "data" may not exist yet on
    # a fresh deployment, in which case a plain mkdir raises FileNotFoundError.
    upload_dir = Path("data/uploads")
    upload_dir.mkdir(parents=True, exist_ok=True)
    file_path = upload_dir / f"{uuid.uuid4()}_{safe_filename}"

    # Save securely
    async with aiofiles.open(file_path, 'wb') as f:
        await f.write(content)

    return file_path
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Rate-limit key: the client's remote address.
# NOTE(review): behind a reverse proxy the remote address is presumably the
# proxy's, so all clients would share one bucket. Confirm that forwarded
# headers are honoured.
limiter = Limiter(key_func=get_remote_address)
# Expose the limiter on the app state and register the handler that is
# invoked when RateLimitExceeded is raised.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Apply rate limits to endpoints
@app.post("/api/scan/start")
@limiter.limit("5/minute") # Max 5 scans per minute
async def start_scan(request: Request):
    """Start a scan (example stub), limited to 5 requests per minute per key."""
    pass
@app.post("/api/spec/upload")
@limiter.limit("10/minute") # Max 10 uploads per minute
async def upload_spec(request: Request):
    """Upload a spec (example stub), limited to 10 requests per minute per key."""
    pass
import aioredis
from datetime import timedelta
class RedisRateLimiter:
    """Rate limiter that counts requests per key in Redis.

    Each key holds a counter that is created by the first request and expires
    ``window`` seconds later, at which point counting starts over.
    """

    def __init__(self, redis_url: str):
        """Connect to the Redis instance at ``redis_url``."""
        self.redis = aioredis.from_url(redis_url)

    async def is_allowed(self, key: str, limit: int, window: int) -> bool:
        """Check if request is within rate limit.

        Args:
            key: Counter key (e.g. one per client and endpoint).
            limit: Maximum number of requests allowed per window.
            window: Window length in seconds.

        Returns:
            True when this request is within the limit, False otherwise.
        """
        current = await self.redis.incr(key)
        if current == 1:
            # First request of a window starts the expiry clock.
            await self.redis.expire(key, window)
        elif current > limit and await self.redis.ttl(key) == -1:
            # INCR and EXPIRE are two separate commands. If the EXPIRE was
            # lost, the counter has no TTL and the key would stay over the
            # limit forever. TTL == -1 means "exists, no expiry"; restore it.
            await self.redis.expire(key, window)
        return current <= limit

    async def get_remaining(self, key: str, limit: int) -> int:
        """Get remaining requests in current window (never negative)."""
        current = await self.redis.get(key)
        if current is None:
            # No counter yet: the full allowance is available.
            return limit
        return max(0, limit - int(current))
async def verify_scan_authorization(target_url: str, user_id: str) -> bool:
    """Verify user has authorization to scan target.

    Args:
        target_url: URL of the API the user wants to scan.
        user_id: Identifier of the requesting user.

    Returns:
        True when the target host is in the user's authorized list and the
        domain ownership check succeeds.

    Raises:
        HTTPException: 403 when the host is missing or not authorized, or
            when ownership verification fails.
    """
    # Local import keeps this snippet self-contained.
    from urllib.parse import urlparse

    # Check if domain is in user's authorized list
    authorized_domains = await get_user_authorized_domains(user_id)

    # Use .hostname rather than .netloc: netloc keeps "user:pass@" and
    # ":port", so "https://example.com:8443/" would neither match an
    # authorized "example.com" nor form a valid DNS name for the TXT lookup.
    # hostname is also lower-cased.
    # NOTE(review): assumes authorized domains are stored as bare host names.
    target_domain = urlparse(target_url).hostname
    if not target_domain or target_domain not in authorized_domains:
        raise HTTPException(403, "Not authorized to scan this domain")

    # Additional verification (e.g., DNS TXT record)
    verification_token = await get_user_verification_token(user_id)
    if not await verify_domain_ownership(target_domain, verification_token):
        raise HTTPException(403, "Domain ownership verification failed")

    return True
async def verify_domain_ownership(domain: str, token: str) -> bool:
    """Verify domain ownership via DNS TXT record.

    Looks up the TXT records of ``_genai-pentest.<domain>`` and reports
    whether any of them contains ``token``.

    Args:
        domain: Host name whose ownership is being verified.
        token: Verification token expected in a TXT record.

    Returns:
        True when a matching record is found; False when the token is empty,
        the lookup fails or no record matches.
    """
    # dns.resolver.resolve is synchronous and cannot be awaited; awaiting its
    # result raised TypeError, which the former bare "except" swallowed, so
    # verification always failed. The async resolver is the awaitable form.
    import dns.asyncresolver
    import dns.exception

    # An empty token is a substring of every record and would verify anything.
    if not token:
        return False

    try:
        txt_records = await dns.asyncresolver.resolve(
            f"_genai-pentest.{domain}", "TXT"
        )
    except (dns.exception.DNSException, OSError):
        # Missing record, NXDOMAIN, timeout or network failure: not verified.
        return False

    return any(token in str(record) for record in txt_records)
class ScanScope:
    """Decides which endpoint paths a scan may touch.

    Paths are normalised before matching so that ``..`` segments, repeated
    slashes, percent-encoding or a query string cannot be used to slip past
    the rules.
    """

    def __init__(self, allowed_paths: List[str], blocked_paths: List[str]):
        """Store the allow-list (empty means "everything") and the block-list."""
        self.allowed_paths = allowed_paths
        self.blocked_paths = blocked_paths

    @staticmethod
    def _normalize(path: str) -> str:
        """Return ``path`` as a canonical absolute path without query/fragment."""
        from urllib.parse import unquote

        # Drop query and fragment before decoding so an encoded "?" or "#"
        # is not mistaken for a delimiter.
        path = path.split("?", 1)[0].split("#", 1)[0]
        segments = []
        for part in unquote(path).split("/"):
            if part in ("", "."):
                continue  # collapses "//" and "/./"
            if part == "..":
                if segments:
                    segments.pop()
                continue
            segments.append(part)
        return "/" + "/".join(segments)

    @staticmethod
    def _is_within(path: str, root: str) -> bool:
        """Return True when ``path`` equals ``root`` or lies beneath it."""
        if root == "/":
            return True
        return path == root or path.startswith(root + "/")

    def is_endpoint_allowed(self, endpoint: str) -> bool:
        """Check if endpoint is within allowed scope.

        Args:
            endpoint: Request path to test.

        Returns:
            False when the path matches a blocked prefix; otherwise True when
            no allow-list is configured or the path lies under an allowed path.
        """
        path = self._normalize(endpoint)

        # Block sensitive paths. Matching stays a plain, case-insensitive
        # prefix test on purpose: "/.env" must also cover "/.env.local".
        folded = path.casefold()
        for blocked in self.blocked_paths:
            if folded.startswith(self._normalize(blocked).casefold()):
                return False

        # Allow only whitelisted paths if specified. Here the match is on
        # whole segments so that "/api" does not admit "/api-internal".
        if self.allowed_paths:
            return any(
                self._is_within(path, self._normalize(allowed))
                for allowed in self.allowed_paths
            )
        return True
# Default blocked paths
# Path prefixes intended as the default block-list for a scan scope:
# administrative, internal, debugging, configuration and metrics endpoints.
DEFAULT_BLOCKED_PATHS = [
    "/admin",
    "/internal",
    "/debug",
    "/.env",
    "/config",
    "/metrics"
]
# Conservative request settings read by make_safe_request.
# NOTE(review): make_safe_request only reads request_timeout, verify_ssl,
# user_agent, follow_redirects and safe_methods_only; the remaining keys are
# presumably consumed elsewhere. Confirm.
SAFE_SCAN_CONFIG = {
    "max_concurrent_requests": 5,
    "request_timeout": 10,
    "max_retries": 2,
    "respect_robots_txt": True,
    "follow_redirects": False,
    "verify_ssl": True,
    "user_agent": "GenAI-Pentest/1.0 (Security Scanner)",
    "safe_methods_only": True # Only GET, HEAD, OPTIONS
}
async def make_safe_request(url: str, method: str = "GET") -> Response:
    """Send one HTTP request using the limits from ``SAFE_SCAN_CONFIG``.

    Args:
        url: Target URL.
        method: HTTP method; when ``safe_methods_only`` is enabled it must be
            exactly ``GET``, ``HEAD`` or ``OPTIONS``.

    Returns:
        The response received from the target.

    Raises:
        ValueError: If the method is not permitted by the configuration.
        httpx.TimeoutException: Re-raised after a warning is logged.
    """
    cfg = SAFE_SCAN_CONFIG

    # Guard clause: refuse state-changing methods while safe mode is on.
    method_is_safe = method in ["GET", "HEAD", "OPTIONS"]
    if cfg["safe_methods_only"] and not method_is_safe:
        raise ValueError(f"Unsafe method not allowed: {method}")

    client_options = {
        "timeout": cfg["request_timeout"],
        "verify": cfg["verify_ssl"],
    }
    async with httpx.AsyncClient(**client_options) as client:
        try:
            return await client.request(
                method=method,
                url=url,
                headers={"User-Agent": cfg["user_agent"]},
                follow_redirects=cfg["follow_redirects"],
            )
        except httpx.TimeoutException:
            logger.warning(f"Request timeout for {url}")
            raise
import structlog
from enum import Enum
# Dedicated structlog logger, named "security", used for audit events.
logger = structlog.get_logger("security")
class SecurityEvent(Enum):
    """Kinds of security-relevant events; the value is the string that is logged."""

    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    SCAN_STARTED = "scan_started"
    SCAN_UNAUTHORIZED = "scan_unauthorized"
    FILE_UPLOAD = "file_upload"
    ADMIN_ACTION = "admin_action"
async def log_security_event(
    event: SecurityEvent,
    user_id: Optional[str] = None,
    ip_address: Optional[str] = None,
    details: Optional[dict] = None
):
    """Log security-relevant events.

    Args:
        event: The kind of event being recorded.
        user_id: Identifier of the acting user, if known.
        ip_address: Client address the event originated from, if known.
        details: Extra structured context; logged as ``{}`` when omitted.
    """
    from datetime import datetime, timezone

    logger.info(
        "Security event",
        # structlog takes the message as its positional "event" argument, so
        # passing an "event=" keyword as well raises TypeError ("multiple
        # values for argument 'event'"). The kind goes under its own key.
        event_type=event.value,
        user_id=user_id,
        ip_address=ip_address,
        # Timezone-aware UTC timestamp (unambiguous "+00:00" suffix);
        # datetime.utcnow() is naive and deprecated.
        timestamp=datetime.now(timezone.utc).isoformat(),
        details=details or {}
    )
# Usage in endpoints
@app.post("/api/scan/start")
async def start_scan(
    request: Request,
    scan_request: ScanRequest,
    current_user: dict = Depends(get_current_user),
):
    """Record an audit event for a scan request.

    Args:
        request: Incoming HTTP request; supplies the client address.
        scan_request: Validated request body. It was previously referenced
            without being declared, which raised NameError on every call.
        current_user: Authenticated user injected by ``get_current_user``.
    """
    await log_security_event(
        SecurityEvent.SCAN_STARTED,
        user_id=current_user["user_id"],
        # request.client is None when the peer address is unavailable.
        ip_address=request.client.host if request.client else None,
        # Log the URL as plain text rather than as a URL object.
        details={"target_url": str(scan_request.target_url)},
    )
from collections import defaultdict
from datetime import datetime, timedelta
class LoginMonitor:
    """Tracks failed logins per IP address and blocks repeat offenders.

    State is kept in memory. An address is added to ``blocked_ips`` once it
    accumulates ``max_attempts`` failures within ``window``; blocked
    addresses are never removed by this class.
    """

    def __init__(self, max_attempts: int = 5, window: timedelta = timedelta(hours=1)):
        """Create a monitor.

        Args:
            max_attempts: Failures within ``window`` that trigger a block.
            window: How long a failed attempt keeps counting.
        """
        self.max_attempts = max_attempts
        self.window = window
        self.failed_attempts = defaultdict(list)
        self.blocked_ips = set()

    async def record_failed_login(self, ip_address: str, username: str):
        """Record failed login attempt and block the address if over the limit."""
        from datetime import timezone

        # Timezone-aware "now"; datetime.utcnow() is naive and deprecated.
        now = datetime.now(timezone.utc)

        # Clean old attempts (older than the window)
        recent = [
            attempt for attempt in self.failed_attempts[ip_address]
            if now - attempt["timestamp"] < self.window
        ]

        # Add new attempt
        recent.append({"timestamp": now, "username": username})
        self.failed_attempts[ip_address] = recent

        # Block IP once the threshold is reached
        if len(recent) >= self.max_attempts:
            self.blocked_ips.add(ip_address)
            await log_security_event(
                SecurityEvent.LOGIN_FAILURE,
                ip_address=ip_address,
                details={"blocked": True, "attempt_count": len(recent)}
            )

    def is_blocked(self, ip_address: str) -> bool:
        """Return True when ``ip_address`` has been blocked."""
        return ip_address in self.blocked_ips
# Security Vulnerability Disclosure
We take security seriously. If you discover a security vulnerability, please:
## Reporting Process
1. **DO NOT** create a public GitHub issue
2. Email [email protected] with:
- Description of the vulnerability
- Steps to reproduce
- Potential impact assessment
- Your contact information
## Our Commitment
- We will acknowledge receipt within 24 hours
- We will provide regular updates on our progress
- We will credit you in our security advisories (if desired)
- We will not pursue legal action for good-faith security research
## Scope
**In Scope:**
- Authentication bypass
- Authorization flaws
- SQL injection
- XSS vulnerabilities
- Remote code execution
- Information disclosure
**Out of Scope:**
- Social engineering
- Physical attacks
- DoS attacks
- Vulnerabilities in third-party services
- **Never Use in Production Without Proper Authorization**
  - Always obtain written permission before scanning any API
  - Comply with all applicable laws and regulations
  - Follow responsible disclosure practices
- **API Key Security**
  - Never commit API keys to version control
  - Use environment variables or secure secret management
  - Rotate keys regularly
  - Monitor API usage for anomalies
- **Data Protection**
  - Encrypt sensitive data at rest and in transit
  - Implement proper access controls
  - Perform regular security audits and penetration testing
  - Ensure GDPR/CCPA compliance for personal data
- **Network Security**
  - Deploy behind a WAF (Web Application Firewall)
  - Use a VPN for administrative access
  - Implement network segmentation
  - Run regular vulnerability scans
Before deploying to production:
- All default credentials changed
- SSL/TLS properly configured
- Database properly secured
- Rate limiting implemented
- Input validation in place
- Security headers configured
- Audit logging enabled
- API keys properly managed
- Access controls implemented
- Security monitoring configured
- Incident response plan ready
- Regular backups configured
- Security scanning completed
For security-related questions or to report vulnerabilities:
- Security Email: [email protected]
- Security Documentation: Security Guide
- Emergency Contact: Use PGP encryption for sensitive reports
PGP Public Key: Download
Remember: Security is a shared responsibility. Always use this platform ethically and in compliance with all applicable laws and regulations.