Skip to content

Integrate Cisco AI Defense Security Scanning into Server Registration API #265

@aarora79

Description

@aarora79

Integrate Cisco AI Defense Security Scanning into Server Registration API

Summary

Integrate the Cisco AI Defense MCP Scanner into the registry's server registration API endpoints so that security scanning happens automatically when servers are registered via the web UI or API, not just via CLI.

Problem Statement

Currently, security scanning for MCP servers is only available through the CLI workflow:

  • ./cli/service_mgmt.sh add <config> runs security scans via cli/mcp_security_scanner.py

However, the FastAPI registration endpoints have no security scanning:

  • POST /api/register (UI form submission) - see registry/api/server_routes.py:278
  • POST /api/internal/register (programmatic API) - see registry/api/server_routes.py:366

This creates a security gap where servers registered via the API bypass security checks.

Current Architecture

CLI Flow (Has Security Scanning)

service_mgmt.sh add <config>
    |
    v
mcp_security_scanner.py --server-url <url> --analyzers yara
    |
    v
If UNSAFE (critical/high issues):
    - Add "security-pending" tag
    - Register server
    - Disable server via /api/internal/toggle
    - Warn user to review before enabling

API Flow (Missing Security Scanning)

POST /register or POST /internal/register
    |
    v
Validate path, create server entry
    |
    v
Register server (auto-enabled)
    |
    v
Update FAISS, Nginx, scopes.yml
    |
    v
NO SECURITY SCANNING!

Implementation Guide

Prerequisites

The cisco-ai-mcp-scanner package is already installed (see pyproject.toml:41):

"cisco-ai-mcp-scanner>=3.0.1",

Phase 1: Create Security Schema Models

File to create: registry/schemas/security.py

This file defines the Pydantic models for security scanning. Follow the pattern in registry/schemas/anthropic_schema.py.

"""
Pydantic models for MCP Security Scanning.

These models define the data structures for security scan results
from the Cisco AI Defense MCP Scanner integration.
"""

import logging
from datetime import datetime
from enum import Enum
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

from pydantic import (
    BaseModel,
    Field,
)


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)

logger = logging.getLogger(__name__)


class SeverityLevel(str, Enum):
    """Security finding severity levels."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
    UNKNOWN = "unknown"


class AnalyzerType(str, Enum):
    """Available security analyzers from Cisco AI Defense MCP Scanner."""

    YARA = "yara"
    LLM = "llm"
    API = "api"


class SecurityFinding(BaseModel):
    """Individual security finding from a scan."""

    tool_name: str = Field(..., description="Name of the MCP tool that was scanned")
    severity: SeverityLevel = Field(
        default=SeverityLevel.UNKNOWN,
        description="Severity level of the finding"
    )
    threat_names: List[str] = Field(
        default_factory=list,
        description="List of threat identifiers"
    )
    threat_summary: str = Field(
        default="",
        description="Human-readable summary of the threat"
    )
    is_safe: bool = Field(
        default=True,
        description="Whether this specific tool is considered safe"
    )
    analyzer: str = Field(
        default="",
        description="Which analyzer produced this finding"
    )


class AnalyzerResult(BaseModel):
    """Results from a single analyzer."""

    analyzer_name: str = Field(..., description="Name of the analyzer")
    findings: List[SecurityFinding] = Field(
        default_factory=list,
        description="List of findings from this analyzer"
    )


class SecurityScanResult(BaseModel):
    """Complete security scan result for an MCP server."""

    server_url: str = Field(..., description="URL of the scanned MCP server")
    scan_timestamp: str = Field(..., description="ISO timestamp of the scan")
    is_safe: bool = Field(..., description="Overall safety assessment")
    critical_issues: int = Field(
        default=0,
        description="Count of critical severity issues"
    )
    high_severity: int = Field(
        default=0,
        description="Count of high severity issues"
    )
    medium_severity: int = Field(
        default=0,
        description="Count of medium severity issues"
    )
    low_severity: int = Field(
        default=0,
        description="Count of low severity issues"
    )
    analyzers_used: List[str] = Field(
        default_factory=list,
        description="List of analyzers that were used"
    )
    analysis_results: Dict[str, AnalyzerResult] = Field(
        default_factory=dict,
        description="Results organized by analyzer"
    )
    raw_output: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Full raw scanner output"
    )
    error_message: Optional[str] = Field(
        default=None,
        description="Error message if scan failed"
    )
    scan_skipped: bool = Field(
        default=False,
        description="Whether the scan was skipped"
    )
    skip_reason: Optional[str] = Field(
        default=None,
        description="Reason why scan was skipped"
    )

After creating, update registry/schemas/__init__.py:

Add these imports at the top:

from .security import (
    SeverityLevel,
    AnalyzerType,
    SecurityFinding,
    AnalyzerResult,
    SecurityScanResult,
)

Add to __all__ list:

__all__ = [
    # ... existing exports ...
    "SeverityLevel",
    "AnalyzerType",
    "SecurityFinding",
    "AnalyzerResult",
    "SecurityScanResult",
]

Validate the file compiles:

uv run python -m py_compile registry/schemas/security.py

Phase 2: Add Configuration Options

File to modify: registry/core/config.py

Add these new settings to the Settings class (after line 43, after the wellknown_cache_ttl setting):

    # Security scanning settings
    security_scan_enabled: bool = True
    security_scan_analyzers: str = "yara"
    security_scan_timeout_seconds: int = 60
    security_scan_on_registration: bool = True
    security_scan_block_unsafe: bool = True
    mcp_scanner_llm_api_key: str = ""

Explanation of each setting:

Setting Type Default Description
security_scan_enabled bool True Master switch to enable/disable security scanning
security_scan_analyzers str "yara" Comma-separated list of analyzers (yara, llm, api)
security_scan_timeout_seconds int 60 Timeout for scan operations
security_scan_on_registration bool True Whether to scan during server registration
security_scan_block_unsafe bool True Whether to disable servers that fail security scan
mcp_scanner_llm_api_key str "" API key for LLM analyzer (optional)

Phase 3: Create Security Scanner Service

File to create: registry/services/security_scanner.py

This is the main service that wraps the Cisco AI Defense MCP Scanner. Follow the service pattern from registry/services/server_service.py and registry/health/service.py.

"""
Security Scanner Service for MCP Servers.

Integrates the Cisco AI Defense MCP Scanner to scan MCP servers
for security vulnerabilities during registration.
"""

import asyncio
import json
import logging
import os
import re
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
)

from ..core.config import settings
from ..schemas.security import (
    AnalyzerResult,
    SecurityFinding,
    SecurityScanResult,
    SeverityLevel,
)


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)

logger = logging.getLogger(__name__)


# Constants
DEFAULT_ANALYZERS: str = "yara"
LLM_API_KEY_ENV: str = "MCP_SCANNER_LLM_API_KEY"
SECURITY_SCANS_DIR: Path = Path(__file__).parent.parent.parent / "security_scans"


class SecurityScannerService:
    """Service for scanning MCP servers for security vulnerabilities."""

    def __init__(self):
        """Initialize the security scanner service."""
        self._ensure_output_directory()

    def _ensure_output_directory(self) -> Path:
        """Ensure the security scans output directory exists."""
        SECURITY_SCANS_DIR.mkdir(parents=True, exist_ok=True)
        return SECURITY_SCANS_DIR

    def _prepare_scan_url(
        self,
        server_url: str
    ) -> str:
        """Prepare the server URL for scanning by appending /mcp if needed.

        Args:
            server_url: Original server URL

        Returns:
            URL with /mcp endpoint appended if not present
        """
        if not server_url.endswith("/mcp") and not server_url.endswith("/mcp/"):
            # Remove trailing slash if present, then add /mcp
            url = server_url.rstrip("/")
            url = f"{url}/mcp"
            logger.debug(f"Appending /mcp to scan URL: {url}")
            return url
        return server_url

    def _parse_headers_for_bearer_token(
        self,
        headers: Optional[List[Dict[str, str]]]
    ) -> Optional[str]:
        """Extract bearer token from headers list.

        Args:
            headers: List of header dictionaries

        Returns:
            Bearer token if found, None otherwise
        """
        if not headers:
            return None

        for header_dict in headers:
            for key, value in header_dict.items():
                if key.lower() in ("authorization", "x-authorization"):
                    if value.startswith("Bearer "):
                        return value.replace("Bearer ", "")
        return None

    def _run_scanner_subprocess(
        self,
        server_url: str,
        analyzers: str,
        bearer_token: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the mcp-scanner CLI command and return raw output.

        Args:
            server_url: URL of the MCP server to scan
            analyzers: Comma-separated list of analyzers
            bearer_token: Optional bearer token for authentication

        Returns:
            Dictionary containing raw scanner output

        Raises:
            subprocess.CalledProcessError: If scanner command fails
            ValueError: If output cannot be parsed
        """
        logger.info(f"Running security scan on: {server_url}")
        logger.info(f"Using analyzers: {analyzers}")

        # Build command - global options before subcommand
        cmd = [
            "mcp-scanner",
            "--analyzers", analyzers,
            "--raw",
            "remote",
            "--server-url", server_url
        ]

        # Add bearer token if provided
        if bearer_token:
            cmd.extend(["--bearer-token", bearer_token])
            logger.info("Using bearer token authentication")

        # Set environment variable for API key if configured
        env = os.environ.copy()
        if settings.mcp_scanner_llm_api_key:
            env[LLM_API_KEY_ENV] = settings.mcp_scanner_llm_api_key

        # Run scanner with timeout
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
                env=env,
                timeout=settings.security_scan_timeout_seconds
            )

            logger.debug(f"Raw scanner stdout (first 500 chars):\n{result.stdout[:500]}")
            return self._parse_scanner_output(result.stdout)

        except subprocess.TimeoutExpired:
            logger.error(f"Scanner timed out after {settings.security_scan_timeout_seconds} seconds")
            raise
        except subprocess.CalledProcessError as e:
            logger.error(f"Scanner command failed with exit code {e.returncode}")
            logger.error(f"stderr: {e.stderr}")
            raise

    def _parse_scanner_output(
        self,
        stdout: str
    ) -> Dict[str, Any]:
        """Parse the scanner stdout to extract JSON results.

        Args:
            stdout: Raw stdout from scanner command

        Returns:
            Parsed scanner output dictionary

        Raises:
            ValueError: If JSON cannot be extracted
        """
        stdout = stdout.strip()

        # Remove ANSI color codes
        ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
        stdout = ansi_escape.sub('', stdout)

        # Find JSON array start
        json_start = -1

        # Try to find '[' at start of line
        for i in range(len(stdout) - 1):
            if stdout[i] == '[' and (i == 0 or stdout[i-1] in '\n\r'):
                json_start = i
                break

        # Fallback: find '[\s*{'
        if json_start == -1:
            pattern = r'\[\s*\{'
            match = re.search(pattern, stdout)
            if match:
                json_start = match.start()

        if json_start == -1:
            raise ValueError("No JSON array found in scanner output")

        # Extract and parse JSON
        json_str = stdout[json_start:]
        tool_results = json.loads(json_str)

        # Convert to expected format
        raw_output = {
            "analysis_results": {},
            "tool_results": tool_results
        }

        # Extract findings and organize by analyzer
        for tool_result in tool_results:
            findings_dict = tool_result.get("findings", {})
            for analyzer_name, analyzer_findings in findings_dict.items():
                if analyzer_name not in raw_output["analysis_results"]:
                    raw_output["analysis_results"][analyzer_name] = {"findings": []}

                if isinstance(analyzer_findings, dict):
                    finding = {
                        "tool_name": tool_result.get("tool_name"),
                        "severity": analyzer_findings.get("severity", "unknown"),
                        "threat_names": analyzer_findings.get("threat_names", []),
                        "threat_summary": analyzer_findings.get("threat_summary", ""),
                        "is_safe": tool_result.get("is_safe", True)
                    }
                    raw_output["analysis_results"][analyzer_name]["findings"].append(finding)

        logger.debug(f"Parsed scanner output:\n{json.dumps(raw_output, indent=2, default=str)}")
        return raw_output

    def _count_severities(
        self,
        raw_output: Dict[str, Any]
    ) -> Tuple[int, int, int, int]:
        """Count findings by severity level.

        Args:
            raw_output: Parsed scanner output

        Returns:
            Tuple of (critical_count, high_count, medium_count, low_count)
        """
        critical_count = 0
        high_count = 0
        medium_count = 0
        low_count = 0

        analysis_results = raw_output.get("analysis_results", {})

        for analyzer_data in analysis_results.values():
            if isinstance(analyzer_data, dict):
                findings = analyzer_data.get("findings", [])
                for finding in findings:
                    severity = finding.get("severity", "").lower()
                    if severity == "critical":
                        critical_count += 1
                    elif severity == "high":
                        high_count += 1
                    elif severity == "medium":
                        medium_count += 1
                    elif severity == "low":
                        low_count += 1

        return critical_count, high_count, medium_count, low_count

    def _build_analyzer_results(
        self,
        raw_output: Dict[str, Any]
    ) -> Dict[str, AnalyzerResult]:
        """Build AnalyzerResult objects from raw output.

        Args:
            raw_output: Parsed scanner output

        Returns:
            Dictionary mapping analyzer name to AnalyzerResult
        """
        results = {}
        analysis_results = raw_output.get("analysis_results", {})

        for analyzer_name, analyzer_data in analysis_results.items():
            if isinstance(analyzer_data, dict):
                findings = []
                for finding_data in analyzer_data.get("findings", []):
                    severity_str = finding_data.get("severity", "unknown").lower()
                    try:
                        severity = SeverityLevel(severity_str)
                    except ValueError:
                        severity = SeverityLevel.UNKNOWN

                    findings.append(SecurityFinding(
                        tool_name=finding_data.get("tool_name", ""),
                        severity=severity,
                        threat_names=finding_data.get("threat_names", []),
                        threat_summary=finding_data.get("threat_summary", ""),
                        is_safe=finding_data.get("is_safe", True),
                        analyzer=analyzer_name
                    ))

                results[analyzer_name] = AnalyzerResult(
                    analyzer_name=analyzer_name,
                    findings=findings
                )

        return results

    async def scan_server(
        self,
        server_url: str,
        analyzers: Optional[List[str]] = None,
        headers: Optional[List[Dict[str, str]]] = None
    ) -> SecurityScanResult:
        """Scan an MCP server for security vulnerabilities.

        This is the main entry point for security scanning.

        Args:
            server_url: URL of the MCP server to scan (e.g., http://server:8000/)
            analyzers: List of analyzers to use (default: ["yara"])
            headers: Optional list of header dictionaries for authentication

        Returns:
            SecurityScanResult with scan results
        """
        # Check if scanning is enabled
        if not settings.security_scan_enabled:
            logger.info("Security scanning is disabled via configuration")
            return SecurityScanResult(
                server_url=server_url,
                scan_timestamp=datetime.now(timezone.utc).isoformat(),
                is_safe=True,
                scan_skipped=True,
                skip_reason="Security scanning disabled via SECURITY_SCAN_ENABLED=false"
            )

        # Prepare URL and analyzers
        scan_url = self._prepare_scan_url(server_url)
        analyzer_list = analyzers or settings.security_scan_analyzers.split(",")
        analyzer_str = ",".join(analyzer_list)

        # Extract bearer token from headers if present
        bearer_token = self._parse_headers_for_bearer_token(headers)

        # Run scan in thread pool to avoid blocking
        try:
            loop = asyncio.get_event_loop()
            raw_output = await loop.run_in_executor(
                None,
                self._run_scanner_subprocess,
                scan_url,
                analyzer_str,
                bearer_token
            )
        except subprocess.TimeoutExpired:
            return SecurityScanResult(
                server_url=server_url,
                scan_timestamp=datetime.now(timezone.utc).isoformat(),
                is_safe=False,
                error_message=f"Scan timed out after {settings.security_scan_timeout_seconds} seconds",
                analyzers_used=analyzer_list
            )
        except subprocess.CalledProcessError as e:
            return SecurityScanResult(
                server_url=server_url,
                scan_timestamp=datetime.now(timezone.utc).isoformat(),
                is_safe=False,
                error_message=f"Scanner failed with exit code {e.returncode}: {e.stderr}",
                analyzers_used=analyzer_list
            )
        except FileNotFoundError:
            logger.error("mcp-scanner command not found - is cisco-ai-mcp-scanner installed?")
            return SecurityScanResult(
                server_url=server_url,
                scan_timestamp=datetime.now(timezone.utc).isoformat(),
                is_safe=True,
                scan_skipped=True,
                skip_reason="mcp-scanner command not found",
                analyzers_used=analyzer_list
            )
        except Exception as e:
            logger.exception(f"Unexpected error during security scan: {e}")
            return SecurityScanResult(
                server_url=server_url,
                scan_timestamp=datetime.now(timezone.utc).isoformat(),
                is_safe=False,
                error_message=f"Unexpected error: {str(e)}",
                analyzers_used=analyzer_list
            )

        # Analyze results
        critical, high, medium, low = self._count_severities(raw_output)
        is_safe = (critical == 0 and high == 0)

        logger.info(f"Security scan results for {server_url}:")
        logger.info(f"  Critical: {critical}, High: {high}, Medium: {medium}, Low: {low}")
        logger.info(f"  Overall: {'SAFE' if is_safe else 'UNSAFE'}")

        # Build result object
        return SecurityScanResult(
            server_url=server_url,
            scan_timestamp=datetime.now(timezone.utc).isoformat(),
            is_safe=is_safe,
            critical_issues=critical,
            high_severity=high,
            medium_severity=medium,
            low_severity=low,
            analyzers_used=analyzer_list,
            analysis_results=self._build_analyzer_results(raw_output),
            raw_output=raw_output
        )

    def is_result_safe(
        self,
        result: SecurityScanResult
    ) -> bool:
        """Check if a scan result indicates the server is safe.

        Args:
            result: SecurityScanResult to check

        Returns:
            True if safe (no critical or high severity issues)
        """
        # If scan was skipped, treat as safe
        if result.scan_skipped:
            return True

        # If scan had errors, treat as unsafe
        if result.error_message:
            return False

        return result.is_safe


# Global service instance (singleton pattern like other services)
security_scanner_service = SecurityScannerService()

Validate the file compiles:

uv run python -m py_compile registry/services/security_scanner.py

Phase 4: Integrate into Registration Endpoints

File to modify: registry/api/server_routes.py

Step 4.1: Add imports at the top of the file

Add these imports after the existing imports (around line 15):

from ..services.security_scanner import security_scanner_service
from ..schemas.security import SecurityScanResult

Step 4.2: Modify internal_register_service function

This is the main registration endpoint used by the CLI and programmatic access.

Location: registry/api/server_routes.py around line 366

Find the line (around line 540):

    logger.warning("INTERNAL REGISTER: Auto-enabling newly registered server")  # TODO: replace with debug

Insert this code BEFORE that line (after the server_entry is created, around line 505):

    # =========================================================================
    # SECURITY SCANNING - Scan server before registration
    # =========================================================================
    scan_result: Optional[SecurityScanResult] = None
    auto_enable = True  # Default to auto-enable

    if settings.security_scan_enabled and settings.security_scan_on_registration:
        logger.info(f"INTERNAL REGISTER: Running security scan on {proxy_pass_url}")
        try:
            scan_result = await security_scanner_service.scan_server(
                server_url=proxy_pass_url,
                headers=headers_list if headers_list else None
            )

            if not security_scanner_service.is_result_safe(scan_result):
                logger.warning(
                    f"INTERNAL REGISTER: Server {path} failed security scan - "
                    f"critical={scan_result.critical_issues}, high={scan_result.high_severity}"
                )

                # Add security-pending tag
                if "security-pending" not in tag_list:
                    tag_list.append("security-pending")
                    server_entry["tags"] = tag_list

                # Disable auto-enable if configured to block unsafe servers
                if settings.security_scan_block_unsafe:
                    auto_enable = False
                    logger.warning(f"INTERNAL REGISTER: Server {path} will be registered but NOT enabled")

        except Exception as e:
            logger.error(f"INTERNAL REGISTER: Security scan failed with error: {e}")
            # On scan error, add tag and disable
            if "security-pending" not in tag_list:
                tag_list.append("security-pending")
                server_entry["tags"] = tag_list
            auto_enable = False
    else:
        logger.info("INTERNAL REGISTER: Security scanning disabled or not configured for registration")
    # =========================================================================

Step 4.3: Modify the auto-enable logic

Find the existing auto-enable code (around line 542-551):

    # Automatically enable the newly registered server BEFORE FAISS indexing
    try:
        toggle_success = server_service.toggle_service(path, True)

Replace it with:

    # Automatically enable the newly registered server BEFORE FAISS indexing
    # Only auto-enable if security scan passed (or scanning is disabled)
    if auto_enable:
        try:
            toggle_success = server_service.toggle_service(path, True)
            if toggle_success:
                logger.info(f"Successfully auto-enabled server {path} after registration")
            else:
                logger.warning(f"Failed to auto-enable server {path} after registration")
        except Exception as e:
            logger.error(f"Error auto-enabling server {path}: {e}")
    else:
        logger.info(f"Server {path} registered but NOT auto-enabled due to security scan result")
        toggle_success = False

Step 4.4: Update the response to include scan results

Find the return statement (around line 594):

    return JSONResponse(
        status_code=201,
        content={
            "message": "Service registered successfully",
            "service": server_entry,
        },
    )

Replace it with:

    return JSONResponse(
        status_code=201,
        content={
            "message": "Service registered successfully",
            "service": server_entry,
            "auto_enabled": auto_enable,
            "security_scan": scan_result.model_dump() if scan_result else None,
        },
    )

Step 4.5: Add import for Optional and settings

Make sure these imports exist at the top of the file:

from typing import Annotated, Optional
from ..core.config import settings

Phase 5: Update main.py for Initialization (Optional)

If you want to log that security scanning is available on startup, add this to registry/main.py in the lifespan function (around line 130, after federation service init):

        # Log security scanning status
        if settings.security_scan_enabled:
            logger.info(f"Security scanning enabled with analyzers: {settings.security_scan_analyzers}")
        else:
            logger.info("Security scanning is disabled")

File Changes Summary

New Files to Create

File Description
registry/schemas/security.py Pydantic models for security scanning (~130 lines)
registry/services/security_scanner.py Security scanner service (~350 lines)

Files to Modify (Python/Registry)

File Changes
registry/schemas/__init__.py Add exports for security models (5 lines)
registry/core/config.py Add 6 new settings (6 lines)
registry/api/server_routes.py Add security scanning integration (~50 lines)
registry/main.py Optional: Add startup logging (3 lines)

Files to Modify (Configuration)

File Changes
.env.example Add security scanning section (~20 lines)

Files to Modify (Terraform - AWS ECS)

File Changes
terraform/aws-ecs/variables.tf Add 6 new variables (~40 lines)
terraform/aws-ecs/terraform.tfvars.example Add example values (~15 lines)
terraform/aws-ecs/modules/mcp-gateway/variables.tf Add module variables (~40 lines)
terraform/aws-ecs/modules/mcp-gateway/ecs-services.tf Add env vars and secrets (~30 lines)
terraform/aws-ecs/main.tf Pass variables to module (~8 lines)

Cisco AI Defense MCP Scanner Reference

CLI Command Format (what we're wrapping)

mcp-scanner --analyzers yara,llm --raw remote --server-url https://example.com/mcp

Available Analyzers

Analyzer Description Requirements
yara Pattern matching with YARA rules None (built-in)
llm LLM-as-a-judge evaluation MCP_SCANNER_LLM_API_KEY env var
api Cisco AI Defense inspect API Cisco API credentials

Scanner Output Format

The scanner outputs a JSON array of tool results:

[
  {
    "tool_name": "dangerous_tool",
    "is_safe": false,
    "findings": {
      "yara": {
        "severity": "high",
        "threat_names": ["SUSPICIOUS_PATTERN"],
        "threat_summary": "Detected suspicious pattern in tool"
      }
    }
  }
]

Behavior Matrix

Scan Result Registration Server State Tags Added
Safe (no issues) Success Enabled None
Unsafe (critical/high) Success Disabled security-pending
Scan error/timeout Success Disabled security-pending
Scanner not found Success Enabled None (scan skipped)
Scanning disabled Success Enabled None

Configuration Guide

Overview

Security scanning settings can be configured via:

  1. .env file - For local development and Docker Compose deployments
  2. Terraform terraform.tfvars - For AWS ECS deployments
  3. Environment variables - Direct setting in any environment

Configuration Variables Reference

Variable Type Default Description
SECURITY_SCAN_ENABLED bool true Master switch to enable/disable security scanning
SECURITY_SCAN_ANALYZERS string yara Comma-separated list of analyzers (yara, llm, api)
SECURITY_SCAN_TIMEOUT_SECONDS int 60 Timeout for scan operations in seconds
SECURITY_SCAN_ON_REGISTRATION bool true Whether to scan during server registration
SECURITY_SCAN_BLOCK_UNSAFE bool true Whether to disable servers that fail security scan
MCP_SCANNER_LLM_API_KEY string "" API key for LLM analyzer (OpenAI API key)

Configuration Method 1: .env File (Local/Docker Compose)

File to modify: .env (copy from .env.example if not exists)

Add the following section to your .env file:

# =============================================================================
# SECURITY SCANNING CONFIGURATION
# =============================================================================

# Enable/disable security scanning during server registration
# Default: true
SECURITY_SCAN_ENABLED=true

# Analyzers to use for security scanning (comma-separated)
# Available: yara, llm, api
# - yara: Pattern matching with YARA rules (no API key required)
# - llm: LLM-as-a-judge evaluation (requires MCP_SCANNER_LLM_API_KEY)
# - api: Cisco AI Defense inspect API (requires Cisco credentials)
# Default: yara
SECURITY_SCAN_ANALYZERS=yara

# Timeout for security scan operations (in seconds)
# Increase if scanning large servers or slow networks
# Default: 60
SECURITY_SCAN_TIMEOUT_SECONDS=60

# Whether to scan servers during registration
# Set to false to skip scanning on registration (can still scan manually)
# Default: true
SECURITY_SCAN_ON_REGISTRATION=true

# Whether to block unsafe servers from being auto-enabled
# If true: Servers with critical/high issues are registered but disabled
# If false: Servers are enabled regardless of scan results (not recommended)
# Default: true
SECURITY_SCAN_BLOCK_UNSAFE=true

# OpenAI API key for LLM-based security analysis (optional)
# Only required if SECURITY_SCAN_ANALYZERS includes 'llm'
# Get from: https://platform.openai.com/api-keys
MCP_SCANNER_LLM_API_KEY=your_openai_api_key_here

Note: The MCP_SCANNER_LLM_API_KEY is already documented in .env.example (line 179-183).


Configuration Method 2: Terraform tfvars (AWS ECS)

Files to modify:

  1. terraform/aws-ecs/variables.tf - Add variable definitions
  2. terraform/aws-ecs/terraform.tfvars.example - Add example values
  3. terraform/aws-ecs/modules/mcp-gateway/variables.tf - Add module variables
  4. terraform/aws-ecs/modules/mcp-gateway/ecs-services.tf - Pass to ECS task

Step 2.1: Add to terraform/aws-ecs/variables.tf

Add after the existing variables (around line 188):

#
# Security Scanning Configuration
#

variable "security_scan_enabled" {
  description = "Enable security scanning during server registration"
  type        = bool
  default     = true
}

variable "security_scan_analyzers" {
  description = "Comma-separated list of analyzers (yara, llm, api)"
  type        = string
  default     = "yara"
}

variable "security_scan_timeout_seconds" {
  description = "Timeout for security scan operations"
  type        = number
  default     = 60
}

variable "security_scan_on_registration" {
  description = "Whether to scan servers during registration"
  type        = bool
  default     = true
}

variable "security_scan_block_unsafe" {
  description = "Whether to disable servers that fail security scan"
  type        = bool
  default     = true
}

variable "mcp_scanner_llm_api_key" {
  description = "OpenAI API key for LLM-based security analysis (optional)"
  type        = string
  sensitive   = true
  default     = ""
}

Step 2.2: Add to terraform/aws-ecs/terraform.tfvars.example

Add after the service replica counts section (around line 129):

# ============================================================================
# OPTIONAL: SECURITY SCANNING CONFIGURATION
# ============================================================================

# Enable/disable security scanning during server registration
# Default: true
# security_scan_enabled = true

# Analyzers to use (comma-separated): yara, llm, api
# Default: "yara"
# security_scan_analyzers = "yara"

# Timeout for scan operations (seconds)
# Default: 60
# security_scan_timeout_seconds = 60

# Block unsafe servers from auto-enabling
# Default: true
# security_scan_block_unsafe = true

# OpenAI API key for LLM analyzer (optional, only if using llm analyzer)
# mcp_scanner_llm_api_key = "sk-..."

Step 2.3: Add to terraform/aws-ecs/modules/mcp-gateway/variables.tf

Add the module variables:

variable "security_scan_enabled" {
  description = "Enable security scanning during server registration"
  type        = bool
  default     = true
}

variable "security_scan_analyzers" {
  description = "Comma-separated list of analyzers"
  type        = string
  default     = "yara"
}

variable "security_scan_timeout_seconds" {
  description = "Timeout for security scan operations"
  type        = number
  default     = 60
}

variable "security_scan_on_registration" {
  description = "Whether to scan servers during registration"
  type        = bool
  default     = true
}

variable "security_scan_block_unsafe" {
  description = "Whether to disable servers that fail security scan"
  type        = bool
  default     = true
}

variable "mcp_scanner_llm_api_key" {
  description = "OpenAI API key for LLM analyzer"
  type        = string
  sensitive   = true
  default     = ""
}

Step 2.4: Add to terraform/aws-ecs/modules/mcp-gateway/ecs-services.tf

Add environment variables to the registry container definition (around line 90, in the environment block):

        {
          name  = "SECURITY_SCAN_ENABLED"
          value = tostring(var.security_scan_enabled)
        },
        {
          name  = "SECURITY_SCAN_ANALYZERS"
          value = var.security_scan_analyzers
        },
        {
          name  = "SECURITY_SCAN_TIMEOUT_SECONDS"
          value = tostring(var.security_scan_timeout_seconds)
        },
        {
          name  = "SECURITY_SCAN_ON_REGISTRATION"
          value = tostring(var.security_scan_on_registration)
        },
        {
          name  = "SECURITY_SCAN_BLOCK_UNSAFE"
          value = tostring(var.security_scan_block_unsafe)
        },

For the sensitive LLM API key, add to the secrets block (around line 133):

        # Only include if API key is provided
        dynamic "secret" {
          for_each = var.mcp_scanner_llm_api_key != "" ? [1] : []
          content {
            name      = "MCP_SCANNER_LLM_API_KEY"
            valueFrom = aws_secretsmanager_secret.mcp_scanner_llm_api_key[0].arn
          }
        }

And create the secret resource:

resource "aws_secretsmanager_secret" "mcp_scanner_llm_api_key" {
  count       = var.mcp_scanner_llm_api_key != "" ? 1 : 0
  name        = "${var.name}-mcp-scanner-llm-api-key"
  description = "OpenAI API key for MCP security scanner LLM analyzer"
}

resource "aws_secretsmanager_secret_version" "mcp_scanner_llm_api_key" {
  count         = var.mcp_scanner_llm_api_key != "" ? 1 : 0
  secret_id     = aws_secretsmanager_secret.mcp_scanner_llm_api_key[0].id
  secret_string = var.mcp_scanner_llm_api_key
}

Step 2.5: Pass variables through module call

In terraform/aws-ecs/main.tf, add to the mcp-gateway module call:

module "mcp_gateway" {
  source = "./modules/mcp-gateway"

  # ... existing variables ...

  # Security scanning configuration
  security_scan_enabled           = var.security_scan_enabled
  security_scan_analyzers         = var.security_scan_analyzers
  security_scan_timeout_seconds   = var.security_scan_timeout_seconds
  security_scan_on_registration   = var.security_scan_on_registration
  security_scan_block_unsafe      = var.security_scan_block_unsafe
  mcp_scanner_llm_api_key         = var.mcp_scanner_llm_api_key
}

Configuration Examples

Example 1: Enable scanning with YARA only (default, recommended)

# .env or environment variables
SECURITY_SCAN_ENABLED=true
SECURITY_SCAN_ANALYZERS=yara
# terraform.tfvars
security_scan_enabled   = true
security_scan_analyzers = "yara"

Example 2: Enable scanning with YARA and LLM

# .env or environment variables
SECURITY_SCAN_ENABLED=true
SECURITY_SCAN_ANALYZERS=yara,llm
MCP_SCANNER_LLM_API_KEY=sk-proj-...
# terraform.tfvars
security_scan_enabled     = true
security_scan_analyzers   = "yara,llm"
mcp_scanner_llm_api_key   = "sk-proj-..."  # Or use AWS Secrets Manager

Example 3: Disable security scanning

# .env or environment variables
SECURITY_SCAN_ENABLED=false
# terraform.tfvars
security_scan_enabled = false

Example 4: Allow unsafe servers (not recommended for production)

# .env or environment variables
SECURITY_SCAN_BLOCK_UNSAFE=false
# terraform.tfvars
security_scan_block_unsafe = false

Testing Checklist

Unit Tests to Write

Create tests/unit/services/test_security_scanner.py:

  • test_scan_server_safe_result - Mock scanner returning safe result
  • test_scan_server_unsafe_result - Mock scanner returning critical issues
  • test_scan_server_scanner_not_found - Test when mcp-scanner not installed
  • test_scan_server_timeout - Test scanner timeout handling
  • test_prepare_scan_url_adds_mcp - Test URL preparation
  • test_parse_headers_for_bearer_token - Test token extraction
  • test_count_severities - Test severity counting logic
  • test_scan_disabled_returns_skipped - Test when scanning disabled

Integration Tests

Create tests/integration/test_security_registration.py:

  • test_register_safe_server_enabled - Register server that passes scan, verify enabled
  • test_register_unsafe_server_disabled - Register server that fails scan, verify disabled and tagged
  • test_register_with_scanning_disabled - Register with scanning off, verify enabled

Manual Testing Steps

  1. Test with safe server:
# Register a known-safe server
curl -X POST http://localhost:7860/api/internal/register \
  -u admin:password \
  -d "name=test-safe&path=/test-safe/&proxy_pass_url=http://currenttime-server:8000/&description=Test"

# Verify: should be enabled, no security-pending tag
  1. Test with scanning disabled:
export SECURITY_SCAN_ENABLED=false
# Restart registry, register server, verify auto-enabled
  1. Test scanner timeout:
export SECURITY_SCAN_TIMEOUT_SECONDS=1
# Register server with slow endpoint, verify timeout handling

Migration Notes

  • Existing registered servers are unaffected
  • New registrations will be scanned if SECURITY_SCAN_ENABLED=true (default)
  • CLI workflow (service_mgmt.sh) continues to work as before
  • Backward compatible - scanning is additive, not breaking
  • No database migrations required

Troubleshooting

Scanner not found

ERROR: mcp-scanner command not found

Solution: Ensure cisco-ai-mcp-scanner is installed: uv pip install cisco-ai-mcp-scanner

Scan timeout

ERROR: Scan timed out after 60 seconds

Solution: Increase SECURITY_SCAN_TIMEOUT_SECONDS or check server connectivity

LLM analyzer not working

ERROR: LLM API key not configured

Solution: Set MCP_SCANNER_LLM_API_KEY environment variable


References

  • Cisco AI Defense MCP Scanner: https://github.com/cisco-ai-defense/mcp-scanner
  • Existing CLI integration: cli/mcp_security_scanner.py, cli/service_mgmt.sh
  • Server routes: registry/api/server_routes.py
  • Health service pattern: registry/health/service.py
  • Server service pattern: registry/services/server_service.py

Labels

enhancement, security, api

Metadata

Metadata

Assignees

Labels

apiAPI related issuesenhancementNew feature or requestsecuritySecurity-related issues and vulnerabilities

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions