Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""allow_multiple_connectors_with_unique_names

Revision ID: 5263aa4e7f94
Revises: ffd7445eb90a
Create Date: 2026-01-13 12:23:31.481643

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '5263aa4e7f94'
down_revision: Union[str, None] = 'ffd7445eb90a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
# Drop the old unique constraint
op.drop_constraint(
'uq_searchspace_user_connector_type',
'search_source_connectors',
type_='unique'
)

# Create new unique constraint that includes name
op.create_unique_constraint(
'uq_searchspace_user_connector_type_name',
'search_source_connectors',
['search_space_id', 'user_id', 'connector_type', 'name']
)


def downgrade() -> None:
"""Downgrade schema."""
# Drop the new constraint
op.drop_constraint(
'uq_searchspace_user_connector_type_name',
'search_source_connectors',
type_='unique'
)

# Restore the old constraint
op.create_unique_constraint(
'uq_searchspace_user_connector_type',
'search_source_connectors',
['search_space_id', 'user_id', 'connector_type']
)
37 changes: 37 additions & 0 deletions surfsense_backend/alembic/versions/60_add_mcp_connector_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Add MCP connector type

Revision ID: 60
Revises: 59
Create Date: 2026-01-09 15:19:51.827647

"""
from collections.abc import Sequence

from alembic import op

# revision identifiers, used by Alembic.
revision: str = '60'
down_revision: str | None = '59'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
"""Add MCP_CONNECTOR to SearchSourceConnectorType enum."""
# Add new enum value using raw SQL
op.execute(
"""
ALTER TYPE searchsourceconnectortype ADD VALUE IF NOT EXISTS 'MCP_CONNECTOR';
"""
)


def downgrade() -> None:
"""Remove MCP_CONNECTOR from SearchSourceConnectorType enum."""
# Note: PostgreSQL does not support removing enum values directly.
# To downgrade, you would need to:
# 1. Create a new enum without MCP_CONNECTOR
# 2. Alter the column to use the new enum
# 3. Drop the old enum
# This is left as a manual operation if needed.
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""add_is_active_to_search_source_connectors

Revision ID: ffd7445eb90a
Revises: 60
Create Date: 2026-01-12 22:11:26.132654

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'ffd7445eb90a'
down_revision: Union[str, None] = '60'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
# Add is_active column to search_source_connectors table
op.add_column(
'search_source_connectors',
sa.Column('is_active', sa.Boolean(), nullable=False, server_default=sa.true())
)


def downgrade() -> None:
"""Downgrade schema."""
# Remove is_active column from search_source_connectors table
op.drop_column('search_source_connectors', 'is_active')
8 changes: 4 additions & 4 deletions surfsense_backend/app/agents/new_chat/chat_deepagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
build_configurable_system_prompt,
build_surfsense_system_prompt,
)
from app.agents.new_chat.tools import build_tools
from app.agents.new_chat.tools.registry import build_tools_async
from app.services.connector_service import ConnectorService

# =============================================================================
# Deep Agent Factory
# =============================================================================


def create_surfsense_deep_agent(
async def create_surfsense_deep_agent(
llm: ChatLiteLLM,
search_space_id: int,
db_session: AsyncSession,
Expand Down Expand Up @@ -120,8 +120,8 @@ def create_surfsense_deep_agent(
"firecrawl_api_key": firecrawl_api_key,
}

# Build tools using the registry
tools = build_tools(
# Build tools using the async registry (includes MCP tools)
tools = await build_tools_async(
dependencies=dependencies,
enabled_tools=enabled_tools,
disabled_tools=disabled_tools,
Expand Down
185 changes: 185 additions & 0 deletions surfsense_backend/app/agents/new_chat/tools/mcp_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""MCP Client Wrapper.

This module provides a client for communicating with MCP servers via stdio transport.
It handles server lifecycle management, tool discovery, and tool execution.
"""

import asyncio
import logging
import os
from contextlib import asynccontextmanager
from typing import Any

from mcp import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client

logger = logging.getLogger(__name__)


class MCPClient:
"""Client for communicating with an MCP server."""

def __init__(self, command: str, args: list[str], env: dict[str, str] | None = None):
"""Initialize MCP client.

Args:
command: Command to spawn the MCP server (e.g., "uvx", "node")
args: Arguments for the command (e.g., ["mcp-server-git"])
env: Optional environment variables for the server process

"""
self.command = command
self.args = args
self.env = env or {}
self.session: ClientSession | None = None

@asynccontextmanager
async def connect(self):
"""Connect to the MCP server and manage its lifecycle.

Yields:
ClientSession: Active MCP session for making requests

"""
try:
# Merge env vars with current environment
server_env = os.environ.copy()
server_env.update(self.env)

# Create server parameters with env
server_params = StdioServerParameters(
command=self.command,
args=self.args,
env=server_env
)

# Spawn server process and create session
async with stdio_client(server=server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()
self.session = session
logger.info(
f"Connected to MCP server: {self.command} {' '.join(self.args)}"
)
yield session

except Exception as e:
logger.error(f"Failed to connect to MCP server: {e!s}", exc_info=True)
raise
finally:
self.session = None
logger.info(f"Disconnected from MCP server: {self.command}")

async def list_tools(self) -> list[dict[str, Any]]:
"""List all tools available from the MCP server.

Returns:
List of tool definitions with name, description, and input schema

Raises:
RuntimeError: If not connected to server

"""
if not self.session:
raise RuntimeError("Not connected to MCP server. Use 'async with client.connect():'")

try:
# Call tools/list RPC method
response = await self.session.list_tools()

tools = []
for tool in response.tools:
tools.append({
"name": tool.name,
"description": tool.description or "",
"input_schema": tool.inputSchema if hasattr(tool, "inputSchema") else {},
})

logger.info(f"Listed {len(tools)} tools from MCP server")
return tools

except Exception as e:
logger.error(f"Failed to list tools from MCP server: {e!s}", exc_info=True)
raise

async def call_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""Call a tool on the MCP server.

Args:
tool_name: Name of the tool to call
arguments: Arguments to pass to the tool

Returns:
Tool execution result

Raises:
RuntimeError: If not connected to server

"""
if not self.session:
raise RuntimeError("Not connected to MCP server. Use 'async with client.connect():'")

try:
logger.info(f"Calling MCP tool '{tool_name}' with arguments: {arguments}")

# Call tools/call RPC method
response = await self.session.call_tool(tool_name, arguments=arguments)

# Extract content from response
result = []
for content in response.content:
if hasattr(content, "text"):
result.append(content.text)
elif hasattr(content, "data"):
result.append(str(content.data))
else:
result.append(str(content))

result_str = "\n".join(result) if result else ""
logger.info(f"MCP tool '{tool_name}' succeeded: {result_str[:200]}")
return result_str

except RuntimeError as e:
# Handle validation errors from MCP server responses
# Some MCP servers (like server-memory) return extra fields not in their schema
if "Invalid structured content" in str(e):
logger.warning(f"MCP server returned data not matching its schema, but continuing: {e}")
# Try to extract result from error message or return a success message
return "Operation completed (server returned unexpected format)"
raise
except Exception as e:
logger.error(f"Failed to call MCP tool '{tool_name}': {e!s}", exc_info=True)
return f"Error calling tool: {e!s}"


async def test_mcp_connection(
command: str, args: list[str], env: dict[str, str] | None = None
) -> dict[str, Any]:
"""Test connection to an MCP server and fetch available tools.

Args:
command: Command to spawn the MCP server
args: Arguments for the command
env: Optional environment variables

Returns:
Dict with connection status and available tools

"""
client = MCPClient(command, args, env)

try:
async with client.connect():
tools = await client.list_tools()
return {
"status": "success",
"message": f"Connected successfully. Found {len(tools)} tools.",
"tools": tools,
}
except Exception as e:
return {
"status": "error",
"message": f"Failed to connect: {e!s}",
"tools": [],
}
Loading