-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathtest_mcp_feature_server.py
More file actions
189 lines (159 loc) · 7.4 KB
/
test_mcp_feature_server.py
File metadata and controls
189 lines (159 loc) · 7.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import unittest
from unittest.mock import MagicMock, Mock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import ValidationError
from feast.feature_store import FeatureStore
from feast.infra.mcp_servers.mcp_config import McpFeatureServerConfig
class TestMCPFeatureServerIntegration(unittest.TestCase):
    """Integration tests for MCP feature server functionality.

    The mock-based tests replace ``MCP_AVAILABLE`` and ``FastApiMCP`` in
    ``feast.infra.mcp_servers.mcp_server`` so that the wiring done by
    ``add_mcp_support_to_app`` can be checked against a mock MCP instance.
    """

    def test_mcp_config_integration(self):
        """Check that explicitly supplied MCP settings are stored on the config."""
        config = McpFeatureServerConfig(
            enabled=True,
            mcp_enabled=True,
            mcp_server_name="integration-test-server",
            mcp_server_version="2.1.0",
            mcp_transport="sse",
        )

        # Verify configuration is properly structured for MCP integration
        self.assertTrue(config.enabled)
        self.assertTrue(config.mcp_enabled)
        self.assertEqual(config.mcp_server_name, "integration-test-server")
        self.assertEqual(config.mcp_server_version, "2.1.0")
        self.assertEqual(config.mcp_transport, "sse")

    @patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True)
    @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP")
    def test_mcp_server_functionality_with_mock_store(self, mock_fast_api_mcp):
        """Check MCP setup against a mock feature store with empty listings."""
        from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app

        # Create a more realistic mock feature store
        mock_store = MagicMock(spec=FeatureStore)
        mock_store.list_feature_views.return_value = []
        mock_store.list_data_sources.return_value = []

        mock_app = FastAPI()
        config = McpFeatureServerConfig(
            mcp_enabled=True,
            mcp_server_name="test-feast-server",
            mcp_server_version="1.0.0",
        )

        # spec_set restricts the mock to the mount methods listed here, so any
        # other attribute access on the MCP instance raises AttributeError.
        mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"])
        mock_fast_api_mcp.return_value = mock_mcp_instance

        result = add_mcp_support_to_app(mock_app, mock_store, config)

        # Verify successful integration: the very object built by FastApiMCP
        # is returned (identity check, which also rules out None).
        self.assertIs(result, mock_mcp_instance)
        mock_fast_api_mcp.assert_called_once()
        mock_mcp_instance.mount_sse.assert_called_once()

    @patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True)
    @patch("feast.infra.mcp_servers.mcp_server.FastApiMCP")
    def test_complete_mcp_setup_flow(self, mock_fast_api_mcp):
        """Check the complete MCP setup flow from configuration to mounting."""
        from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app

        # Setup test data
        app = FastAPI()
        mock_store = Mock(spec=FeatureStore)
        config = McpFeatureServerConfig(
            enabled=True,
            mcp_enabled=True,
            mcp_server_name="e2e-test-server",
            mcp_server_version="1.0.0",
            transformation_service_endpoint="localhost:6566",
        )

        mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"])
        mock_fast_api_mcp.return_value = mock_mcp_instance

        # Execute the flow
        result = add_mcp_support_to_app(app, mock_store, config)

        # Verify all steps completed successfully: FastApiMCP was built from
        # the app with the configured name, mounted, and handed back.
        mock_fast_api_mcp.assert_called_once_with(
            app,
            name="e2e-test-server",
            description="Feast Feature Store MCP Server - Access feature store data and operations through MCP",
        )
        mock_mcp_instance.mount_sse.assert_called_once()
        self.assertIs(result, mock_mcp_instance)

    # NOTE(review): condition=True skips this test unconditionally, even when
    # fastapi_mcp is installed. Consider keying the skip on package
    # availability once the test has been validated against the real package.
    @pytest.mark.skipif(
        condition=True,  # Skip until fastapi_mcp is available
        reason="Requires fastapi_mcp package to be installed",
    )
    def test_real_mcp_integration(self):
        """Exercise MCP setup on a real FastAPI app without patching FastApiMCP."""
        try:
            from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app

            # Create a real FastAPI app
            app = FastAPI()

            # Mock feature store for this test
            mock_store = MagicMock(spec=FeatureStore)
            mock_store.list_feature_views.return_value = []
            mock_store.list_data_sources.return_value = []

            config = McpFeatureServerConfig(
                enabled=True,
                mcp_enabled=True,
                mcp_server_name="real-integration-test",
                mcp_server_version="1.0.0",
            )

            # This would require fastapi_mcp to be installed
            result = add_mcp_support_to_app(app, mock_store, config)

            if result is None:
                # If fastapi_mcp is not available, result should be None;
                # there is nothing further to check in that case.
                return

            # The exact endpoints would depend on fastapi_mcp implementation,
            # so only verify the client can be created and make a basic request.
            client = TestClient(app)
            response = client.get("/health", follow_redirects=False)

            # We expect this to either work or return a 404, but not crash
            self.assertIn(response.status_code, [200, 404])
        except ImportError:
            # Expected if fastapi_mcp is not installed
            self.skipTest("fastapi_mcp not available")

    def test_feature_server_with_mcp_config(self):
        """Check feature server startup with an MCP-enabled configuration."""
        from feast.feature_server import _add_mcp_support_if_enabled

        app = FastAPI()
        mock_store = Mock()
        mock_store.config.feature_server = McpFeatureServerConfig(
            enabled=True,
            mcp_enabled=True,
            mcp_server_name="feature-server-test",
            mcp_server_version="1.0.0",
        )

        # This should not raise an exception even if MCP is not available.
        # NOTE(review): the handler below deliberately tolerates any exception
        # whose message mentions MCP, so this test only fails on unrelated
        # errors. Tighten it if the helper is meant to never raise.
        try:
            _add_mcp_support_if_enabled(app, mock_store)
        except Exception as e:
            # Should handle gracefully
            self.assertIn("MCP", str(e).upper())

    def test_mcp_server_configuration_validation(self):
        """Check accepted, default and rejected values of ``mcp_transport``."""
        # Each transport runs in its own subTest so a failure names the
        # offending value and the remaining transports are still checked.
        for transport in ["sse", "http"]:
            with self.subTest(transport=transport):
                config = McpFeatureServerConfig(
                    enabled=True,
                    mcp_enabled=True,
                    mcp_server_name="test-server",
                    mcp_server_version="1.0.0",
                    mcp_transport=transport,
                )
                self.assertEqual(config.mcp_transport, transport)

        # Omitting mcp_transport falls back to "sse".
        config_default = McpFeatureServerConfig(
            enabled=True,
            mcp_enabled=True,
            mcp_server_name="test-server-default",
            mcp_server_version="1.0.0",
        )
        self.assertEqual(config_default.mcp_transport, "sse")

        # An unsupported transport must be rejected at construction time.
        with self.assertRaises(ValidationError):
            McpFeatureServerConfig(
                enabled=True,
                mcp_enabled=True,
                mcp_server_name="bad-transport",
                mcp_server_version="1.0.0",
                mcp_transport="websocket",
            )