Skip to content

Commit f1a298e

Browse files
authored
Merge pull request #126 from IBM/streamablehttp_servers_support
Streamablehttp servers support
2 parents bd425df + 671a62b commit f1a298e

File tree

8 files changed

+128
-20
lines changed

8 files changed

+128
-20
lines changed

mcpgateway/admin.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,7 @@ async def admin_add_gateway(request: Request, db: Session = Depends(get_db), use
755755
name=form["name"],
756756
url=form["url"],
757757
description=form.get("description"),
758+
transport=form.get("transport", "SSE"),
758759
auth_type=form.get("auth_type", ""),
759760
auth_username=form.get("auth_username", ""),
760761
auth_password=form.get("auth_password", ""),
@@ -769,12 +770,12 @@ async def admin_add_gateway(request: Request, db: Session = Depends(get_db), use
769770
except Exception as ex:
770771
if isinstance(ex, GatewayConnectionError):
771772
return RedirectResponse(f"{root_path}/admin#gateways", status_code=502)
772-
elif isinstance(ex, ValueError):
773+
if isinstance(ex, ValueError):
773774
return RedirectResponse(f"{root_path}/admin#gateways", status_code=400)
774-
elif isinstance(ex, RuntimeError):
775-
return RedirectResponse(f"{root_path}/admin#gateways", status_code=500)
776-
else:
775+
if isinstance(ex, RuntimeError):
777776
return RedirectResponse(f"{root_path}/admin#gateways", status_code=500)
777+
778+
return RedirectResponse(f"{root_path}/admin#gateways", status_code=500)
778779

779780

780781
@admin_router.post("/gateways/{gateway_id}/edit")
@@ -806,6 +807,7 @@ async def admin_edit_gateway(
806807
name=form["name"],
807808
url=form["url"],
808809
description=form.get("description"),
810+
transport=form.get("transport", "SSE"),
809811
auth_type=form.get("auth_type", None),
810812
auth_username=form.get("auth_username", None),
811813
auth_password=form.get("auth_password", None),

mcpgateway/db.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -933,6 +933,7 @@ class Gateway(Base):
933933
name: Mapped[str] = mapped_column(unique=True)
934934
url: Mapped[str]
935935
description: Mapped[Optional[str]]
936+
transport: Mapped[str] = mapped_column(default="SSE")
936937
capabilities: Mapped[Dict[str, Any]] = mapped_column(JSON)
937938
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
938939
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

mcpgateway/schemas.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,15 +271,15 @@ class ToolCreate(BaseModelWithConfig):
271271
- Unique tool name
272272
- Valid endpoint URL
273273
- Valid JSON Schema for input validation
274-
- Request type (HTTP method)
275274
- Integration type: 'MCP' for MCP-compliant tools or 'REST' for REST integrations
275+
- Request type (For REST-GET,POST,PUT,DELETE and for MCP-SSE,STDIO,STREAMABLEHTTP)
276276
- Optional authentication credentials: BasicAuth or BearerTokenAuth or HeadersAuth.
277277
"""
278278

279279
name: str = Field(..., description="Unique name for the tool")
280280
url: Union[str, AnyHttpUrl] = Field(None, description="Tool endpoint URL")
281281
description: Optional[str] = Field(None, description="Tool description")
282-
request_type: Literal["GET", "POST", "PUT", "DELETE", "SSE", "STDIO"] = Field("SSE", description="HTTP method to be used for invoking the tool")
282+
request_type: Literal["GET", "POST", "PUT", "DELETE", "SSE", "STDIO", "STREAMABLEHTTP"] = Field("SSE", description="HTTP method to be used for invoking the tool")
283283
integration_type: Literal["MCP", "REST"] = Field("MCP", description="Tool integration type: 'MCP' for MCP-compliant tools, 'REST' for REST integrations")
284284
headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
285285
input_schema: Optional[Dict[str, Any]] = Field(
@@ -340,7 +340,7 @@ class ToolUpdate(BaseModelWithConfig):
340340
name: Optional[str] = Field(None, description="Unique name for the tool")
341341
url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Tool endpoint URL")
342342
description: Optional[str] = Field(None, description="Tool description")
343-
request_type: Optional[Literal["GET", "POST", "PUT", "DELETE", "SSE", "STDIO"]] = Field(None, description="HTTP method to be used for invoking the tool")
343+
request_type: Optional[Literal["GET", "POST", "PUT", "DELETE", "SSE", "STDIO", "STREAMABLEHTTP"]] = Field(None, description="HTTP method to be used for invoking the tool")
344344
integration_type: Optional[Literal["MCP", "REST"]] = Field(None, description="Tool integration type")
345345
headers: Optional[Dict[str, str]] = Field(None, description="Additional headers to send when invoking the tool")
346346
input_schema: Optional[Dict[str, Any]] = Field(None, description="JSON Schema for validating tool parameters")
@@ -641,6 +641,7 @@ class GatewayCreate(BaseModelWithConfig):
641641
name: str = Field(..., description="Unique name for the gateway")
642642
url: Union[str, AnyHttpUrl] = Field(..., description="Gateway endpoint URL")
643643
description: Optional[str] = Field(None, description="Gateway description")
644+
transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
644645

645646
# Authorizations
646647
auth_type: Optional[str] = Field(None, description="Type of authentication: basic, bearer, headers, or none")
@@ -752,6 +753,7 @@ class GatewayUpdate(BaseModelWithConfig):
752753
name: Optional[str] = Field(None, description="Unique name for the gateway")
753754
url: Optional[Union[str, AnyHttpUrl]] = Field(None, description="Gateway endpoint URL")
754755
description: Optional[str] = Field(None, description="Gateway description")
756+
transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
755757

756758
name: Optional[str] = Field(None, description="Unique name for the prompt")
757759
# Authorizations
@@ -877,6 +879,7 @@ class GatewayRead(BaseModelWithConfig):
877879
name: str = Field(..., description="Unique name for the gateway")
878880
url: str = Field(..., description="Gateway endpoint URL")
879881
description: Optional[str] = Field(None, description="Gateway description")
882+
transport: str = Field(default="SSE", description="Transport used by MCP server: SSE or STREAMABLEHTTP")
880883
capabilities: Dict[str, Any] = Field(default_factory=dict, description="Gateway capabilities")
881884
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation timestamp")
882885
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update timestamp")

mcpgateway/services/gateway_service.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from filelock import FileLock, Timeout
2525
from mcp import ClientSession
2626
from mcp.client.sse import sse_client
27+
from mcp.client.streamable_http import streamablehttp_client
2728
from sqlalchemy import select
2829
from sqlalchemy.orm import Session
2930

@@ -186,7 +187,7 @@ async def register_gateway(self, db: Session, gateway: GatewayCreate) -> Gateway
186187
auth_type = getattr(gateway, "auth_type", None)
187188
auth_value = getattr(gateway, "auth_value", {})
188189

189-
capabilities, tools = await self._initialize_gateway(str(gateway.url), auth_value)
190+
capabilities, tools = await self._initialize_gateway(str(gateway.url), auth_value, gateway.transport)
190191

191192
all_names = [td.name for td in tools]
192193

@@ -217,6 +218,7 @@ async def register_gateway(self, db: Session, gateway: GatewayCreate) -> Gateway
217218
name=gateway.name,
218219
url=str(gateway.url),
219220
description=gateway.description,
221+
transport=gateway.transport,
220222
capabilities=capabilities,
221223
last_seen=datetime.now(timezone.utc),
222224
auth_type=auth_type,
@@ -311,6 +313,8 @@ async def update_gateway(self, db: Session, gateway_id: int, gateway_update: Gat
311313
gateway.url = str(gateway_update.url)
312314
if gateway_update.description is not None:
313315
gateway.description = gateway_update.description
316+
if gateway_update.transport is not None:
317+
gateway.transport = gateway_update.transport
314318

315319
if getattr(gateway, "auth_type", None) is not None:
316320
gateway.auth_type = gateway_update.auth_type
@@ -322,7 +326,7 @@ async def update_gateway(self, db: Session, gateway_id: int, gateway_update: Gat
322326
# Try to reinitialize connection if URL changed
323327
if gateway_update.url is not None:
324328
try:
325-
capabilities, _ = await self._initialize_gateway(gateway.url, gateway.auth_value)
329+
capabilities, _ = await self._initialize_gateway(gateway.url, gateway.auth_value, gateway.transport)
326330
gateway.capabilities = capabilities
327331
gateway.last_seen = datetime.utcnow()
328332

@@ -399,7 +403,7 @@ async def toggle_gateway_status(self, db: Session, gateway_id: int, activate: bo
399403
self._active_gateways.add(gateway.url)
400404
# Try to initialize if activating
401405
try:
402-
capabilities, tools = await self._initialize_gateway(gateway.url, gateway.auth_value)
406+
capabilities, tools = await self._initialize_gateway(gateway.url, gateway.auth_value, gateway.transport)
403407
gateway.capabilities = capabilities.dict()
404408
gateway.last_seen = datetime.utcnow()
405409
except Exception as e:
@@ -571,14 +575,21 @@ async def check_health_of_gateways(self, gateways: List[DbGateway]) -> bool:
571575
headers = decode_auth(auth_data)
572576

573577
# Perform the GET and raise on 4xx/5xx
574-
async with client.stream("GET", gateway.url, headers=headers) as response:
575-
# This will raise immediately if status is 4xx/5xx
576-
response.raise_for_status()
578+
if (gateway.transport).lower() == "sse":
579+
timeout = httpx.Timeout(settings.health_check_timeout)
580+
async with client.stream("GET", gateway.url, headers=headers, timeout=timeout) as response:
581+
# This will raise immediately if status is 4xx/5xx
582+
response.raise_for_status()
583+
elif (gateway.transport).lower() == "streamablehttp":
584+
async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.health_check_timeout) as (read_stream, write_stream, get_session_id):
585+
async with ClientSession(read_stream, write_stream) as session:
586+
# Initialize the session
587+
response = await session.initialize()
577588

578589
# Mark successful check
579590
gateway.last_seen = datetime.utcnow()
580591

581-
except Exception:
592+
except Exception as e:
582593
await self._handle_gateway_failure(gateway)
583594

584595
# All gateways passed
@@ -629,7 +640,7 @@ async def subscribe_events(self) -> AsyncGenerator[Dict[str, Any], None]:
629640
finally:
630641
self._event_subscribers.remove(queue)
631642

632-
async def _initialize_gateway(self, url: str, authentication: Optional[Dict[str, str]] = None) -> Any:
643+
async def _initialize_gateway(self, url: str, authentication: Optional[Dict[str, str]] = None, transport: str = "SSE") -> Any:
633644
"""Initialize connection to a gateway and retrieve its capabilities.
634645
635646
Args:
@@ -676,7 +687,45 @@ async def connect_to_sse_server(server_url: str, authentication: Optional[Dict[s
676687

677688
return capabilities, tools
678689

679-
capabilities, tools = await connect_to_sse_server(url, authentication)
690+
async def connect_to_streamablehttp_server(server_url: str, authentication: Optional[Dict[str, str]] = None):
691+
"""
692+
Connect to an MCP server running with Streamable HTTP transport
693+
694+
Args:
695+
server_url: URL to connect to the server
696+
authentication: Authentication headers for connection to URL
697+
698+
Returns:
699+
list, list: List of capabilities and tools
700+
"""
701+
if authentication is None:
702+
authentication = {}
703+
# Store the context managers so they stay alive
704+
decoded_auth = decode_auth(authentication)
705+
706+
# Use async with for both streamablehttp_client and ClientSession
707+
async with streamablehttp_client(url=server_url, headers=decoded_auth) as (read_stream, write_stream, get_session_id):
708+
async with ClientSession(read_stream, write_stream) as session:
709+
# Initialize the session
710+
response = await session.initialize()
711+
# if get_session_id:
712+
# session_id = get_session_id()
713+
# if session_id:
714+
# print(f"Session ID: {session_id}")
715+
capabilities = response.capabilities.model_dump(by_alias=True, exclude_none=True)
716+
response = await session.list_tools()
717+
tools = response.tools
718+
tools = [tool.model_dump(by_alias=True, exclude_none=True) for tool in tools]
719+
tools = [ToolCreate.model_validate(tool) for tool in tools]
720+
for tool in tools:
721+
tool.request_type = "STREAMABLEHTTP"
722+
723+
return capabilities, tools
724+
725+
if transport.lower() == "sse":
726+
capabilities, tools = await connect_to_sse_server(url, authentication)
727+
elif transport.lower() == "streamablehttp":
728+
capabilities, tools = await connect_to_streamablehttp_server(url, authentication)
680729

681730
return capabilities, tools
682731
except Exception as e:

mcpgateway/services/tool_service.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import httpx
2626
from mcp import ClientSession
2727
from mcp.client.sse import sse_client
28+
from mcp.client.streamable_http import streamablehttp_client
2829
from sqlalchemy import delete, func, not_, select
2930
from sqlalchemy.exc import IntegrityError
3031
from sqlalchemy.orm import Session
@@ -534,6 +535,7 @@ async def invoke_tool(self, db: Session, name: str, arguments: Dict[str, Any]) -
534535

535536
success = True
536537
elif tool.integration_type == "MCP":
538+
transport = tool.request_type.lower()
537539
gateway = db.execute(select(DbGateway).where(DbGateway.id == tool.gateway_id).where(DbGateway.is_active)).scalar_one_or_none()
538540
if gateway.auth_type == "bearer":
539541
headers = decode_auth(gateway.auth_value)
@@ -558,10 +560,31 @@ async def connect_to_sse_server(server_url: str) -> str:
558560
tool_call_result = await session.call_tool(name, arguments)
559561
return tool_call_result
560562

563+
async def connect_to_streamablehttp_server(server_url: str) -> str:
564+
"""
565+
Connect to an MCP server running with Streamable HTTP transport
566+
567+
Args:
568+
server_url (str): MCP Server URL
569+
570+
Returns:
571+
str: Result of tool call
572+
"""
573+
# Use async with directly to manage the context
574+
async with streamablehttp_client(url=server_url, headers=headers) as (read_stream, write_stream, get_session_id):
575+
async with ClientSession(read_stream, write_stream) as session:
576+
# Initialize the session
577+
await session.initialize()
578+
tool_call_result = await session.call_tool(name, arguments)
579+
return tool_call_result
580+
561581
tool_gateway_id = tool.gateway_id
562582
tool_gateway = db.execute(select(DbGateway).where(DbGateway.id == tool_gateway_id).where(DbGateway.is_active)).scalar_one_or_none()
563583

564-
tool_call_result = await connect_to_sse_server(tool_gateway.url)
584+
if transport == "sse":
585+
tool_call_result = await connect_to_sse_server(tool_gateway.url)
586+
elif transport == "streamablehttp":
587+
tool_call_result = await connect_to_streamablehttp_server(tool_gateway.url)
565588
content = tool_call_result.model_dump(by_alias=True).get("content", [])
566589

567590
success = True

mcpgateway/static/admin.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ document.addEventListener("DOMContentLoaded", function () {
318318
);
319319

320320
const requestTypeMap = {
321-
MCP: ["SSE", "STDIO"],
321+
MCP: ["SSE", "STREAMABLE", "STDIO"],
322322
REST: ["GET", "POST", "PUT", "PATCH", "DELETE"],
323323
};
324324

@@ -896,6 +896,10 @@ async function viewGateway(gatewayId) {
896896
<p><strong>Name:</strong> ${gateway.name}</p>
897897
<p><strong>URL:</strong> ${gateway.url}</p>
898898
<p><strong>Description:</strong> ${gateway.description || "N/A"}</p>
899+
<p><strong>Transport:</strong>
900+
${gateway.transport === "STREAMABLEHTTP" ? "Streamable HTTP" :
901+
gateway.transport === "SSE" ? "SSE" : "N/A"}
902+
</p>
899903
<p><strong>Status:</strong>
900904
<span class="px-2 inline-flex text-xs leading-5 font-semibold rounded-full ${gateway.isActive ? "bg-green-100 text-green-800" : "bg-red-100 text-red-800"}">
901905
${gateway.isActive ? "Active" : "Inactive"}
@@ -927,6 +931,7 @@ async function editGateway(gatewayId) {
927931
document.getElementById("edit-gateway-url").value = gateway.url;
928932
document.getElementById("edit-gateway-description").value =
929933
gateway.description || "";
934+
document.getElementById("edit-gateway-transport").value = gateway.transport;
930935
openModal("gateway-edit-modal");
931936
} catch (error) {
932937
console.error("Error fetching gateway details:", error);
@@ -1753,7 +1758,7 @@ function closeModal(modalId, clearId=null) {
17531758
}
17541759

17551760
const integrationRequestMap = {
1756-
MCP: ["SSE", "STDIO"],
1761+
MCP: ["SSE", "STREAMABLE", "STDIO"],
17571762
REST: ["GET", "POST", "PUT", "PATCH", "DELETE"],
17581763
};
17591764

mcpgateway/templates/admin.html

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,6 +1394,18 @@ <h3 class="text-lg font-bold mb-4">Add New Gateway</h3>
13941394
class="mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-indigo-500 focus:ring-indigo-500"
13951395
></textarea>
13961396
</div>
1397+
<div>
1398+
<label class="block text-sm font-medium text-gray-700"
1399+
>Transport Type</label
1400+
>
1401+
<select
1402+
name="transport"
1403+
class="mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-indigo-500 focus:ring-indigo-500"
1404+
>
1405+
<option value="SSE" selected>SSE</option>
1406+
<option value="STREAMABLEHTTP">Streamable HTTP</option>
1407+
</select>
1408+
</div>
13971409
<div>
13981410
<label class="block text-sm font-medium text-gray-700"
13991411
>Authentication Type</label
@@ -2259,6 +2271,19 @@ <h3 class="text-lg font-medium text-gray-900">Edit Gateway</h3>
22592271
class="mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-indigo-500 focus:ring-indigo-500"
22602272
></textarea>
22612273
</div>
2274+
<div>
2275+
<label class="block text-sm font-medium text-gray-700"
2276+
>Transport Type</label
2277+
>
2278+
<select
2279+
name="transport"
2280+
id="edit-gateway-transport"
2281+
class="mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-indigo-500 focus:ring-indigo-500"
2282+
>
2283+
<option value="SSE" selected>SSE</option>
2284+
<option value="STREAMABLEHTTP">Streamable HTTP</option>
2285+
</select>
2286+
</div>
22622287
<div>
22632288
<label class="block text-sm font-medium text-gray-700"
22642289
>Authentication Type</label

mcpgateway/transports/streamablehttp_transport.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
from typing import List, Union
2626
from uuid import uuid4
2727

28-
from mcp import types
2928
from fastapi.security.utils import get_authorization_scheme_param
29+
from mcp import types
3030
from mcp.server.lowlevel import Server
3131
from mcp.server.streamable_http import (
3232
EventCallback,

0 commit comments

Comments
 (0)