diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 6b82b892e..f706b7e18 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -130,6 +130,16 @@ jobs: cache-from: type=registry,ref=ghcr.io/${{ github.repository }}/agentstack-server:cache cache-to: type=registry,ref=ghcr.io/${{ github.repository }}/agentstack-server:cache,mode=max + - uses: docker/build-push-action@v6 + with: + context: ./apps/supergateway + file: ./apps/supergateway/Dockerfile + push: true + platforms: linux/amd64,linux/arm64 + tags: ghcr.io/${{ github.repository }}/supergateway:latest + cache-from: type=registry,ref=ghcr.io/${{ github.repository }}/supergateway:cache + cache-to: type=registry,ref=ghcr.io/${{ github.repository }}/supergateway:cache,mode=max + - run: mise run 'agentstack-ui:build:*' - uses: docker/build-push-action@v6 with: diff --git a/apps/agentstack-server/Dockerfile b/apps/agentstack-server/Dockerfile index 5527a89fa..ff4c4c296 100644 --- a/apps/agentstack-server/Dockerfile +++ b/apps/agentstack-server/Dockerfile @@ -1,7 +1,8 @@ -FROM python:3.13-alpine3.22 AS builder +FROM python:3.13-alpine3.22 ENV UV_COMPILE_BYTECODE=1 \ HOME="/tmp" \ AGENT_REGISTRY__LOCATIONS__FILE="file:///app/registry.yaml" +RUN apk add --no-cache kubectl RUN --mount=type=cache,target=/tmp/.cache/uv \ --mount=type=bind,source=dist/requirements.txt,target=/requirements.txt \ --mount=type=bind,from=ghcr.io/astral-sh/uv:0.9.5,source=/uv,target=/bin/uv \ diff --git a/apps/agentstack-server/src/agentstack_server/api/dependencies.py b/apps/agentstack-server/src/agentstack_server/api/dependencies.py index 2e9fa2449..94836f89c 100644 --- a/apps/agentstack-server/src/agentstack_server/api/dependencies.py +++ b/apps/agentstack-server/src/agentstack_server/api/dependencies.py @@ -27,6 +27,7 @@ from agentstack_server.service_layer.services.configurations import ConfigurationService from agentstack_server.service_layer.services.connector import ConnectorService from agentstack_server.service_layer.services.contexts import ContextService +from agentstack_server.service_layer.services.external_mcp_service import ExternalMcpService from agentstack_server.service_layer.services.files import FileService from agentstack_server.service_layer.services.mcp import McpService from agentstack_server.service_layer.services.model_providers import ModelProviderService @@ -50,6 +51,7 @@ AuthServiceDependency = Annotated[AuthService, Depends(lambda: di[AuthService])] ModelProviderServiceDependency = Annotated[ModelProviderService, Depends(lambda: di[ModelProviderService])] ConnectorServiceDependency = Annotated[ConnectorService, Depends(lambda: di[ConnectorService])] +ExternalMcpServiceDependency = Annotated[ExternalMcpService, Depends(lambda: di[ExternalMcpService])] logger = logging.getLogger(__name__) diff --git a/apps/agentstack-server/src/agentstack_server/api/routes/connectors.py b/apps/agentstack-server/src/agentstack_server/api/routes/connectors.py index d9079fd29..e83e502e9 100644 --- a/apps/agentstack-server/src/agentstack_server/api/routes/connectors.py +++ b/apps/agentstack-server/src/agentstack_server/api/routes/connectors.py @@ -9,6 +9,7 @@ from agentstack_server.api.dependencies import ( ConnectorServiceDependency, + ExternalMcpServiceDependency, RequiresPermissions, ) from agentstack_server.api.schema.connector import ( @@ -96,6 +97,7 @@ async def connect_connector( user=user.user, redirect_url=connect_request.redirect_url, callback_uri=str(request.url_for(oauth_callback.__name__)), + 
access_token=connect_request.access_token, ) ) @@ -125,11 +127,16 @@ async def oauth_callback( request: Request, state: str, connector_service: ConnectorServiceDependency, + external_mcp_service: ExternalMcpServiceDependency, error: str | None = None, error_description: str | None = None, ): - return await connector_service.oauth_callback( - callback_url=str(request.url), state=state, error=error, error_description=error_description + return await external_mcp_service.oauth_callback( + callback_url=str(request.url), + state=state, + error=error, + error_description=error_description, + probe_fn=lambda connector: connector_service.probe_connector(connector=connector), ) diff --git a/apps/agentstack-server/src/agentstack_server/api/schema/connector.py b/apps/agentstack-server/src/agentstack_server/api/schema/connector.py index 494679764..a636240d1 100644 --- a/apps/agentstack-server/src/agentstack_server/api/schema/connector.py +++ b/apps/agentstack-server/src/agentstack_server/api/schema/connector.py @@ -40,6 +40,7 @@ class ConnectorResponse(BaseModel): class ConnectorConnectRequest(BaseModel): redirect_url: AnyUrl | None = None + access_token: str | None = None class ConnectorPresetResponse(BaseModel): diff --git a/apps/agentstack-server/src/agentstack_server/bootstrap.py b/apps/agentstack-server/src/agentstack_server/bootstrap.py index 3b2d02d29..86f0e59aa 100644 --- a/apps/agentstack-server/src/agentstack_server/bootstrap.py +++ b/apps/agentstack-server/src/agentstack_server/bootstrap.py @@ -21,6 +21,7 @@ from agentstack_server.service_layer.build_manager import IProviderBuildManager from agentstack_server.service_layer.deployment_manager import IProviderDeploymentManager from agentstack_server.service_layer.unit_of_work import IUnitOfWorkFactory +from agentstack_server.utils.kubectl import Kubectl from agentstack_server.utils.utils import async_to_sync_isolated logger = logging.getLogger(__name__) @@ -89,5 +90,19 @@ def _set_di[T](service: type[T], instance: T): _set_di(ITextExtractionBackend, DoclingTextExtractionBackend(di[Configuration].text_extraction)) + # TODO: unify all services under single k8s client library (kr8s or kubectl wrapper) + _set_di( + Kubectl, + Kubectl( + kubeconfig=di[Configuration].k8s_kubeconfig, + namespace=di[Configuration].k8s_namespace + or ( + p.read_text().strip() + if (p := pathlib.Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")).is_file() + else "default" + ), + ), + ) + bootstrap_dependencies_sync = async_to_sync_isolated(bootstrap_dependencies) diff --git a/apps/agentstack-server/src/agentstack_server/configuration.py b/apps/agentstack-server/src/agentstack_server/configuration.py index 33f6d70e2..96369a2a8 100644 --- a/apps/agentstack-server/src/agentstack_server/configuration.py +++ b/apps/agentstack-server/src/agentstack_server/configuration.py @@ -219,11 +219,30 @@ class ManagedProviderConfiguration(BaseModel): ) +class ConnectorStdioPreset(BaseModel): + image: str + command: list[str] | None = None + args: list[str] | None = None + env: dict[str, str] = Field(default_factory=dict) + auth_token_env_name: str | None = None + + class ConnectorPreset(BaseModel): url: AnyUrl client_id: str | None = None client_secret: str | None = None metadata: dict[str, str] | None = None + stdio: ConnectorStdioPreset | None = None + + @model_validator(mode="after") + def validate_url_scheme(self): + if self.stdio is None: + if self.url.scheme not in ("http", "https"): + raise ValueError(f"Stdio is not configured, URL scheme must be http(s), got: 
{self.url.scheme}") + else: + if self.url.scheme != "mcp+stdio": + raise ValueError(f"Stdio is configured, URL scheme must be mcp+stdio, got: {self.url.scheme}") + return self class ConnectorConfiguration(BaseModel): diff --git a/apps/agentstack-server/src/agentstack_server/service_layer/services/connector.py b/apps/agentstack-server/src/agentstack_server/service_layer/services/connector.py index 6218ae9e6..6cf71c0b9 100644 --- a/apps/agentstack-server/src/agentstack_server/service_layer/services/connector.py +++ b/apps/agentstack-server/src/agentstack_server/service_layer/services/connector.py @@ -3,36 +3,30 @@ from __future__ import annotations -import html import logging from contextlib import AsyncExitStack -from datetime import timedelta -from secrets import token_urlsafe -from urllib.parse import parse_qs, urlencode, urlparse, urlunparse from uuid import UUID import httpx -from async_lru import alru_cache -from authlib.integrations.httpx_client import AsyncOAuth2Client -from authlib.oauth2.rfc8414 import AuthorizationServerMetadata, get_well_known_url from fastapi import Request, status -from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse +from fastapi.responses import StreamingResponse from kink import inject from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client -from pydantic import AnyUrl, BaseModel +from pydantic import AnyUrl from agentstack_server.configuration import Configuration, ConnectorPreset from agentstack_server.domain.models.common import Metadata from agentstack_server.domain.models.connector import ( Authorization, - AuthorizationCodeFlow, Connector, ConnectorState, Token, ) from agentstack_server.domain.models.user import User -from agentstack_server.exceptions import EntityNotFoundError, PlatformError +from agentstack_server.exceptions import PlatformError +from agentstack_server.service_layer.services.external_mcp_service import ExternalMcpService +from agentstack_server.service_layer.services.managed_mcp_service import ManagedMcpService from agentstack_server.service_layer.unit_of_work import IUnitOfWorkFactory logger = logging.getLogger(__name__) @@ -40,10 +34,18 @@ @inject class ConnectorService: - def __init__(self, uow: IUnitOfWorkFactory, configuration: Configuration): + def __init__( + self, + uow: IUnitOfWorkFactory, + configuration: Configuration, + managed_mcp: ManagedMcpService, + external_mcp: ExternalMcpService, + ): self._uow = uow self._configuration = configuration self._proxy_client = httpx.AsyncClient(timeout=None) + self._managed_mcp = managed_mcp + self._external_mcp = external_mcp async def create_connector( self, @@ -60,7 +62,19 @@ async def create_connector( "client_id must be present when client_secret is specified", status_code=status.HTTP_400_BAD_REQUEST ) - preset = self._find_preset(url=url) if match_preset else None + preset = ( + next((p for p in self._configuration.connector.presets if str(p.url) == str(url)), None) + if match_preset or url.scheme == "mcp+stdio" + else None + ) + if url.scheme not in ("http", "https", "mcp+stdio"): + raise PlatformError("Connector URL has an unsupported scheme", status_code=status.HTTP_400_BAD_REQUEST) + if not preset and url.scheme == "mcp+stdio": + raise PlatformError( + "Connector URL has mcp+stdio scheme but does not match a known connector preset", + status_code=status.HTTP_400_BAD_REQUEST, + ) + if preset: if not client_id: client_id = preset.client_id @@ -85,7 +99,7 @@ async def read_connector(self, *, connector_id: UUID, user: 
User | None = None) async def delete_connector(self, *, connector_id: UUID, user: User | None = None) -> None: async with self._uow() as uow: connector = await uow.connectors.get(connector_id=connector_id, user_id=user.id if user else None) - await self._revoke_auth_token(connector=connector) + await self._external_mcp.revoke_token(connector=connector) await uow.connectors.delete(connector_id=connector_id, user_id=user.id if user else None) await uow.commit() @@ -94,11 +108,25 @@ async def list_connectors(self, *, user: User | None = None) -> list[Connector]: return [c async for c in uow.connectors.list(user_id=user.id if user else None)] async def connect_connector( - self, *, connector_id: UUID, callback_uri: str, redirect_url: AnyUrl | None = None, user: User | None = None + self, + *, + connector_id: UUID, + callback_uri: str, + redirect_url: AnyUrl | None = None, + user: User | None = None, + access_token: str | None = None, ) -> Connector: async with self._uow() as uow: connector = await uow.connectors.get(connector_id=connector_id, user_id=user.id if user else None) + if access_token: + if not connector.auth: + connector.auth = Authorization() + connector.auth.token = Token(access_token=access_token, token_type="bearer") + + if self._managed_mcp.is_managed(connector=connector) and (preset := self._find_preset(url=connector.url)): + await self._managed_mcp.deploy(connector=connector, preset=preset) + try: await self.probe_connector(connector=connector) connector.state = ConnectorState.connected @@ -106,7 +134,7 @@ async def connect_connector( except Exception as err: if isinstance(err, httpx.HTTPStatusError): if err.response.status_code == status.HTTP_401_UNAUTHORIZED: - await self._bootstrap_auth( + await self._external_mcp.bootstrap_auth( connector=connector, callback_url=callback_uri, redirect_url=redirect_url ) connector.state = ConnectorState.auth_required @@ -145,103 +173,21 @@ async def disconnect_connector(self, *, connector_id: UUID, user: User | None = status_code=status.HTTP_400_BAD_REQUEST, ) - await self._revoke_auth_token(connector=connector) + await self._external_mcp.revoke_token(connector=connector) if connector.auth: connector.auth.flow = None connector.state = ConnectorState.disconnected connector.disconnect_reason = "Client request" + if self._managed_mcp.is_managed(connector=connector): + await self._managed_mcp.undeploy(connector=connector) + async with self._uow() as uow: await uow.connectors.update(connector=connector) await uow.commit() return connector - async def oauth_callback(self, *, callback_url: str, state: str, error: str | None, error_description: str | None): - redirect_url = None - try: - async with self._uow() as uow: - connector = await uow.connectors.get_by_auth(auth_state=state) - - assert connector.auth is not None - assert connector.auth.flow is not None - assert connector.auth.flow.type == "code" - - redirect_url = connector.auth.flow.client_redirect_uri - - if error: - return self._create_callback_response( - redirect_url=redirect_url, error=error, error_description=error_description - ) - - if connector.state not in (ConnectorState.auth_required): - return self._create_callback_response( - redirect_url=redirect_url, - error="invalid_request", - error_description="Connector must be in auth_required state.", - ) - - async with self._create_oauth_client(connector=connector) as client: - auth_metadata = await self._discover_auth_metadata(connector=connector) - if not auth_metadata: - raise RuntimeError("Authorization server no longer contains 
necessary metadata") - token_endpoint = auth_metadata.get("token_endpoint") - if not token_endpoint: - raise RuntimeError("Authorization server has no token endpoint in metadata") - token = await client.fetch_token( - token_endpoint, - authorization_response=callback_url, - code_verifier=connector.auth.flow.code_verifier, - redirect_uri=connector.auth.flow.redirect_uri, - ) - connector.auth.token = Token.model_validate(token) - connector.auth.token_endpoint = AnyUrl(str(token_endpoint)) - connector.auth.flow = None - try: - await self.probe_connector(connector=connector) - connector.state = ConnectorState.connected - except Exception as err: - logger.error("Failed to probe resource with a valid token", exc_info=True) - connector.state = ConnectorState.disconnected - connector.disconnect_reason = str(err) - - async with self._uow() as uow: - await uow.connectors.update(connector=connector) - await uow.commit() - - return self._create_callback_response(redirect_url=redirect_url) - except EntityNotFoundError: - return self._create_callback_response( - redirect_url=redirect_url, - error="invalid_request", - error_description="Invalid or expired login attempt.", - ) - except Exception: - logger.error("oAuth callback failed", exc_info=True) - return self._create_callback_response( - redirect_url=redirect_url, - error="server_error", - error_description="An internal error has occurred. Please try again later.", - ) - - def _create_callback_response( - self, *, redirect_url: AnyUrl | None, error: str | None = None, error_description: str | None = None - ): - if redirect_url: - if error: - parsed = urlparse(str(redirect_url)) - query_params = parse_qs(parsed.query) - query_params["error"] = [error] - if error_description: - query_params["error_description"] = [error_description] - modified_url = urlunparse(parsed._replace(query=urlencode(query_params, doseq=True))) - redirect_url = AnyUrl(modified_url) - return RedirectResponse(str(redirect_url)) - else: - return HTMLResponse( - _render_success() if not error else _render_failure(error, error_description=error_description) - ) - async def refresh_connector(self, *, connector_id: UUID, user: User | None = None) -> None: async with self._uow() as uow: connector = await uow.connectors.get(connector_id=connector_id, user_id=user.id if user else None) @@ -255,7 +201,7 @@ async def refresh_connector(self, *, connector_id: UUID, user: User | None = Non connector.disconnect_reason = None except Exception as err: if isinstance(err, httpx.HTTPStatusError) and err.response.status_code == status.HTTP_401_UNAUTHORIZED: - await self._revoke_auth_token(connector=connector) + await self._external_mcp.revoke_token(connector=connector) if connector.auth: connector.auth.flow = None connector.state = ConnectorState.disconnected @@ -269,120 +215,23 @@ async def list_presets(self) -> list[ConnectorPreset]: return self._configuration.connector.presets def _find_preset(self, *, url: AnyUrl) -> ConnectorPreset | None: - for preset in self._configuration.connector.presets: - if preset.url == url: - return preset - return None - - async def _bootstrap_auth(self, *, connector: Connector, callback_url: str, redirect_url: AnyUrl | None) -> None: - auth_metadata = await self._discover_auth_metadata(connector=connector) - if not auth_metadata: - raise RuntimeError("Not authorization server found for the connector") - - if not connector.auth: - connector.auth = Authorization() - - await self._revoke_auth_token(connector=connector) - code_verifier = token_urlsafe(64) - - await 
self._ensure_oauth_client_registered(connector=connector, redirect_uri=callback_url) - - async with self._create_oauth_client(connector=connector) as client: - uri, state = client.create_authorization_url( - auth_metadata.get("authorization_endpoint"), code_verifier=code_verifier, redirect_uri=callback_url - ) - connector.auth.flow = AuthorizationCodeFlow( - authorization_endpoint=uri, - state=state, - code_verifier=code_verifier, - redirect_uri=callback_url, - client_redirect_uri=redirect_url, - ) - - async def _revoke_auth_token(self, *, connector: Connector) -> None: - if not connector.auth or not connector.auth.token: - return - - if connector.auth.token: - try: - async with self._create_oauth_client(connector=connector) as client: - auth_metadata = await self._discover_auth_metadata(connector=connector) - if not auth_metadata: - raise RuntimeError("Authorization server no longer contains necessary metadata") - revoke_endpoint = auth_metadata.get("revocation_endpoint") - if not isinstance(revoke_endpoint, str): - raise RuntimeError("Authorization server does not support token revocation") - await client.revoke_token(revoke_endpoint, token=connector.auth.token.access_token) - except Exception: - logger.warning("Token revocation failed", exc_info=True) - - connector.auth.token = None - connector.auth.token_endpoint = None - async with self._uow() as uow: - await uow.connectors.update(connector=connector) - await uow.commit() - - def _create_client( - self, *, connector: Connector, headers: dict[str, str] | None = None, timeout: int | None = None - ) -> httpx.AsyncClient: - if not connector.auth or not connector.auth.token: - return httpx.AsyncClient(base_url=str(connector.url), headers=headers, timeout=timeout) - else: - return self._create_oauth_client(connector=connector) - - def _create_oauth_client( - self, *, connector: Connector, headers: dict[str, str] | None = None, timeout: int | None = None - ) -> AsyncOAuth2Client: - if not connector.auth: - raise RuntimeError("Connector does not support auth") - - async def update_token(token, refresh_token=None, access_token=None): - if not connector.auth: - raise RuntimeError("Authorization has been removed from the connector") - connector.auth.token = Token.model_validate(token) - async with self._uow() as uow: - await uow.connectors.update(connector=connector) - await uow.commit() - - return AsyncOAuth2Client( - client_id=connector.auth.client_id, - client_secret=connector.auth.client_secret, - token=connector.auth.token.model_dump() if connector.auth.token else None, - update_token=update_token, - code_challenge_method="S256", - headers=headers, - timeout=timeout, - leeway=60, # A job probes connectors every 30 seconds, ensuring the token is valid roughly for at least 30 seconds per request. 
- token_endpoint=str(connector.auth.token_endpoint), - ) - - async def _discover_auth_metadata(self, *, connector: Connector) -> AuthorizationServerMetadata | None: - resource_metadata = await _discover_resource_metadata(str(connector.url)) - if not resource_metadata or not resource_metadata.authorization_servers: - return None - auth_metadata = await _discover_auth_metadata(resource_metadata.authorization_servers[0]) - return auth_metadata - - async def _ensure_oauth_client_registered(self, *, connector: Connector, redirect_uri: str) -> Connector: - if not connector.auth: - raise RuntimeError("Authoriztion hasn't been activated for connector") - if not connector.auth.client_id: - registration_response = await _register_client(str(connector.url), redirect_uri=redirect_uri) - async with self._uow() as uow: - connector.auth.client_id = registration_response.client_id - connector.auth.client_secret = registration_response.client_secret - await uow.connectors.update(connector=connector) - await uow.commit() - return connector + return next((p for p in self._configuration.connector.presets if str(p.url) == str(url)), None) async def probe_connector(self, *, connector: Connector): def client_factory(headers=None, timeout=None, auth=None): assert auth is None - return self._create_client(connector=connector, headers=headers, timeout=timeout) + return self._external_mcp.create_http_client(connector=connector, headers=headers, timeout=timeout) try: async with ( - streamablehttp_client(str(connector.url), httpx_client_factory=client_factory) as (read, write, _), + streamablehttp_client( + ( + f"{self._managed_mcp.get_service_url(connector=connector)}/mcp" + if self._managed_mcp.is_managed(connector=connector) + else str(connector.url) + ), + httpx_client_factory=client_factory, + ) as (read, write, _), ClientSession(read, write) as session, ): await session.initialize() @@ -394,27 +243,31 @@ def client_factory(headers=None, timeout=None, auth=None): async def mcp_proxy(self, *, connector_id: UUID, request: Request, user: User | None = None): connector = await self.read_connector(connector_id=connector_id, user=user) - forward_headers = { - key: request.headers[key] - for key in ["accept", "content-type", "mcp-protocol-version", "mcp-session-id", "last-event-id"] - if key in request.headers - } + auth_headers = {} + if ( + connector.auth + and connector.state == ConnectorState.connected + and connector.auth.token + and connector.auth.token.token_type == "bearer" + ): + auth_headers["authorization"] = f"Bearer {connector.auth.token.access_token}" exit_stack = AsyncExitStack() try: response = await exit_stack.enter_async_context( self._proxy_client.stream( request.method, - str(connector.url), - headers=forward_headers - | ( - {"authorization": f"Bearer {connector.auth.token.access_token}"} - if connector.state == ConnectorState.connected - and connector.auth - and connector.auth.token - and connector.auth.token.token_type == "bearer" - else {} + ( + f"{self._managed_mcp.get_service_url(connector=connector)}/mcp" + if self._managed_mcp.is_managed(connector=connector) + else str(connector.url) ), + headers={ + key: request.headers[key] + for key in ["accept", "content-type", "mcp-protocol-version", "mcp-session-id", "last-event-id"] + if key in request.headers + } + | auth_headers, content=request.stream(), ) ) @@ -430,120 +283,3 @@ async def stream_fn(): except BaseException: await exit_stack.pop_all().aclose() raise - - -@alru_cache(ttl=timedelta(days=1).seconds) -async def 
_register_client(resource_server_url: str, *, redirect_uri: str) -> _ClientRegistrationResponse: - resource_metadata = await _discover_resource_metadata(resource_server_url) - if not resource_metadata or not resource_metadata.authorization_servers: - raise RuntimeError("Resource server metadata not found") - auth_metadata = await _discover_auth_metadata(resource_metadata.authorization_servers[0]) - if not auth_metadata: - raise RuntimeError("Authorization server metadata not found") - registration_endpoint = auth_metadata.get("registration_endpoint") - if not isinstance(registration_endpoint, str): - raise RuntimeError("Authorization server does not support dynamic client registration") - async with httpx.AsyncClient() as client: - response = await client.post( - str(registration_endpoint), - json={"client_name": "Agent Stack", "redirect_uris": [redirect_uri]}, - ) - response.raise_for_status() - registration_response = _ClientRegistrationResponse.model_validate(response.json()) - return registration_response - - -@alru_cache(ttl=timedelta(minutes=10).seconds) -async def _discover_auth_metadata(authorization_server_url: str) -> AuthorizationServerMetadata | None: - url = get_well_known_url(authorization_server_url, external=True) - async with httpx.AsyncClient(headers={"Accept": "application/json"}, follow_redirects=True) as client: - response = await client.get(url) - if response.status_code == status.HTTP_404_NOT_FOUND: - return None - response.raise_for_status() - metadata = AuthorizationServerMetadata(response.json()) - metadata.validate() - return metadata - - -@alru_cache(ttl=timedelta(minutes=10).seconds) -async def _discover_resource_metadata(resource_url: str) -> _ResourceServerMetadata | None: - parsed = urlparse(resource_url) - resource_root_url = f"{parsed.scheme}://{parsed.netloc}" - - # RFC9728 hasn't been implemented yet in authlib - # Reusing util from RFC8414 - path_url = get_well_known_url(resource_url, external=True, suffix="oauth-protected-resource") - root_url = get_well_known_url(resource_root_url, external=True, suffix="oauth-protected-resource") - urls = [path_url] - if path_url != root_url: # avoid duplicate - urls.append(root_url) - exceptions = [] - async with httpx.AsyncClient( - headers={"Accept": "application/json"}, - follow_redirects=True, - ) as client: - for url in urls: - try: - response = await client.get(url) - response.raise_for_status() - return _ResourceServerMetadata.model_validate(response.json()) - except Exception as exc: - exceptions.append(exc) - logger.warning( - "Resource metadata discovery failed", - exc_info=ExceptionGroup(f"Unable to discover metadata for resource {resource_url}", exceptions), - ) - return None - - -def _render_success(): - return """ - - - - Agent Stack - - - -

-    Authorization Successful
-    You can now close this window and return to your application.

- - - -""" - - -def _render_failure(error: str, error_description: str | None): - return ( - """ - - - - Agent Stack - - - -

-    Authorization Failed
-    """
-    + html.escape(error_description or error)
-    + """

- -""" - ) - - -class _ResourceServerMetadata(BaseModel): - authorization_servers: list[str] - - -class _ClientRegistrationResponse(BaseModel): - client_id: str - client_secret: str | None = None diff --git a/apps/agentstack-server/src/agentstack_server/service_layer/services/external_mcp_service.py b/apps/agentstack-server/src/agentstack_server/service_layer/services/external_mcp_service.py new file mode 100644 index 000000000..efbb2d2cf --- /dev/null +++ b/apps/agentstack-server/src/agentstack_server/service_layer/services/external_mcp_service.py @@ -0,0 +1,342 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +from collections.abc import Callable +from datetime import timedelta +from secrets import token_urlsafe +from types import CoroutineType +from typing import Any +from urllib.parse import parse_qs, urlencode, urlparse, urlunparse + +import httpx +from async_lru import alru_cache +from authlib.integrations.httpx_client import AsyncOAuth2Client +from authlib.oauth2.rfc8414 import AuthorizationServerMetadata, get_well_known_url +from fastapi import status +from fastapi.responses import HTMLResponse, RedirectResponse +from kink import inject +from pydantic import AnyUrl, BaseModel + +from agentstack_server.domain.models.connector import ( + Authorization, + AuthorizationCodeFlow, + Connector, + ConnectorState, + Token, +) +from agentstack_server.exceptions import EntityNotFoundError +from agentstack_server.service_layer.unit_of_work import IUnitOfWorkFactory + +logger = logging.getLogger(__name__) + + +@inject +class ExternalMcpService: + def __init__(self, uow: IUnitOfWorkFactory): + self._uow = uow + + async def bootstrap_auth(self, *, connector: Connector, callback_url: str, redirect_url: AnyUrl | None) -> None: + if not (auth_metadata := await self._discover_auth_metadata(connector=connector)): + raise RuntimeError("Not authorization server found for the connector") + + if not connector.auth: + connector.auth = Authorization() + + await self.revoke_token(connector=connector) + await self._ensure_client_registered(connector=connector, redirect_uri=callback_url) + + async with self._create_client(connector=connector) as client: + uri, state = client.create_authorization_url( + auth_metadata.get("authorization_endpoint"), + code_verifier=(code_verifier := token_urlsafe(64)), + redirect_uri=callback_url, + ) + connector.auth.flow = AuthorizationCodeFlow( + authorization_endpoint=uri, + state=state, + code_verifier=code_verifier, + redirect_uri=callback_url, + client_redirect_uri=redirect_url, + ) + + async def revoke_token(self, *, connector: Connector) -> None: + if not connector.auth or not connector.auth.token: + return + + try: + async with self._create_client(connector=connector) as client: + if not (auth_metadata := await self._discover_auth_metadata(connector=connector)): + raise RuntimeError("Authorization server no longer contains necessary metadata") + if not isinstance(revoke_endpoint := auth_metadata.get("revocation_endpoint"), str): + raise RuntimeError("Authorization server does not support token revocation") + await client.revoke_token(revoke_endpoint, token=connector.auth.token.access_token) + except Exception: + logger.warning("Token revocation failed", exc_info=True) + + connector.auth.token = None + connector.auth.token_endpoint = None + async with self._uow() as uow: + await uow.connectors.update(connector=connector) + await uow.commit() + + def create_http_client( 
+ self, *, connector: Connector, headers: dict[str, str] | None = None, timeout: int | None = None + ) -> httpx.AsyncClient: + if not connector.auth or not connector.auth.token: + return httpx.AsyncClient( + headers=headers, + timeout=timeout or 30, + base_url=str(connector.url), + ) + else: + return self._create_client(connector=connector, headers=headers, timeout=timeout) + + def _create_client( + self, *, connector: Connector, headers: dict[str, str] | None = None, timeout: int | None = None + ) -> AsyncOAuth2Client: + if not connector.auth: + raise RuntimeError("Connector does not support auth") + + async def update_token(token, refresh_token=None, access_token=None): + if not connector.auth: + raise RuntimeError("Authorization has been removed from the connector") + connector.auth.token = Token.model_validate(token) + async with self._uow() as uow: + await uow.connectors.update(connector=connector) + await uow.commit() + + return AsyncOAuth2Client( + client_id=connector.auth.client_id, + client_secret=connector.auth.client_secret, + token=connector.auth.token.model_dump() if connector.auth.token else None, + update_token=update_token, + code_challenge_method="S256", + headers=headers, + timeout=timeout, + leeway=60, + token_endpoint=str(connector.auth.token_endpoint), + ) + + async def _discover_auth_metadata(self, *, connector: Connector) -> AuthorizationServerMetadata | None: + resource_metadata = await _discover_resource_metadata(str(connector.url)) + if not resource_metadata or not resource_metadata.authorization_servers: + return None + return await _discover_auth_metadata(resource_metadata.authorization_servers[0]) + + async def _ensure_client_registered(self, *, connector: Connector, redirect_uri: str) -> None: + if not connector.auth: + raise RuntimeError("Authoriztion hasn't been activated for connector") + if not connector.auth.client_id: + registration_response = await _register_client(str(connector.url), redirect_uri=redirect_uri) + async with self._uow() as uow: + connector.auth.client_id = registration_response.client_id + connector.auth.client_secret = registration_response.client_secret + await uow.connectors.update(connector=connector) + await uow.commit() + + async def fetch_token_from_callback(self, *, connector: Connector, callback_url: str) -> Token: + async with self._create_client(connector=connector) as client: + if not (auth_metadata := await self._discover_auth_metadata(connector=connector)): + raise RuntimeError("Authorization server no longer contains necessary metadata") + if not (token_endpoint := auth_metadata.get("token_endpoint")): + raise RuntimeError("Authorization server has no token endpoint in metadata") + assert connector.auth and connector.auth.flow + token = Token.model_validate( + await client.fetch_token( + token_endpoint, + authorization_response=callback_url, + code_verifier=connector.auth.flow.code_verifier, + redirect_uri=connector.auth.flow.redirect_uri, + ) + ) + connector.auth.token = token + connector.auth.token_endpoint = AnyUrl(str(token_endpoint)) + connector.auth.flow = None + return token + + async def oauth_callback( + self, + *, + callback_url: str, + state: str, + error: str | None, + error_description: str | None, + probe_fn: Callable[[Connector], CoroutineType[Any, Any, None]], + ): + redirect_url = None + try: + async with self._uow() as uow: + connector = await uow.connectors.get_by_auth(auth_state=state) + + assert connector.auth is not None + assert connector.auth.flow is not None + assert connector.auth.flow.type == 
"code" + + redirect_url = connector.auth.flow.client_redirect_uri + + if error: + return self._create_callback_response( + redirect_url=redirect_url, error=error, error_description=error_description + ) + + if connector.state not in (ConnectorState.auth_required,): + return self._create_callback_response( + redirect_url=redirect_url, + error="invalid_request", + error_description="Connector must be in auth_required state.", + ) + + await self.fetch_token_from_callback(connector=connector, callback_url=callback_url) + try: + await probe_fn(connector) + connector.state = ConnectorState.connected + except Exception as err: + logger.error("Failed to probe resource with a valid token", exc_info=True) + connector.state = ConnectorState.disconnected + connector.disconnect_reason = str(err) + + async with self._uow() as uow: + await uow.connectors.update(connector=connector) + await uow.commit() + + return self._create_callback_response(redirect_url=redirect_url) + except EntityNotFoundError: + return self._create_callback_response( + redirect_url=redirect_url, + error="invalid_request", + error_description="Invalid or expired login attempt.", + ) + except Exception: + logger.error("oAuth callback failed", exc_info=True) + return self._create_callback_response( + redirect_url=redirect_url, + error="server_error", + error_description="An internal error has occurred. Please try again later.", + ) + + def _create_callback_response( + self, *, redirect_url: AnyUrl | None, error: str | None = None, error_description: str | None = None + ): + if redirect_url: + if error: + parsed = urlparse(str(redirect_url)) + query_params = parse_qs(parsed.query) + query_params["error"] = [error] + if error_description: + query_params["error_description"] = [error_description] + redirect_url = AnyUrl(urlunparse(parsed._replace(query=urlencode(query_params, doseq=True)))) + return RedirectResponse(str(redirect_url)) + return HTMLResponse(_render_success() if not error else _render_failure(error, error_description)) + + +@alru_cache(ttl=timedelta(days=1).seconds) +async def _register_client(resource_server_url: str, *, redirect_uri: str) -> _ClientRegistrationResponse: + if ( + not (resource_metadata := await _discover_resource_metadata(resource_server_url)) + or not resource_metadata.authorization_servers + ): + raise RuntimeError("Resource server metadata not found") + if not (auth_metadata := await _discover_auth_metadata(resource_metadata.authorization_servers[0])): + raise RuntimeError("Authorization server metadata not found") + if not isinstance(registration_endpoint := auth_metadata.get("registration_endpoint"), str): + raise RuntimeError("Authorization server does not support dynamic client registration") + async with httpx.AsyncClient() as client: + response = await client.post( + str(registration_endpoint), + json={"client_name": "Agent Stack", "redirect_uris": [redirect_uri]}, + ) + response.raise_for_status() + return _ClientRegistrationResponse.model_validate(response.json()) + + +@alru_cache(ttl=timedelta(minutes=10).seconds) +async def _discover_auth_metadata(authorization_server_url: str) -> AuthorizationServerMetadata | None: + url = get_well_known_url(authorization_server_url, external=True) + async with httpx.AsyncClient(headers={"Accept": "application/json"}, follow_redirects=True) as client: + response = await client.get(url) + if response.status_code == status.HTTP_404_NOT_FOUND: + return None + response.raise_for_status() + metadata = AuthorizationServerMetadata(response.json()) + 
metadata.validate() + return metadata + + +@alru_cache(ttl=timedelta(minutes=10).seconds) +async def _discover_resource_metadata(resource_url: str) -> _ResourceServerMetadata | None: + parsed = urlparse(resource_url) + path_url = get_well_known_url(resource_url, external=True, suffix="oauth-protected-resource") + root_url = get_well_known_url( + f"{parsed.scheme}://{parsed.netloc}", external=True, suffix="oauth-protected-resource" + ) + exceptions = [] + async with httpx.AsyncClient(headers={"Accept": "application/json"}, follow_redirects=True) as client: + for url in [path_url] if path_url == root_url else [path_url, root_url]: + try: + response = await client.get(url) + response.raise_for_status() + return _ResourceServerMetadata.model_validate(response.json()) + except Exception as exc: + exceptions.append(exc) + logger.warning( + "Resource metadata discovery failed", + exc_info=ExceptionGroup(f"Unable to discover metadata for resource {resource_url}", exceptions), + ) + return None + + +class _ResourceServerMetadata(BaseModel): + authorization_servers: list[str] + + +class _ClientRegistrationResponse(BaseModel): + client_id: str + client_secret: str | None = None + + +def _render_success(): + return """ + + + + Agent Stack + + + +

+    Authorization Successful
+    You can now close this window and return to your application.

+ + + +""" + + +def _render_failure(error: str, error_description: str | None): + import html + + return ( + """ + + + + Agent Stack + + + +

+    Authorization Failed
+    """
+    + html.escape(error_description or error)
+    + """

+ +""" + ) diff --git a/apps/agentstack-server/src/agentstack_server/service_layer/services/managed_mcp_service.py b/apps/agentstack-server/src/agentstack_server/service_layer/services/managed_mcp_service.py new file mode 100644 index 000000000..50b5ccf98 --- /dev/null +++ b/apps/agentstack-server/src/agentstack_server/service_layer/services/managed_mcp_service.py @@ -0,0 +1,231 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging + +from fastapi import status +from kink import inject +from pydantic import AnyUrl + +from agentstack_server.configuration import Configuration, ConnectorPreset +from agentstack_server.domain.models.connector import Connector +from agentstack_server.exceptions import PlatformError +from agentstack_server.utils.kubectl import Kubectl + +logger = logging.getLogger(__name__) + + +@inject +class ManagedMcpService: + def __init__(self, configuration: Configuration, kubectl: Kubectl): + self._configuration = configuration + self._kubectl = kubectl + + def find_preset(self, url: AnyUrl) -> ConnectorPreset | None: + return next((p for p in self._configuration.connector.presets if str(p.url) == str(url)), None) + + def is_managed(self, *, connector: Connector) -> bool: + return (preset := self.find_preset(connector.url)) is not None and preset.url.scheme == "mcp+stdio" + + def get_service_url(self, *, connector: Connector) -> str: + return f"http://managed-mcp-supergateway-{connector.id.hex[:16]}.{self._kubectl._default_kwargs['namespace']}.svc.cluster.local:8080" + + async def deploy(self, *, connector: Connector, preset: ConnectorPreset) -> None: + assert preset.stdio + try: + await self._kubectl.apply( + "-f", + "-", + input={ + "apiVersion": "v1", + "kind": "List", + "items": [ + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": f"managed-mcp-server-{connector.id.hex[:16]}", + "labels": { + "app": "managed-mcp-server", + "connector-id": str(connector.id), + }, + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "managed-mcp-server", + "connector-id": str(connector.id), + } + }, + "template": { + "metadata": { + "labels": { + "app": "managed-mcp-server", + "connector-id": str(connector.id), + } + }, + "spec": { + "automountServiceAccountToken": False, + "containers": [ + { + "name": "mcp-server", + "image": preset.stdio.image, + "imagePullPolicy": "IfNotPresent", + "stdin": True, + "tty": False, + **( + {} + if not preset.stdio.command + else {"command": preset.stdio.command} + ), + **({} if not preset.stdio.args else {"args": preset.stdio.args}), + **( + {} + if not preset.stdio.env + and not ( + connector.auth + and connector.auth.token + and preset.stdio.auth_token_env_name + ) + else { + "env": [ + {"name": k, "value": v} + for k, v in (preset.stdio.env or {}).items() + ] + + ( + [ + { + "name": preset.stdio.auth_token_env_name, + "value": connector.auth.token.access_token, + } + ] + if connector.auth + and connector.auth.token + and preset.stdio.auth_token_env_name + else [] + ) + } + ), + }, + ], + }, + }, + }, + }, + { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": f"managed-mcp-supergateway-{connector.id.hex[:16]}", + "labels": { + "app": "managed-mcp-supergateway", + "connector-id": str(connector.id), + }, + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "managed-mcp-supergateway", + "connector-id": str(connector.id), + } + }, + "template": { + 
"metadata": { + "labels": { + "app": "managed-mcp-supergateway", + "connector-id": str(connector.id), + } + }, + "spec": { + "serviceAccountName": "managed-mcp-supergateway", + "containers": [ + { + "name": "supergateway", + "image": "ghcr.io/i-am-bee/agentstack/supergateway:latest", + "imagePullPolicy": "IfNotPresent", + "command": ["supergateway"], + "args": [ + "--stdio", + f"kubectl attach $(kubectl get pod -l app=managed-mcp-server,connector-id={connector.id} -o jsonpath='{{.items[0].metadata.name}}') -c mcp-server --stdin --tty=false", + "--outputTransport", + "streamableHttp", + "--stateful", + "--port", + "8080", + "--streamableHttpPath", + "/mcp", + "--logLevel", + "info", + ], + "ports": [{"containerPort": 8080, "protocol": "TCP"}], + "readinessProbe": { + "tcpSocket": {"port": 8080}, + "initialDelaySeconds": 2, + "periodSeconds": 5, + "failureThreshold": 3, + }, + }, + ], + }, + }, + }, + }, + { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": f"managed-mcp-supergateway-{connector.id.hex[:16]}"}, + "spec": { + "selector": { + "app": "managed-mcp-supergateway", + "connector-id": str(connector.id), + }, + "ports": [ + { + "name": "http", + "port": 8080, + "targetPort": 8080, + "protocol": "TCP", + } + ], + }, + }, + ], + }, + ) + except RuntimeError as err: + raise PlatformError( + f"Failed to create MCP server deployment: {err}", + status_code=status.HTTP_502_BAD_GATEWAY, + ) from err + + try: + await self._kubectl.wait( + f"deployment/managed-mcp-server-{connector.id.hex[:16]}", + _for="condition=Available", + timeout="60s", + ) + await self._kubectl.wait( + f"deployment/managed-mcp-supergateway-{connector.id.hex[:16]}", + _for="condition=Available", + timeout="60s", + ) + except RuntimeError as err: + raise PlatformError( + "Managed MCP deployment failed to become ready", + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + ) from err + + async def undeploy(self, *, connector: Connector) -> None: + for resource_type, resource_name in ( + ("deployment", f"managed-mcp-server-{connector.id.hex[:16]}"), + ("deployment", f"managed-mcp-supergateway-{connector.id.hex[:16]}"), + ("service", f"managed-mcp-supergateway-{connector.id.hex[:16]}"), + ): + try: + await self._kubectl.delete(resource_type, resource_name, ignore_not_found=True) + except RuntimeError as err: + logger.warning("Failed to delete %s/%s: %s", resource_type, resource_name, err) diff --git a/apps/agentstack-server/src/agentstack_server/utils/kubectl.py b/apps/agentstack-server/src/agentstack_server/utils/kubectl.py new file mode 100644 index 000000000..37db32328 --- /dev/null +++ b/apps/agentstack-server/src/agentstack_server/utils/kubectl.py @@ -0,0 +1,170 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import json +import logging +import pathlib +import shlex +from collections.abc import Awaitable, Callable, Mapping +from inspect import signature +from typing import Any, Literal, get_overloads, overload + +logger = logging.getLogger("kubectl") + + +class Kubectl: + """ + A dumb wrapper around the `kubectl` CLI. It depends on the `kubectl` binary being available in the PATH. + Unlike the official and unofficial Python clients for Kubernetes, this wrapper is both async and fully supports `kubectl exec`. 
+ + The sub-command of `kubectl` translates to method name, the rest of the arguments are passed as string arguments: + `kubectl.exec("my-pod", "--", "ls", "-l")` -> `kubectl exec my-pod -- ls -l` + + Keyword arguments are passed as `--key=value` or `--key` if the value is `True`: + `kubectl.delete("pod", "my-pod", now=True, grace_period=0)` -> `kubectl delete pod my-pod --now --grace-period=0` + + For commands that support JSON, `--output=json` is automatically added and the output is parsed and returned as a Python dict: + `kubectl.get("pod", "my-pod")` -> `kubectl get pod my-pod --output=json` + + As a special case, `exec_raw` is provided to run a command and get the `asyncio.subprocess.Process` object back. This is useful for streaming data to/from the process. + + The keyword arguments passed to the constructor are used as default arguments for all commands. Useful for setting the namespace or context. + """ + + _default_kwargs: dict[str, str | bool] + + def __init__(self, **kwargs: str | bool | pathlib.Path | None): + self._default_kwargs = self._fix_kwargs(kwargs) + + def _fix_kwargs(self, kwargs: Mapping[str, str | bool | pathlib.Path | None]) -> dict[str, str | bool]: + return { + key.removeprefix("_"): str(value) if isinstance(value, pathlib.Path) else value + for key, value in kwargs.items() + if value + } + + async def _spawn_process(self, *args: str, **kwargs: str | bool) -> asyncio.subprocess.Process: + dashdash_position = next((i for i, arg in enumerate(args) if arg == "--"), len(args)) + all_args = ( + list(args[:dashdash_position]) + + [ + (f"--{key}={value}" if value is not True else f"--{key}") + for key, value in (self._default_kwargs | self._fix_kwargs(kwargs)).items() + ] + + list(args[dashdash_position:]) + ) + logger.info("kubectl %s", shlex.join(all_args)) + return await asyncio.create_subprocess_exec( + "kubectl", + *all_args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + async def _command( + self, + *args: str, + input: bytes | str | list | dict | None = None, + **kwargs: str | bool, + ) -> str: + process = await self._spawn_process(*args, **kwargs) + if input and process.stdin: + if isinstance(input, (list, dict)): + input = json.dumps(input) + if isinstance(input, str): + input = input.encode() + process.stdin.write(input) + process.stdin.write_eof() + stdout, stderr = await process.communicate() + if process.returncode != 0: + raise RuntimeError(f"Error ({process.returncode}) running kubectl command: {stderr.decode()}") + return stdout.decode() + + @overload + def __getattr__( + self, + name: Literal[ + "annotate", + "apply", + "autoscale", + "create", + "edit", + "events", + "expose", + "get", + "label", + "patch", + "replace", + "run", + "scale", + "taint", + "version", + "wait", + ], + ) -> Callable[..., Awaitable[dict]]: + async def command_json( + *args: str, + input: bytes | str | list | dict | None = None, + **kwargs: str | bool | None, + ) -> dict: + output_str = await self._command(name.replace("_", "-"), *args, input=input, output="json", **kwargs) + return json.loads(output_str) + + return command_json + + @overload + def __getattr__( + self, + name: Literal[ + "api_resources", + "api_versions", + "attach", + "auth", + "certificate", + "cluster_info", + "completion", + "config", + "cordon", + "cp", + "ctx", + "debug", + "delete", + "describe", + "diff", + "drain", + "exec", + "explain", + "help", + "kustomize", + "logs", + "ns", + "options", + "plugin", + "port_forward", + "proxy", + 
"rollout", + "set", + "top", + "uncordon", + ], + ) -> Callable[..., Awaitable[str]]: + async def command( + *args: str, + input: bytes | str | list | dict | None = None, + **kwargs: str | bool | None, + ) -> str: + return await self._command(name.replace("_", "-"), *args, input=input, **kwargs) + + return command + + def __getattr__(self, name: str) -> Callable[..., Awaitable[Any]]: + getattr_overloads = get_overloads(self.__getattr__) + for getattr_overload in getattr_overloads: + if name in signature(getattr_overload).parameters["name"].annotation.__args__: + return getattr_overload(self, name) # type: ignore + raise AttributeError(f"Command {name} not found") + + async def exec_raw(self, *args, **kwargs: str | bool) -> asyncio.subprocess.Process: + return await self._spawn_process("exec", *args, **kwargs) diff --git a/apps/agentstack-server/tasks.toml b/apps/agentstack-server/tasks.toml index d152f12ff..a71236738 100644 --- a/apps/agentstack-server/tasks.toml +++ b/apps/agentstack-server/tasks.toml @@ -78,7 +78,7 @@ outputs = { auto = true } # run ["agentstack-server:run"] -depends = ["agentstack-server:setup"] +depends = ["agentstack-server:setup", "supergateway:build"] dir = "{{config_root}}/apps/agentstack-server" run = "uv run agentstack-server" @@ -266,7 +266,12 @@ curl http://localhost:8333 >/dev/null 2>&1 && echo "Another instance at localhos --set auth.basic.enabled="true" \ --set auth.basic.adminPassword="test-password" \ --set auth.jwtSecretKey="test-secret-key" \ - --set docling.enabled=true + --set docling.enabled=true \ + --set providerBuilds.enabled=true \ + --set localDockerRegistry.enabled=true \ + --set connector.presets[0].url=mcp+stdio://test \ + --set connector.presets[0].stdio.image=mcp/aws-documentation \ + --set connector.presets[0].metadata.name="Test MCP Server" eval "$( {{ mise_bin }} run agentstack:shell --vm-name="$VM_NAME" )" @@ -276,6 +281,9 @@ export DB_URL="postgresql+asyncpg://agentstack-user:password@localhost:5432/agen export LLM_API_BASE="${LLM_API_BASE:-http://host.docker.internal:11434/v1}" export OIDC__DISABLE_OIDC=true +echo "Waiting for agentstack-server deployment to be ready..." +kubectl wait --for=condition=available --timeout=300s deployment/agentstack-server + kubectl port-forward svc/postgresql 5432:5432 2>/dev/null 1>&2 & uv run pytest -m e2e result=$? 
@@ -288,9 +296,9 @@ if [ $result -ne 0 ]; then kubectl get event fi -{{ mise_bin }} run agentstack-cli:run -- platform delete --vm-name=${VM_NAME} -kill %1 -exit $result +# {{ mise_bin }} run agentstack-cli:run -- platform delete --vm-name=${VM_NAME} +# kill %1 +# exit $result """ ["agentstack-server:test:integration"] diff --git a/apps/agentstack-server/tests/e2e/routes/test_connectors.py b/apps/agentstack-server/tests/e2e/routes/test_connectors.py new file mode 100644 index 000000000..0730f90fe --- /dev/null +++ b/apps/agentstack-server/tests/e2e/routes/test_connectors.py @@ -0,0 +1,149 @@ +# Copyright 2025 © BeeAI a Series of LF Projects, LLC +# SPDX-License-Identifier: Apache-2.0 + +import json +import logging + +import httpx +import pytest + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.e2e + + +@pytest.mark.usefixtures("clean_up") +async def test_list_connector_presets(test_configuration): + async with httpx.AsyncClient( + base_url=test_configuration.server_url, + auth=("admin", "test-password"), + timeout=30.0, + ) as client: + response = await client.get("/api/v1/connectors/presets") + assert response.status_code == 200, f"Failed to list presets: {response.text}" + + data = response.json() + assert "items" in data + presets = data["items"] + + test_mcp_preset = next( + (p for p in presets if "mcp+stdio://test" in str(p["url"])), + None, + ) + + assert test_mcp_preset is not None, "Expected to find mcp+stdio://test preset" + assert test_mcp_preset["metadata"]["name"] == "Test MCP Server" + + +@pytest.mark.usefixtures("clean_up") +async def test_stdio_connector_happy_path(test_configuration): + async with httpx.AsyncClient( + base_url=test_configuration.server_url, + auth=("admin", "test-password"), + timeout=120.0, + ) as client: + logger.info("Creating stdio connector with URL mcp+stdio://test") + create_response = await client.post( + "/api/v1/connectors", + json={ + "url": "mcp+stdio://test", + "match_preset": True, + }, + ) + assert create_response.status_code == 201, f"Failed to create connector: {create_response.text}" + connector_data = create_response.json() + connector_id = connector_data["id"] + logger.info("Connector created: connector_id=%s state=%s", connector_id, connector_data["state"]) + + assert connector_data["url"] == "mcp+stdio://test" + assert connector_data["state"] == "created" + assert connector_data["metadata"]["name"] == "Test MCP Server" + + logger.info("Connecting to connector: connector_id=%s", connector_id) + connect_response = await client.post( + f"/api/v1/connectors/{connector_id}/connect", + json={}, + ) + assert connect_response.status_code == 200, f"Failed to connect: {connect_response.text}" + connect_data = connect_response.json() + logger.info("Connector connected successfully: connector_id=%s state=%s", connector_id, connect_data["state"]) + + assert connect_data["state"] == "connected" + + logger.info("Initializing MCP protocol: connector_id=%s", connector_id) + init_response = await client.post( + f"/api/v1/connectors/{connector_id}/mcp", + json={ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": {"name": "test-client", "version": "1.0.0"}, + }, + }, + headers={"Accept": "application/json, text/event-stream"}, + ) + assert init_response.status_code == 200, f"Failed to initialize: {init_response.text}" + logger.info("MCP protocol initialized successfully: connector_id=%s", connector_id) + + # Extract session ID from response 
headers for stateful connections + session_id = init_response.headers.get("mcp-session-id") + logger.info("MCP session ID: %s", session_id) + + # Prepare headers with session ID for subsequent requests + session_headers = {"Accept": "application/json, text/event-stream"} + if session_id: + session_headers["mcp-session-id"] = session_id + + # Send initialized notification + logger.info("Sending initialized notification: connector_id=%s", connector_id) + await client.post( + f"/api/v1/connectors/{connector_id}/mcp", + json={ + "jsonrpc": "2.0", + "method": "notifications/initialized", + }, + headers=session_headers, + ) + + # List MCP tools via the proxy endpoint to verify it's working + logger.info("Listing MCP tools: connector_id=%s", connector_id) + mcp_response = await client.post( + f"/api/v1/connectors/{connector_id}/mcp", + json={ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/list", + }, + headers=session_headers, + ) + assert mcp_response.status_code == 200, f"Failed to list tools: {mcp_response.text}" + logger.info(f"MCP RESPONSE: {mcp_response.text}") + mcp_data = json.loads(mcp_response.text.strip().removeprefix("event: message\ndata: ")) + + assert "result" in mcp_data + assert "tools" in mcp_data["result"] + tools = mcp_data["result"]["tools"] + assert len(tools) > 0, "Expected at least one tool from MCP server" + + tool_names = [tool["name"] for tool in tools] + logger.info( + "MCP tools retrieved: connector_id=%s tool_count=%d tool_names=%s", + connector_id, + len(tool_names), + tool_names, + ) + + assert len(tool_names) > 0, "Expected at least one tool from test MCP server" + + logger.info("Disconnecting connector: connector_id=%s", connector_id) + disconnect_response = await client.post(f"/api/v1/connectors/{connector_id}/disconnect") + assert disconnect_response.status_code == 200, f"Failed to disconnect: {disconnect_response.text}" + logger.info("Connector disconnected successfully: connector_id=%s", connector_id) + + logger.info("Deleting connector: connector_id=%s", connector_id) + delete_response = await client.delete(f"/api/v1/connectors/{connector_id}") + assert delete_response.status_code == 204, f"Failed to delete: {delete_response.text}" + logger.info("Connector deleted successfully: connector_id=%s", connector_id) diff --git a/apps/supergateway/.dockerignore b/apps/supergateway/.dockerignore new file mode 100644 index 000000000..72e8ffc0d --- /dev/null +++ b/apps/supergateway/.dockerignore @@ -0,0 +1 @@ +* diff --git a/apps/supergateway/Dockerfile b/apps/supergateway/Dockerfile new file mode 100644 index 000000000..12e778d17 --- /dev/null +++ b/apps/supergateway/Dockerfile @@ -0,0 +1,2 @@ +FROM node:23-alpine3.22 +RUN apk add --no-cache kubectl && npm install -g supergateway diff --git a/apps/supergateway/tasks.toml b/apps/supergateway/tasks.toml new file mode 100644 index 000000000..7c855f4f8 --- /dev/null +++ b/apps/supergateway/tasks.toml @@ -0,0 +1,5 @@ +# build + +["supergateway:build"] +dir = "{{config_root}}/apps/supergateway" +run = "docker build -f Dockerfile -t ghcr.io/i-am-bee/agentstack/supergateway:latest --load ." diff --git a/docs/experimental/connectors.mdx b/docs/experimental/connectors.mdx index 4cd22937c..3084be04c 100644 --- a/docs/experimental/connectors.mdx +++ b/docs/experimental/connectors.mdx @@ -51,9 +51,34 @@ stateDiagram-v2 Usual flow works as follows: 1. **Create**: Client creates a connector by calling `POST /api/v1/connectors` with MCP server URL. -2. 
**Connect**: Client initiates connection by calling `POST /api/v1/connectors/{id}/connect`. If the MCP server requires user authorization, the response will contain an authorization URL. -3. **Authorize**: User visits the authorization URL, authenticates and grants access. Authorization server redirects the user back to the platform with an authorization code. -4. **Complete**: Platform exchanges the authorization code for access and refresh tokens. Once completed, the connector is in `connected` state and ready to be used. +2. **Connect**: Client initiates connection by calling `POST /api/v1/connectors/{id}/connect`. + - For OAuth-enabled servers: The response will contain an authorization URL for the user to complete authentication + - For token-based authentication: Provide an `access_token` in the connect request body (see [Authentication](#authentication)) +3. **Authorize** (OAuth only): User visits the authorization URL, authenticates and grants access. Authorization server redirects the user back to the platform with an authorization code. +4. **Complete**: Platform exchanges the authorization code for access and refresh tokens (OAuth) or stores the provided access token. Once completed, the connector is in `connected` state and ready to be used. + +## Authentication + +Connectors support two authentication methods: + +### OAuth (External MCP Servers) + +For MCP servers that support OAuth, the platform handles the full authorization code flow. No additional configuration is needed in the connect request. + +### Token-based Authentication + +For MCP servers that use simple token-based authentication, provide the token when connecting: + +```json +POST /api/v1/connectors/{id}/connect +{ + "access_token": "YOUR_API_TOKEN" +} +``` + +The authentication token is used differently depending on the connector type: +- **External HTTP/HTTPS MCP servers**: Token is sent as a Bearer token in the `Authorization` header for all requests +- **Managed stdio MCP servers**: Token is injected as an environment variable in the container (requires `accessTokenEnvName` in preset configuration) ## Error handling @@ -73,9 +98,10 @@ Connector can be asynchronously disconnected at any time. This can happen for va Connector presets provide pre-configured settings for common MCP servers, simplifying the connector creation process. Presets can include: -- **URL**: The MCP server endpoint +- **URL**: The MCP server endpoint (supports `http://`, `https://`, and `mcp+stdio://` schemes) - **Client credentials**: Pre-configured OAuth `client_id` and `client_secret` for public clients - **Metadata**: Display information such as name and description +- **Stdio configuration**: For `mcp+stdio://` URLs, container image and runtime settings for managed MCP servers ### Available Presets @@ -111,6 +137,10 @@ To disable preset matching and provide all credentials manually, set `match_pres Connector presets are configurable via Helm values when deploying Agent Stack. The presets are defined in the `connector.presets` section of `values.yaml`: +#### Remote MCP Servers (HTTP/HTTPS) + +For MCP servers accessible over HTTP/HTTPS with Streamable HTTP transport: + ```yaml connector: presets: @@ -126,11 +156,52 @@ connector: description: "Search, access and get insights on your Box content" ``` +#### Managed MCP Servers (stdio) + +For MCP servers that use stdio transport, Agent Stack can manage them as Kubernetes deployments using the `mcp+stdio://` scheme. 
The platform automatically: + +- Deploys the MCP server as a Kubernetes pod with a sidecar container running [supergateway](https://github.com/supercorp-ai/supergateway) +- Exposes the stdio MCP server over Streamable HTTP transport via supergateway +- Manages the deployment lifecycle (creates on connect, deletes on disconnect) +- Handles authentication tokens via environment variables + +```yaml +connector: + presets: + - url: "mcp+stdio://example-mcp" + metadata: + name: "Example MCP" + description: "Example stdio-based MCP server" + stdio: + image: "registry.example.com/mcp-server:latest" + command: ["node"] # Optional: override container command + args: ["dist/index.js"] # Optional: override container args + env: # Optional: additional environment variables + LOG_LEVEL: "info" + accessTokenEnvName: "API_TOKEN" # Optional: env var name to inject access token +``` + +The managed MCP architecture uses a sidecar pattern: +- **MCP Server Container**: Runs your stdio-based MCP server +- **Supergateway Sidecar**: Wraps the stdio interface and exposes it as Streamable HTTP at port 8080 + +When a connector using a managed preset is connected, the platform: +1. Creates a Kubernetes Deployment with both containers +2. Creates a Kubernetes Service to expose the supergateway +3. Waits for the deployment to become ready (up to 60 seconds) +4. Proxies MCP requests to the managed service +5. If an `access_token` was provided in the connect request and the preset defines `accessTokenEnvName`, injects the token as an environment variable + +When disconnected, the platform cleans up the Deployment and Service resources. + +#### Configuration Details + The presets are injected into the platform via the `CONNECTOR__PRESETS` environment variable, which is populated from a Kubernetes Secret created by Helm. This allows administrators to: -- Add custom MCP server presets for their organization +- Add custom MCP server presets for their organization (both remote and managed) - Modify or remove default presets -- Configure client credentials for private MCP servers +- Configure client credentials for remote MCP servers with OAuth +- Configure stdio container images and settings for managed MCP servers - Customize metadata (names, descriptions) for better user experience After modifying preset configuration in Helm values, redeploy the platform for changes to take effect. diff --git a/docs/experimental/mcp.mdx b/docs/experimental/mcp.mdx index 7e734add0..69aa9bea5 100644 --- a/docs/experimental/mcp.mdx +++ b/docs/experimental/mcp.mdx @@ -14,10 +14,9 @@ description: "Learn how to register MCP servers into Agent Stack and create cura to complete the tutorial. - - If your server only supports `stdio` transport, use - [supergateway](https://github.com/supercorp-ai/supergateway) to wrap it. - + + If your server uses `stdio` transport, you can register it as a [Managed MCP Connector](/experimental/connectors#managed-mcp-servers-stdio). The platform will automatically deploy and manage the server for you. + ## Register the MCP server diff --git a/helm/templates/managed-mcp/role.yaml b/helm/templates/managed-mcp/role.yaml new file mode 100644 index 000000000..21241d396 --- /dev/null +++ b/helm/templates/managed-mcp/role.yaml @@ -0,0 +1,14 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: managed-mcp-supergateway + labels: + {{- include "agentstack.labels" . 
| nindent 4 }} + app.kubernetes.io/component: managed-mcp-supergateway +rules: + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list"] + - apiGroups: [""] + resources: ["pods/attach"] + verbs: ["create", "get"] diff --git a/helm/templates/managed-mcp/rolebinding.yaml b/helm/templates/managed-mcp/rolebinding.yaml new file mode 100644 index 000000000..a9ccd2091 --- /dev/null +++ b/helm/templates/managed-mcp/rolebinding.yaml @@ -0,0 +1,14 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: managed-mcp-supergateway + labels: + {{- include "agentstack.labels" . | nindent 4 }} + app.kubernetes.io/component: managed-mcp-supergateway +subjects: + - kind: ServiceAccount + name: managed-mcp-supergateway +roleRef: + kind: Role + name: managed-mcp-supergateway + apiGroup: rbac.authorization.k8s.io diff --git a/helm/templates/managed-mcp/serviceaccount.yaml b/helm/templates/managed-mcp/serviceaccount.yaml new file mode 100644 index 000000000..17a44fbb7 --- /dev/null +++ b/helm/templates/managed-mcp/serviceaccount.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: managed-mcp-supergateway + labels: + {{- include "agentstack.labels" . | nindent 4 }} + app.kubernetes.io/component: managed-mcp-supergateway +automountServiceAccountToken: true diff --git a/mise.toml b/mise.toml index d0551df16..21a638f88 100644 --- a/mise.toml +++ b/mise.toml @@ -50,5 +50,6 @@ includes = [ "apps/agentstack-ui/tasks.toml", "apps/beeai-web/tasks.toml", "apps/agentstack-sdk-ts/tasks.toml", + "apps/supergateway/tasks.toml", "docs/tasks.toml", ] diff --git a/tasks.toml b/tasks.toml index 6ec155d3a..500fb76f2 100644 --- a/tasks.toml +++ b/tasks.toml @@ -95,7 +95,7 @@ run = "true" # Empty tests in case there are no tests # Platform tasks ["agentstack:start"] -depends = ["agentstack-server:build"] +depends = ["agentstack-server:build", "supergateway:build"] dir = "{{config_root}}" run = """ #!/bin/bash @@ -112,6 +112,7 @@ fi {{ mise_bin }} run agentstack-cli:run -- platform start \ --import "ghcr.io/i-am-bee/agentstack/agentstack-server:local" \ + --import "ghcr.io/i-am-bee/agentstack/supergateway:latest" \ $UI_IMPORT \ --set image.tag=local \ $UI_TAG "$@"
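
As a usage note for the token-based connect flow documented above in `docs/experimental/connectors.mdx`: the sketch below, written in the same httpx style as the new e2e test, shows how a client might create a preset-matched connector and connect it with a pre-issued token instead of OAuth. The server URL, admin credentials, preset URL (`mcp+stdio://example-mcp`), and token value are placeholders, not values from this change.

```python
# Minimal sketch (not part of this change): token-based connect for a
# preset-matched connector. All URLs, credentials, and the token are placeholders.
import asyncio

import httpx


async def connect_with_token() -> None:
    async with httpx.AsyncClient(
        base_url="http://localhost:8333",  # placeholder Agent Stack server URL
        auth=("admin", "test-password"),   # placeholder admin credentials
        timeout=120.0,
    ) as client:
        # Create the connector; match_preset=True resolves a configured preset.
        create_response = await client.post(
            "/api/v1/connectors",
            json={"url": "mcp+stdio://example-mcp", "match_preset": True},
        )
        create_response.raise_for_status()
        connector_id = create_response.json()["id"]

        # Connect with a pre-issued token instead of the OAuth flow.
        connect_response = await client.post(
            f"/api/v1/connectors/{connector_id}/connect",
            json={"access_token": "YOUR_API_TOKEN"},
        )
        connect_response.raise_for_status()
        assert connect_response.json()["state"] == "connected"


if __name__ == "__main__":
    asyncio.run(connect_with_token())
```

For managed stdio presets, the platform only forwards the token into the container when the preset defines an access-token environment variable name, as described in the connectors documentation above.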