Skip to content

[SECURITY FEATURE] Content Size & Type Security Limits for Resources & Prompts #538

@crivetimihai

Description

@crivetimihai

🛡️ FEATURE: Content Size & Type Security for Resources & Prompts

Summary: Implement configurable content validation for resources and prompts with size limits, content type restrictions, and security validation when content is submitted via the API.

Implementation

1. Update config.py with Content Security Settings

# In mcpgateway/config.py

from typing import Set

class Settings(BaseSettings):
    # ... existing settings ...

    # ===================================
    # Content Security Configuration
    # ===================================

    # Maximum content sizes (in bytes)
    content_max_resource_size: int = 100 * 1024  # 100KB default for resources
    content_max_prompt_size: int = 10 * 1024     # 10KB default for prompt templates

    # Allowed MIME types for resources (restrictive by default)
    content_allowed_resource_mimetypes: str = "text/plain,text/markdown"

    # Allowed MIME types for prompts (text only)
    content_allowed_prompt_mimetypes: str = "text/plain,text/markdown"

    # Content validation
    content_validate_encoding: bool = True  # Validate UTF-8 encoding
    content_validate_patterns: bool = True  # Check for malicious patterns
    content_strip_null_bytes: bool = True   # Remove null bytes from content

    # Rate limiting for content creation
    content_create_rate_limit_per_minute: int = 3  # Max creates per minute per user
    content_max_concurrent_operations: int = 2     # Max concurrent operations per user

    # Security patterns to block (comma-separated literal substrings)
    content_blocked_patterns: str = "<script,javascript:,vbscript:,onload=,onerror=,onclick=,<iframe,<embed,<object"

    @staticmethod
    def _split_csv(raw: str) -> Set[str]:
        """Split a comma-separated setting into a set of trimmed entries.

        Tolerates values such as "text/plain, text/markdown," coming from an
        .env file: a bare str.split(',') would produce entries with leading
        whitespace and an empty-string entry, silently breaking membership
        checks against exact MIME types.
        """
        return {item.strip() for item in raw.split(',') if item.strip()}

    # Computed properties for easier access
    @property
    def allowed_resource_mimetypes(self) -> Set[str]:
        """MIME types a resource may use."""
        return self._split_csv(self.content_allowed_resource_mimetypes)

    @property
    def allowed_prompt_mimetypes(self) -> Set[str]:
        """MIME types a prompt may use (text only by default)."""
        return self._split_csv(self.content_allowed_prompt_mimetypes)

    @property
    def blocked_patterns(self) -> Set[str]:
        """Literal substrings that submitted content must not contain."""
        return self._split_csv(self.content_blocked_patterns)

2. Create Content Security Service

# Create mcpgateway/services/content_security.py

import re
from typing import Optional, Tuple
import mimetypes

from mcpgateway.config import settings
from mcpgateway.exceptions import SecurityError, ValidationError

class ContentSecurityService:
    """Service for validating content security for resources and prompts.

    Enforces configurable size limits, MIME-type allow-lists, UTF-8 encoding
    checks, blocked-pattern scanning and prompt-template syntax validation
    before content submitted via the API is persisted.
    """

    # Sub-patterns inside {{ ... }} placeholders that suggest attempts at
    # code execution or attribute traversal. Compiled once at class level
    # instead of being recompiled on every prompt validation.
    _SUSPICIOUS_TEMPLATE_PATTERNS = (
        re.compile(r'\{\{.*exec.*\}\}', re.IGNORECASE),
        re.compile(r'\{\{.*eval.*\}\}', re.IGNORECASE),
        re.compile(r'\{\{.*__.*\}\}', re.IGNORECASE),  # Python magic methods
        re.compile(r'\{\{.*import.*\}\}', re.IGNORECASE),
    )

    def __init__(self):
        # Blocked patterns are configured as literal substrings (e.g.
        # "<script"), so they must be re.escape()d before compiling.
        # Without escaping, a configured value containing a regex
        # metacharacter would be silently misinterpreted as a regex.
        self.dangerous_patterns = [
            re.compile(re.escape(pattern), re.IGNORECASE)
            for pattern in settings.blocked_patterns
        ]

    async def validate_resource_content(
        self,
        content: str,
        uri: str,
        mime_type: Optional[str] = None
    ) -> Tuple[str, str]:
        """
        Validate content for resources.

        Args:
            content: The content to validate
            uri: Resource URI (used for mime type detection)
            mime_type: Declared MIME type (optional)

        Returns:
            Tuple of (validated_content, detected_mime_type)

        Raises:
            ValidationError: If content fails validation
            SecurityError: If content contains malicious patterns
        """
        # Size check first: cheapest rejection, and bounds the work done by
        # the pattern scans below.
        content_bytes = content.encode('utf-8')
        if len(content_bytes) > settings.content_max_resource_size:
            raise ValidationError(
                f"Resource content size ({len(content_bytes)} bytes) exceeds maximum "
                f"allowed size ({settings.content_max_resource_size} bytes)"
            )

        # Detect MIME type from the URI; a declared type takes precedence.
        detected_mime = self._detect_mime_type(uri, content)
        if mime_type and mime_type != detected_mime:
            # Use declared if provided.
            # NOTE(review): the mismatch is not actually logged here — no
            # logger is in scope in this module; wire one up if auditing
            # of declared-vs-detected mismatches is wanted.
            detected_mime = mime_type

        # Enforce the resource MIME-type allow-list.
        if detected_mime not in settings.allowed_resource_mimetypes:
            raise ValidationError(
                f"Content type '{detected_mime}' not allowed for resources. "
                f"Allowed types: {', '.join(sorted(settings.allowed_resource_mimetypes))}"
            )

        # Sanitize and scan the content itself.
        validated_content = await self._validate_content(
            content=content,
            mime_type=detected_mime,
            context="resource"
        )

        return validated_content, detected_mime

    async def validate_prompt_content(
        self,
        template: str,
        name: str
    ) -> str:
        """
        Validate content for prompt templates.

        Args:
            template: The prompt template content
            name: Prompt name (for error messages)

        Returns:
            Validated template content

        Raises:
            ValidationError: If content fails validation
            SecurityError: If content contains malicious patterns
        """
        # Size check first (prompts have a tighter limit than resources).
        content_bytes = template.encode('utf-8')
        if len(content_bytes) > settings.content_max_prompt_size:
            raise ValidationError(
                f"Prompt template size ({len(content_bytes)} bytes) exceeds maximum "
                f"allowed size ({settings.content_max_prompt_size} bytes)"
            )

        # Prompts are always treated as plain text.
        validated_content = await self._validate_content(
            content=template,
            mime_type="text/plain",
            context="prompt"
        )

        # Additional prompt-specific validation (template syntax).
        self._validate_prompt_template_syntax(validated_content, name)

        return validated_content

    def _detect_mime_type(self, uri: str, content: str) -> str:
        """Detect MIME type from the URI extension, defaulting to text/plain.

        The content argument is currently unused; detection relies solely on
        mimetypes.guess_type over the URI.
        """
        mime_type, _ = mimetypes.guess_type(uri)
        if mime_type:
            return mime_type

        # For safety, default to text/plain when the URI has no known
        # extension (e.g. custom schemes like "test://thing").
        return "text/plain"

    async def _validate_content(
        self,
        content: str,
        mime_type: str,
        context: str
    ) -> str:
        """Validate and sanitize content; returns the (possibly cleaned) text.

        Raises ValidationError on encoding problems and SecurityError on
        blocked patterns or suspected padding attacks.
        """

        # Strip null bytes if configured (null bytes can confuse downstream
        # C-based consumers and some databases).
        if settings.content_strip_null_bytes:
            content = content.replace('\x00', '')

        # Validate encoding: reject strings that cannot round-trip UTF-8
        # (e.g. lone surrogates).
        if settings.content_validate_encoding:
            try:
                content.encode('utf-8').decode('utf-8')
            except UnicodeError as err:
                # Chain the cause so the original codec error is preserved.
                raise ValidationError(f"Invalid UTF-8 encoding in {context} content") from err

        # Check for dangerous patterns. The patterns are compiled with
        # re.IGNORECASE, so searching the original content is equivalent to
        # lowercasing it first.
        if settings.content_validate_patterns:
            for pattern in self.dangerous_patterns:
                if pattern.search(content):
                    raise SecurityError(
                        f"{context.capitalize()} content contains potentially "
                        f"dangerous pattern: {pattern.pattern}"
                    )

        # Check for excessive whitespace (potential padding attack); only
        # larger payloads are worth the linear scan.
        if len(content) > 1000:
            whitespace_ratio = sum(1 for c in content if c.isspace()) / len(content)
            if whitespace_ratio > 0.9:  # 90% whitespace
                raise SecurityError(f"Suspicious amount of whitespace in {context} content")

        return content

    def _validate_prompt_template_syntax(self, template: str, name: str):
        """Validate prompt template syntax (balanced braces, no exec patterns).

        Raises ValidationError on unbalanced braces and SecurityError on
        suspicious {{ ... }} contents.
        """
        # Balanced-brace check by count only.
        # NOTE(review): this does not catch reordered braces like "}} {{".
        brace_count = template.count('{{') - template.count('}}')
        if brace_count != 0:
            raise ValidationError(
                f"Prompt '{name}' has unbalanced template braces"
            )

        for pattern in self._SUSPICIOUS_TEMPLATE_PATTERNS:
            if pattern.search(template):
                raise SecurityError(
                    f"Prompt template contains potentially dangerous pattern"
                )

# Global instance shared by services/endpoints; construction compiles the
# blocked-pattern regexes from settings once at import time.
content_security = ContentSecurityService()

3. Create Rate Limiter for Content Operations

# In mcpgateway/middleware/content_rate_limiter.py

import asyncio
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List

from mcpgateway.config import settings

class ContentRateLimiter:
    """Rate limiter for content creation operations.

    Tracks per-user/per-operation timestamps over a one-minute sliding
    window plus a per-user count of in-flight operations. All state is
    in-process only, so limits apply per gateway instance, not cluster-wide.
    """

    def __init__(self):
        # "user:operation" -> timestamps of operations within the window
        self.operation_counts: Dict[str, List[datetime]] = defaultdict(list)
        # user -> number of currently running operations
        self.concurrent_operations: Dict[str, int] = defaultdict(int)
        self._lock = asyncio.Lock()

    async def check_rate_limit(self, user: str, operation: str = "create") -> bool:
        """Return True if *user* may start *operation* now.

        NOTE(review): callers invoke check_rate_limit() and then
        record_operation() as two separate awaits, so two concurrent
        requests can both pass the check before either records — acceptable
        for soft limiting, but not a hard guarantee.
        """
        async with self._lock:
            # Timezone-aware "now"; datetime.utcnow() is deprecated and
            # returns naive timestamps.
            now = datetime.now(timezone.utc)
            key = f"{user}:{operation}"

            # Reject if the user already has too many in-flight operations.
            if self.concurrent_operations[user] >= settings.content_max_concurrent_operations:
                return False

            # Drop timestamps older than the one-minute sliding window.
            cutoff = now - timedelta(minutes=1)
            recent = [ts for ts in self.operation_counts[key] if ts > cutoff]
            if recent:
                self.operation_counts[key] = recent
            else:
                # Remove empty entries so idle users do not leak memory.
                self.operation_counts.pop(key, None)

            # Within limits only if fewer than the allowed ops this minute.
            return len(recent) < settings.content_create_rate_limit_per_minute

    async def record_operation(self, user: str, operation: str = "create"):
        """Record the start of an operation for rate/concurrency accounting."""
        async with self._lock:
            key = f"{user}:{operation}"
            self.operation_counts[key].append(datetime.now(timezone.utc))
            self.concurrent_operations[user] += 1

    async def end_operation(self, user: str):
        """Mark one of *user*'s operations as completed (clamped at zero)."""
        async with self._lock:
            self.concurrent_operations[user] = max(0, self.concurrent_operations[user] - 1)

# Global in-process instance shared by the services.
content_rate_limiter = ContentRateLimiter()

4. Update Resource and Prompt Services

# In mcpgateway/services/resource_service.py

from mcpgateway.services.content_security import content_security
from mcpgateway.middleware.content_rate_limiter import content_rate_limiter

class ResourceService:
    # ... existing code ...
    
    async def register_resource(self, db: Session, resource: ResourceCreate) -> ResourceRead:
        """Register a new resource with content validation.

        Rate-limits the caller, validates/sanitizes submitted content and
        fills in the detected MIME type before the existing registration
        logic (elided below) runs.

        Raises:
            ResourceError: If the rate limit is exceeded.
            ValidationError / SecurityError: Propagated from content validation.
        """
        
        # Check rate limit before doing any work.
        user = "system"  # In real implementation, get from context
        if not await content_rate_limiter.check_rate_limit(user, "resource_create"):
            raise ResourceError("Rate limit exceeded. Please try again later.")
        
        # Record only after the check passes; the concurrency counter is
        # balanced by end_operation() in the finally block below.
        await content_rate_limiter.record_operation(user, "resource_create")
        
        try:
            # Validate content if provided; resources without content skip
            # validation entirely.
            if resource.content:
                validated_content, detected_mime = await content_security.validate_resource_content(
                    content=resource.content,
                    uri=resource.uri,
                    mime_type=resource.mime_type
                )
                # Persist the sanitized content (null bytes stripped etc.).
                resource.content = validated_content
                # Only fill in the MIME type when the caller did not declare one.
                if not resource.mime_type:
                    resource.mime_type = detected_mime
            
            # Continue with existing registration logic...
            # ... existing code ...
            
        finally:
            # Always release the concurrency slot, even when validation fails.
            await content_rate_limiter.end_operation(user)
    
    async def update_resource(self, db: Session, uri: str, update: ResourceUpdate) -> ResourceRead:
        """Update resource with content validation.

        Only validates (and rate-limits) when the update actually carries new
        content; metadata-only updates fall through to the existing logic.

        Raises:
            ResourceError: If the rate limit is exceeded.
            ResourceNotFoundError: If no active resource matches *uri*.
            ValidationError / SecurityError: Propagated from content validation.
        """
        
        # If content is being updated, validate it.
        if update.content is not None:
            user = "system"  # In real implementation, get from context
            if not await content_rate_limiter.check_rate_limit(user, "resource_update"):
                raise ResourceError("Rate limit exceeded. Please try again later.")
            
            await content_rate_limiter.record_operation(user, "resource_update")
            
            try:
                # Get existing resource so validation uses its stored MIME type.
                existing = db.query(DbResource).filter_by(uri=uri, is_active=True).first()
                if not existing:
                    raise ResourceNotFoundError(f"Resource not found: {uri}")
                
                # Detected MIME type is discarded here: the stored type wins.
                validated_content, _ = await content_security.validate_resource_content(
                    content=update.content,
                    uri=uri,
                    mime_type=existing.mime_type
                )
                update.content = validated_content
                
                # Continue with existing update logic...
                # ... existing code ...
                
            finally:
                # Always release the concurrency slot.
                await content_rate_limiter.end_operation(user)

# Similar updates for prompt_service.py

5. Update API Endpoints with Error Handling

# In mcpgateway/main.py

@resource_router.post("", response_model=ResourceRead)
@resource_router.post("/", response_model=ResourceRead)
async def create_resource(
    resource: ResourceCreate,
    db: Session = Depends(get_db),
    user: str = Depends(require_auth),
) -> ResourceRead:
    """Create a new resource with content validation.

    Maps service-layer errors to HTTP status codes: security/validation
    failures -> 400, URI conflicts -> 409, rate limiting -> 429, other
    resource errors -> 400. All HTTPExceptions are chained (``from e``) so
    the originating cause is preserved in logs and tracebacks.
    """
    logger.debug(f"User {user} is creating a new resource")
    try:
        # Pass user context to service
        result = await resource_service.register_resource(db, resource)
        return result
    except SecurityError as e:
        # Log the detail but return a generic message so callers cannot
        # probe which blocked pattern was matched.
        logger.warning(f"Security violation in resource creation by user {user}: {str(e)}")
        raise HTTPException(status_code=400, detail="Content failed security validation") from e
    except ValidationError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except ResourceURIConflictError as e:
        raise HTTPException(status_code=409, detail=str(e)) from e
    except ResourceError as e:
        # Rate-limit failures surface as ResourceError; map them to 429.
        if "Rate limit" in str(e):
            raise HTTPException(status_code=429, detail=str(e)) from e
        raise HTTPException(status_code=400, detail=str(e)) from e

@prompt_router.post("", response_model=PromptRead)
@prompt_router.post("/", response_model=PromptRead)
async def create_prompt(
    prompt: PromptCreate,
    db: Session = Depends(get_db),
    user: str = Depends(require_auth),
) -> PromptRead:
    """Create a new prompt with template validation.

    Unlike resources (validated inside the service), prompt templates are
    validated here at the endpoint before registration. Errors map to HTTP
    codes: security/validation failures -> 400, name conflicts -> 409, other
    prompt errors -> 400. HTTPExceptions are chained (``from e``) to keep
    the originating cause.
    """
    logger.debug(f"User: {user} requested to create prompt: {prompt}")
    try:
        # Validate template content before handing off to the service.
        if prompt.template:
            validated_template = await content_security.validate_prompt_content(
                template=prompt.template,
                name=prompt.name
            )
            # Persist the sanitized template.
            prompt.template = validated_template
        
        return await prompt_service.register_prompt(db, prompt)
    except SecurityError as e:
        # Log the detail but return a generic message so callers cannot
        # probe which pattern was matched.
        logger.warning(f"Security violation in prompt creation by user {user}: {str(e)}")
        raise HTTPException(status_code=400, detail="Template failed security validation") from e
    except ValidationError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except PromptNameConflictError as e:
        raise HTTPException(status_code=409, detail=str(e)) from e
    except PromptError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

6. Update .env.example

#####################################
# Content Security Configuration
#####################################

# Maximum content sizes (in bytes)
CONTENT_MAX_RESOURCE_SIZE=102400        # 100KB for resources
CONTENT_MAX_PROMPT_SIZE=10240           # 10KB for prompt templates

# Allowed MIME types (comma-separated)
CONTENT_ALLOWED_RESOURCE_MIMETYPES=text/plain,text/markdown
CONTENT_ALLOWED_PROMPT_MIMETYPES=text/plain,text/markdown

# Content validation
CONTENT_VALIDATE_ENCODING=true          # Validate UTF-8 encoding
CONTENT_VALIDATE_PATTERNS=true          # Check for malicious patterns
CONTENT_STRIP_NULL_BYTES=true           # Remove null bytes

# Rate limiting
CONTENT_CREATE_RATE_LIMIT_PER_MINUTE=3  # Max creates per minute
CONTENT_MAX_CONCURRENT_OPERATIONS=2     # Max concurrent operations

# Security patterns to block (comma-separated)
CONTENT_BLOCKED_PATTERNS=<script,javascript:,vbscript:,onload=,onerror=,onclick=,<iframe,<embed,<object

7. Add Monitoring and Metrics

# In mcpgateway/services/content_security.py

class ContentSecurityService:
    """Monitoring/metrics extension sketch for ContentSecurityService.

    NOTE(review): this sketch references defaultdict, Dict, Any and logger,
    which the step-2 version of this module does not import — add those
    imports when merging. Neither method awaits anything, so the ``async``
    qualifiers are not strictly needed; kept here as written.
    """
    def __init__(self):
        # ... existing init ...
        # In-memory counters keyed by "user:violation_type"; reset on restart.
        self.security_violations = defaultdict(int)
        self.validation_failures = defaultdict(int)
    
    async def log_security_violation(self, user: str, violation_type: str):
        """Log security violations for monitoring.

        Increments the per-user/per-type counter and emits a warning log.
        """
        self.security_violations[f"{user}:{violation_type}"] += 1
        logger.warning(f"Security violation: user={user}, type={violation_type}")
    
    async def get_security_metrics(self) -> Dict[str, Any]:
        """Get security metrics for monitoring.

        Returns totals plus per-key breakdowns as plain dicts (snapshot
        copies, so callers cannot mutate the live counters).
        """
        return {
            "total_violations": sum(self.security_violations.values()),
            "total_validation_failures": sum(self.validation_failures.values()),
            "violations_by_type": dict(self.security_violations),
            "failures_by_type": dict(self.validation_failures)
        }

Testing

  1. Test content size limits:

    # Create large content
    LARGE_CONTENT=$(python -c "print('x' * 200000)")
    
    # Try to create resource (should fail)
    curl -X POST -H "Authorization: Bearer $TOKEN" \
         -H "Content-Type: application/json" \
         -d "{\"uri\":\"test://large\",\"name\":\"Large\",\"content\":\"$LARGE_CONTENT\"}" \
         http://localhost:4444/resources
  2. Test MIME type restrictions:

    # Try creating resource with disallowed type
    curl -X POST -H "Authorization: Bearer $TOKEN" \
         -H "Content-Type: application/json" \
         -d '{"uri":"test.html","name":"HTML","content":"<html>test</html>","mimeType":"text/html"}' \
         http://localhost:4444/resources
  3. Test malicious content patterns:

    # Try injecting script
    curl -X POST -H "Authorization: Bearer $TOKEN" \
         -H "Content-Type: application/json" \
         -d '{"uri":"test://script","name":"Script","content":"<script>alert(1)</script>"}' \
         http://localhost:4444/resources
  4. Test rate limiting:

    # Rapid creates should trigger rate limit
    for i in {1..5}; do
      curl -X POST -H "Authorization: Bearer $TOKEN" \
           -H "Content-Type: application/json" \
           -d '{"uri":"test://rate'$i'","name":"Rate'$i'","content":"test"}' \
           http://localhost:4444/resources
    done

Security Benefits

  • Size Limits: Prevents DoS through large content submissions
  • Type Restrictions: Only allows safe content types (text/plain, text/markdown)
  • Pattern Detection: Blocks common XSS and injection patterns
  • Rate Limiting: Prevents abuse through rapid content creation
  • Encoding Validation: Ensures proper UTF-8 encoding
  • Template Security: Validates prompt template syntax and blocks dangerous patterns
  • Monitoring: Tracks security violations for incident response

This implementation provides comprehensive content security while working with the existing database-backed resource and prompt system.

Metadata

Metadata

Labels

enhancement (New feature or request), python (Python / backend development (FastAPI)), security (Improves security), triage (Issues / Features awaiting triage)

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions