Skip to content

Commit d4122a0

Browse files
committed
test real http transport - baseline
1 parent fb0da5a commit d4122a0

File tree

1 file changed

+169
-0
lines changed

1 file changed

+169
-0
lines changed

tests/test_http_real_transport.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import multiprocessing
2+
import socket
3+
import time
4+
import os
5+
import signal
6+
import atexit
7+
import sys
8+
import threading
9+
import coverage
10+
from typing import AsyncGenerator, Generator
11+
from fastapi import FastAPI
12+
import pytest
13+
import httpx
14+
import uvicorn
15+
from fastapi_mcp import FastApiMCP
16+
import mcp.types as types
17+
18+
19+
HOST = "127.0.0.1"
20+
SERVER_NAME = "Test MCP Server"
21+
22+
23+
def run_server(server_port: int, fastapi_app: FastAPI) -> None:
24+
# Initialize coverage for subprocesses
25+
cov = None
26+
if "COVERAGE_PROCESS_START" in os.environ:
27+
cov = coverage.Coverage(source=["fastapi_mcp"])
28+
cov.start()
29+
30+
# Create a function to save coverage data at exit
31+
def cleanup():
32+
if cov:
33+
cov.stop()
34+
cov.save()
35+
36+
# Register multiple cleanup mechanisms to ensure coverage data is saved
37+
atexit.register(cleanup)
38+
39+
# Setup signal handler for clean termination
40+
def handle_signal(signum, frame):
41+
cleanup()
42+
sys.exit(0)
43+
44+
signal.signal(signal.SIGTERM, handle_signal)
45+
46+
# Backup thread to ensure coverage is written if process is terminated abruptly
47+
def periodic_save():
48+
while True:
49+
time.sleep(1.0)
50+
if cov:
51+
cov.save()
52+
53+
save_thread = threading.Thread(target=periodic_save)
54+
save_thread.daemon = True
55+
save_thread.start()
56+
57+
# Configure the server
58+
mcp = FastApiMCP(
59+
fastapi_app,
60+
name=SERVER_NAME,
61+
description="Test description",
62+
)
63+
mcp.mount_http()
64+
65+
# Start the server
66+
server = uvicorn.Server(config=uvicorn.Config(app=fastapi_app, host=HOST, port=server_port, log_level="error"))
67+
server.run()
68+
69+
# Give server time to start
70+
while not server.started:
71+
time.sleep(0.5)
72+
73+
# Ensure coverage is saved if exiting the normal way
74+
if cov:
75+
cov.stop()
76+
cov.save()
77+
78+
79+
@pytest.fixture(params=["simple_fastapi_app", "simple_fastapi_app_with_root_path"])
80+
def server(request: pytest.FixtureRequest) -> Generator[str, None, None]:
81+
# Ensure COVERAGE_PROCESS_START is set in the environment for subprocesses
82+
coverage_rc = os.path.abspath(".coveragerc")
83+
os.environ["COVERAGE_PROCESS_START"] = coverage_rc
84+
85+
# Get a free port
86+
with socket.socket() as s:
87+
s.bind((HOST, 0))
88+
server_port = s.getsockname()[1]
89+
90+
# Use fork method to avoid pickling issues
91+
ctx = multiprocessing.get_context("fork")
92+
93+
# Run the server in a subprocess
94+
fastapi_app = request.getfixturevalue(request.param)
95+
proc = ctx.Process(
96+
target=run_server,
97+
kwargs={"server_port": server_port, "fastapi_app": fastapi_app},
98+
daemon=True,
99+
)
100+
proc.start()
101+
102+
# Wait for server to be running
103+
max_attempts = 20
104+
attempt = 0
105+
while attempt < max_attempts:
106+
try:
107+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
108+
s.connect((HOST, server_port))
109+
break
110+
except ConnectionRefusedError:
111+
time.sleep(0.1)
112+
attempt += 1
113+
else:
114+
raise RuntimeError(f"Server failed to start after {max_attempts} attempts")
115+
116+
# Return the server URL
117+
yield f"http://{HOST}:{server_port}{fastapi_app.root_path}"
118+
119+
# Signal the server to stop - added graceful shutdown before kill
120+
try:
121+
proc.terminate()
122+
proc.join(timeout=2)
123+
except (OSError, AttributeError):
124+
pass
125+
126+
if proc.is_alive():
127+
proc.kill()
128+
proc.join(timeout=2)
129+
if proc.is_alive():
130+
raise RuntimeError("server process failed to terminate")
131+
132+
133+
@pytest.fixture()
134+
async def http_client(server: str) -> AsyncGenerator[httpx.AsyncClient, None]:
135+
async with httpx.AsyncClient(base_url=server) as client:
136+
yield client
137+
138+
139+
@pytest.mark.anyio
140+
async def test_http_initialize_request(http_client: httpx.AsyncClient, server: str) -> None:
141+
mcp_path = "/mcp" # Always use absolute path since server already includes root_path
142+
143+
response = await http_client.post(
144+
mcp_path,
145+
json={
146+
"jsonrpc": "2.0",
147+
"method": "initialize",
148+
"id": 1,
149+
"params": {
150+
"protocolVersion": types.LATEST_PROTOCOL_VERSION,
151+
"capabilities": {
152+
"sampling": None,
153+
"elicitation": None,
154+
"experimental": None,
155+
"roots": None,
156+
},
157+
"clientInfo": {"name": "test-client", "version": "1.0.0"},
158+
},
159+
},
160+
headers={"Accept": "application/json, text/event-stream", "Content-Type": "application/json"},
161+
)
162+
163+
assert response.status_code == 200
164+
165+
result = response.json()
166+
assert result["jsonrpc"] == "2.0"
167+
assert result["id"] == 1
168+
assert "result" in result
169+
assert result["result"]["serverInfo"]["name"] == SERVER_NAME

0 commit comments

Comments
 (0)