diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index d65e9807df1..3aa592b11f1 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -96,3 +96,80 @@ jobs: run: make test-python-integration - name: Minimize uv cache run: uv cache prune --ci + + mcp-feature-server-runtime: + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'feast-dev/feast' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive + persist-credentials: false + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + architecture: x64 + - name: Install the latest version of uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + - name: Install dependencies + run: make install-python-dependencies-ci + - name: Start feature server (MCP HTTP) + run: | + cd examples/mcp_feature_store + uv run python -m feast.cli.cli serve --host 127.0.0.1 --port 6566 --workers 1 --no-access-log & + SERVER_PID=$! + echo $SERVER_PID > /tmp/feast_server_pid + for i in $(seq 1 60); do + kill -0 "$SERVER_PID" || { echo "server died"; exit 1; } + if curl -fsS http://127.0.0.1:6566/health >/dev/null; then + break + fi + sleep 1 + done + curl -fsS http://127.0.0.1:6566/health >/dev/null + - name: Validate MCP endpoint + run: | + rm -f /tmp/mcp_headers /tmp/mcp_headers2 /tmp/mcp_body2 + + curl -sS -D /tmp/mcp_headers -o /dev/null --max-time 10 \ + -X POST \ + -H "Accept: application/json, text/event-stream" \ + -H "Content-Type: application/json" \ + -H "mcp-protocol-version: 2025-03-26" \ + --data '{}' \ + http://127.0.0.1:6566/mcp + + SESSION_ID=$(grep -i "^mcp-session-id:" /tmp/mcp_headers | head -1 | awk '{print $2}' | tr -d '\r') + if [ -z "${SESSION_ID}" ]; then + cat /tmp/mcp_headers || true + exit 1 + fi + + curl -sS -D /tmp/mcp_headers2 -o /tmp/mcp_body2 --max-time 10 \ + -X POST \ + -H "Accept: application/json, text/event-stream" \ + -H "Content-Type: application/json" \ + -H "mcp-protocol-version: 2025-03-26" \ + -H "mcp-session-id: ${SESSION_ID}" \ + --data '{}' \ + http://127.0.0.1:6566/mcp || true + + grep -Eq "^HTTP/.* 400" /tmp/mcp_headers2 + grep -Eiq "^content-type: application/json" /tmp/mcp_headers2 + grep -Eiq "^mcp-session-id: ${SESSION_ID}" /tmp/mcp_headers2 + - name: Stop feature server + if: always() + run: | + if [ -f /tmp/feast_server_pid ]; then + kill "$(cat /tmp/feast_server_pid)" || true + fi + - name: Minimize uv cache + if: always() + run: uv cache prune --ci diff --git a/docs/getting-started/genai.md b/docs/getting-started/genai.md index b4bdf1d1dc8..d8798b481d7 100644 --- a/docs/getting-started/genai.md +++ b/docs/getting-started/genai.md @@ -146,10 +146,13 @@ Feast supports the Model Context Protocol (MCP), which enables AI agents and app type: mcp enabled: true mcp_enabled: true + mcp_transport: http mcp_server_name: "feast-feature-store" mcp_server_version: "1.0.0" ``` +By default, Feast uses the SSE-based MCP transport (`mcp_transport: sse`). Streamable HTTP (`mcp_transport: http`) is recommended for improved compatibility with some MCP clients. + ### How It Works The MCP integration uses the `fastapi_mcp` library to automatically transform your Feast feature server's FastAPI endpoints into MCP-compatible tools. When you enable MCP support: diff --git a/docs/reference/feature-servers/mcp-feature-server.md b/docs/reference/feature-servers/mcp-feature-server.md new file mode 100644 index 00000000000..8bfc96b1891 --- /dev/null +++ b/docs/reference/feature-servers/mcp-feature-server.md @@ -0,0 +1,51 @@ +# MCP Feature Server + +## Overview + +Feast can expose the Python Feature Server as an MCP (Model Context Protocol) server using `fastapi_mcp`. When enabled, MCP clients can discover and call Feast tools such as online feature retrieval. + +## Installation + +```bash +pip install feast[mcp] +``` + +## Configuration + +Add an MCP `feature_server` block to your `feature_store.yaml`: + +```yaml +feature_server: + type: mcp + enabled: true + mcp_enabled: true + mcp_transport: http + mcp_server_name: "feast-feature-store" + mcp_server_version: "1.0.0" +``` + +### mcp_transport + +`mcp_transport` controls how MCP is mounted into the Feature Server: + +- `sse`: SSE-based transport. This is the default for backward compatibility. +- `http`: Streamable HTTP transport. This is recommended for improved compatibility with some MCP clients. + +If `mcp_transport: http` is configured but your installed `fastapi_mcp` version does not support Streamable HTTP mounting, Feast will fail fast with an error asking you to upgrade `fastapi_mcp` (or reinstall `feast[mcp]`). + +## Endpoints + +MCP is mounted at: + +- `/mcp` + +## Connecting an MCP client + +Use your MCP client’s “HTTP” configuration and point it to the Feature Server base URL. For example, if your Feature Server runs at `http://localhost:6566`, use: + +- `http://localhost:6566/mcp` + +## Troubleshooting + +- If you see a deprecation warning about `mount()` at runtime, upgrade `fastapi_mcp` and use `mcp_transport: http` or `mcp_transport: sse`. +- If your MCP client has intermittent connectivity issues with `mcp_transport: sse`, switch to `mcp_transport: http`. diff --git a/examples/mcp_feature_store/feature_store.yaml b/examples/mcp_feature_store/feature_store.yaml index 305be159956..82029eb111f 100644 --- a/examples/mcp_feature_store/feature_store.yaml +++ b/examples/mcp_feature_store/feature_store.yaml @@ -14,9 +14,10 @@ feature_server: type: mcp enabled: true mcp_enabled: true # Enable MCP support - defaults to false + mcp_transport: http mcp_server_name: "feast-feature-store" mcp_server_version: "1.0.0" feature_logging: enabled: false -entity_key_serialization_version: 3 \ No newline at end of file +entity_key_serialization_version: 3 diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index a093795c42a..b205d99f720 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -722,6 +722,7 @@ async def websocket_endpoint(websocket: WebSocket): def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): """Add MCP support to the FastAPI app if enabled in configuration.""" + mcp_transport_not_supported_error = None try: # Check if MCP is enabled in feature server config if ( @@ -730,7 +731,16 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): and store.config.feature_server.type == "mcp" and getattr(store.config.feature_server, "mcp_enabled", False) ): - from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app + try: + from feast.infra.mcp_servers.mcp_server import ( + McpTransportNotSupportedError, + add_mcp_support_to_app, + ) + + mcp_transport_not_supported_error = McpTransportNotSupportedError + except ImportError as e: + logger.error(f"Error checking/adding MCP support: {e}") + return mcp_server = add_mcp_support_to_app(app, store, store.config.feature_server) @@ -741,6 +751,10 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): else: logger.debug("MCP support is not enabled in feature server configuration") except Exception as e: + if mcp_transport_not_supported_error and isinstance( + e, mcp_transport_not_supported_error + ): + raise logger.error(f"Error checking/adding MCP support: {e}") # Don't fail the entire server if MCP fails to initialize diff --git a/sdk/python/feast/infra/mcp_servers/mcp_config.py b/sdk/python/feast/infra/mcp_servers/mcp_config.py index dfb391eb137..8e48da58246 100644 --- a/sdk/python/feast/infra/mcp_servers/mcp_config.py +++ b/sdk/python/feast/infra/mcp_servers/mcp_config.py @@ -1,4 +1,4 @@ -from typing import Literal, Optional +from typing import Literal from pydantic import StrictBool, StrictStr @@ -20,8 +20,7 @@ class McpFeatureServerConfig(BaseFeatureServerConfig): # MCP server version mcp_server_version: StrictStr = "1.0.0" - # Optional MCP transport configuration - mcp_transport: Optional[StrictStr] = None + mcp_transport: Literal["sse", "http"] = "sse" # The endpoint definition for transformation_service (inherited from base) transformation_service_endpoint: StrictStr = "localhost:6566" diff --git a/sdk/python/feast/infra/mcp_servers/mcp_server.py b/sdk/python/feast/infra/mcp_servers/mcp_server.py index 611b4688795..dde4aed0cd3 100644 --- a/sdk/python/feast/infra/mcp_servers/mcp_server.py +++ b/sdk/python/feast/infra/mcp_servers/mcp_server.py @@ -6,7 +6,7 @@ """ import logging -from typing import Optional +from typing import Literal, Optional from feast.feature_store import FeatureStore @@ -26,6 +26,10 @@ FastApiMCP = None +class McpTransportNotSupportedError(RuntimeError): + pass + + def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastApiMCP"]: """Add MCP support to the FastAPI app if enabled in configuration.""" if not MCP_AVAILABLE: @@ -40,8 +44,29 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", ) - # Mount the MCP server to the FastAPI app - mcp.mount() + transport: Literal["sse", "http"] = getattr(config, "mcp_transport", "sse") + if transport == "http": + mount_http = getattr(mcp, "mount_http", None) + if mount_http is None: + raise McpTransportNotSupportedError( + "mcp_transport=http requires fastapi_mcp with FastApiMCP.mount_http(). " + "Upgrade fastapi_mcp (or install feast[mcp]) to a newer version." + ) + mount_http() + elif transport == "sse": + mount_sse = getattr(mcp, "mount_sse", None) + if mount_sse is not None: + mount_sse() + else: + logger.warning( + "transport sse not supported, fallback to the deprecated mount()." + ) + mcp.mount() + else: + # Code should not reach here + raise McpTransportNotSupportedError( + f"Unsupported mcp_transport={transport!r}. Expected 'sse' or 'http'." + ) logger.info( "MCP support has been enabled for the Feast feature server at /mcp endpoint" @@ -53,6 +78,8 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp return mcp + except McpTransportNotSupportedError: + raise except Exception as e: - logger.error(f"Failed to initialize MCP integration: {e}") + logger.error(f"Failed to initialize MCP integration: {e}", exc_info=True) return None diff --git a/sdk/python/tests/integration/test_mcp_feature_server.py b/sdk/python/tests/integration/test_mcp_feature_server.py index 6920c2e3f2a..0e59a71dfae 100644 --- a/sdk/python/tests/integration/test_mcp_feature_server.py +++ b/sdk/python/tests/integration/test_mcp_feature_server.py @@ -4,6 +4,7 @@ import pytest from fastapi import FastAPI from fastapi.testclient import TestClient +from pydantic import ValidationError from feast.feature_store import FeatureStore from feast.infra.mcp_servers.mcp_config import McpFeatureServerConfig @@ -49,7 +50,7 @@ def test_mcp_server_functionality_with_mock_store(self): mcp_server_version="1.0.0", ) - mock_mcp_instance = Mock() + mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"]) mock_fast_api_mcp.return_value = mock_mcp_instance result = add_mcp_support_to_app(mock_app, mock_store, config) @@ -58,7 +59,7 @@ def test_mcp_server_functionality_with_mock_store(self): self.assertIsNotNone(result) self.assertEqual(result, mock_mcp_instance) mock_fast_api_mcp.assert_called_once() - mock_mcp_instance.mount.assert_called_once() + mock_mcp_instance.mount_sse.assert_called_once() @patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True) @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") @@ -77,7 +78,7 @@ def test_complete_mcp_setup_flow(self, mock_fast_api_mcp): transformation_service_endpoint="localhost:6566", ) - mock_mcp_instance = Mock() + mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"]) mock_fast_api_mcp.return_value = mock_mcp_instance # Execute the flow @@ -90,7 +91,7 @@ def test_complete_mcp_setup_flow(self, mock_fast_api_mcp): name="e2e-test-server", description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", ) - mock_mcp_instance.mount.assert_called_once() + mock_mcp_instance.mount_sse.assert_called_once() self.assertEqual(result, mock_mcp_instance) @pytest.mark.skipif( @@ -160,36 +161,29 @@ def test_feature_server_with_mcp_config(self): def test_mcp_server_configuration_validation(self): """Test comprehensive MCP server configuration validation.""" # Test various configuration combinations - test_configs = [ - { - "enabled": True, - "mcp_enabled": True, - "mcp_server_name": "test-server-1", - "mcp_server_version": "1.0.0", - "mcp_transport": "sse", - }, - { - "enabled": True, - "mcp_enabled": True, - "mcp_server_name": "test-server-2", - "mcp_server_version": "2.0.0", - "mcp_transport": "websocket", - }, - { - "enabled": False, - "mcp_enabled": False, - "mcp_server_name": "disabled-server", - "mcp_server_version": "1.0.0", - "mcp_transport": None, - }, - ] - - for config_dict in test_configs: - config = McpFeatureServerConfig(**config_dict) - self.assertEqual(config.enabled, config_dict["enabled"]) - self.assertEqual(config.mcp_enabled, config_dict["mcp_enabled"]) - self.assertEqual(config.mcp_server_name, config_dict["mcp_server_name"]) - self.assertEqual( - config.mcp_server_version, config_dict["mcp_server_version"] + for transport in ["sse", "http"]: + config = McpFeatureServerConfig( + enabled=True, + mcp_enabled=True, + mcp_server_name="test-server", + mcp_server_version="1.0.0", + mcp_transport=transport, + ) + self.assertEqual(config.mcp_transport, transport) + + config_default = McpFeatureServerConfig( + enabled=True, + mcp_enabled=True, + mcp_server_name="test-server-default", + mcp_server_version="1.0.0", + ) + self.assertEqual(config_default.mcp_transport, "sse") + + with self.assertRaises(ValidationError): + McpFeatureServerConfig( + enabled=True, + mcp_enabled=True, + mcp_server_name="bad-transport", + mcp_server_version="1.0.0", + mcp_transport="websocket", ) - self.assertEqual(config.mcp_transport, config_dict["mcp_transport"]) diff --git a/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py b/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py index f5748be1acc..b23372d9eab 100644 --- a/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py +++ b/sdk/python/tests/unit/infra/feature_servers/test_mcp_server.py @@ -1,6 +1,9 @@ import unittest +from types import SimpleNamespace from unittest.mock import Mock, patch +from pydantic import ValidationError + from feast.feature_store import FeatureStore from feast.infra.mcp_servers.mcp_config import McpFeatureServerConfig @@ -17,7 +20,7 @@ def test_default_config(self): self.assertFalse(config.mcp_enabled) self.assertEqual(config.mcp_server_name, "feast-mcp-server") self.assertEqual(config.mcp_server_version, "1.0.0") - self.assertIsNone(config.mcp_transport) + self.assertEqual(config.mcp_transport, "sse") self.assertEqual(config.transformation_service_endpoint, "localhost:6566") def test_custom_config(self): @@ -41,11 +44,11 @@ def test_custom_config(self): def test_config_validation(self): """Test configuration validation.""" - # Test valid transport options - valid_transports = ["sse", "websocket", None] - for transport in valid_transports: + for transport in ["sse", "http"]: config = McpFeatureServerConfig(mcp_transport=transport) self.assertEqual(config.mcp_transport, transport) + with self.assertRaises(ValidationError): + McpFeatureServerConfig(mcp_transport="websocket") def test_config_inheritance(self): """Test that McpFeatureServerConfig properly inherits from BaseFeatureServerConfig.""" @@ -66,12 +69,13 @@ def test_add_mcp_support_success(self, mock_fast_api_mcp): mock_app = Mock() mock_store = Mock(spec=FeatureStore) - mock_config = Mock() - mock_config.mcp_server_name = "test-server" - mock_config.mcp_server_version = "1.0.0" + mock_config = SimpleNamespace( + mcp_server_name="test-server", + mcp_server_version="1.0.0", + mcp_transport="sse", + ) - # Mock the FastApiMCP instance - mock_mcp_instance = Mock() + mock_mcp_instance = Mock(spec_set=["mount_sse", "mount", "mount_http"]) mock_fast_api_mcp.return_value = mock_mcp_instance result = add_mcp_support_to_app(mock_app, mock_store, mock_config) @@ -83,8 +87,7 @@ def test_add_mcp_support_success(self, mock_fast_api_mcp): description="Feast Feature Store MCP Server - Access feature store data and operations through MCP", ) - # Verify mount was called - mock_mcp_instance.mount.assert_called_once() + mock_mcp_instance.mount_sse.assert_called_once() # Verify the result self.assertEqual(result, mock_mcp_instance) @@ -96,11 +99,9 @@ def test_add_mcp_support_with_defaults(self, mock_fast_api_mcp): mock_app = Mock() mock_store = Mock(spec=FeatureStore) - mock_config = Mock() - # Don't set mcp_server_name to test default - del mock_config.mcp_server_name + mock_config = SimpleNamespace(mcp_transport="sse") - mock_mcp_instance = Mock() + mock_mcp_instance = Mock(spec_set=["mount_sse", "mount", "mount_http"]) mock_fast_api_mcp.return_value = mock_mcp_instance result = add_mcp_support_to_app(mock_app, mock_store, mock_config) @@ -114,6 +115,40 @@ def test_add_mcp_support_with_defaults(self, mock_fast_api_mcp): self.assertEqual(result, mock_mcp_instance) + @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") + def test_add_mcp_support_http_transport(self, mock_fast_api_mcp): + from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app + + mock_app = Mock() + mock_store = Mock(spec=FeatureStore) + mock_config = SimpleNamespace( + mcp_server_name="test-server", mcp_transport="http" + ) + + mock_mcp_instance = Mock(spec_set=["mount_http"]) + mock_fast_api_mcp.return_value = mock_mcp_instance + + result = add_mcp_support_to_app(mock_app, mock_store, mock_config) + mock_mcp_instance.mount_http.assert_called_once() + self.assertEqual(result, mock_mcp_instance) + + @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") + def test_add_mcp_support_http_missing_mount_http_fails(self, mock_fast_api_mcp): + from feast.infra.mcp_servers.mcp_server import ( + McpTransportNotSupportedError, + add_mcp_support_to_app, + ) + + mock_app = Mock() + mock_store = Mock(spec=FeatureStore) + mock_config = SimpleNamespace(mcp_transport="http") + + mock_mcp_instance = Mock(spec_set=["mount"]) + mock_fast_api_mcp.return_value = mock_mcp_instance + + with self.assertRaises(McpTransportNotSupportedError): + add_mcp_support_to_app(mock_app, mock_store, mock_config) + @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") @patch("feast.infra.mcp_servers.mcp_server.logger") def test_add_mcp_support_with_exception(self, mock_logger, mock_fast_api_mcp): @@ -122,8 +157,9 @@ def test_add_mcp_support_with_exception(self, mock_logger, mock_fast_api_mcp): mock_app = Mock() mock_store = Mock(spec=FeatureStore) - mock_config = Mock() - mock_config.mcp_server_name = "test-server" + mock_config = SimpleNamespace( + mcp_server_name="test-server", mcp_transport="sse" + ) # Mock FastApiMCP to raise an exception mock_fast_api_mcp.side_effect = Exception("MCP initialization failed") @@ -135,7 +171,8 @@ def test_add_mcp_support_with_exception(self, mock_logger, mock_fast_api_mcp): # Verify error was logged mock_logger.error.assert_called_once_with( - "Failed to initialize MCP integration: MCP initialization failed" + "Failed to initialize MCP integration: MCP initialization failed", + exc_info=True, ) @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP") @@ -145,10 +182,11 @@ def test_add_mcp_support_mount_exception(self, mock_fast_api_mcp): mock_app = Mock() mock_store = Mock(spec=FeatureStore) - mock_config = Mock() - mock_config.mcp_server_name = "test-server" + mock_config = SimpleNamespace( + mcp_server_name="test-server", mcp_transport="sse" + ) - mock_mcp_instance = Mock() + mock_mcp_instance = Mock(spec_set=["mount"]) mock_mcp_instance.mount.side_effect = Exception("Mount failed") mock_fast_api_mcp.return_value = mock_mcp_instance @@ -203,3 +241,20 @@ def test_add_mcp_support_if_enabled_exception(self, mock_logger): mock_logger.error.assert_called_with( "Error checking/adding MCP support: Test error" ) + + @patch("feast.infra.mcp_servers.mcp_server.add_mcp_support_to_app") + def test_add_mcp_support_if_enabled_transport_not_supported_fails(self, mock_add): + from feast.feature_server import _add_mcp_support_if_enabled + from feast.infra.mcp_servers.mcp_server import McpTransportNotSupportedError + + mock_app = Mock() + mock_store = Mock() + mock_store.config.feature_server = Mock() + mock_store.config.feature_server.type = "mcp" + mock_store.config.feature_server.mcp_enabled = True + mock_store.config.feature_server.mcp_transport = "http" + + mock_add.side_effect = McpTransportNotSupportedError("bad") + + with self.assertRaises(McpTransportNotSupportedError): + _add_mcp_support_if_enabled(mock_app, mock_store)