1
1
from __future__ import annotations
2
2
3
3
import os
4
+ from collections .abc import AsyncIterator
4
5
from contextlib import asynccontextmanager
5
6
from datetime import timedelta
6
7
from pathlib import Path
7
- from typing import Any , AsyncIterator , Literal , Protocol
8
+ from typing import Any , Literal , Protocol
8
9
9
10
import httpx
10
11
from mcp import ClientSession , StdioServerParameters
@@ -152,6 +153,7 @@ async def _create_stdio_session(
152
153
encoding: Character encoding
153
154
encoding_error_handler: How to handle encoding errors
154
155
session_kwargs: Additional keyword arguments to pass to the ClientSession
156
+
155
157
"""
156
158
# NOTE: execution commands (e.g., `uvx` / `npx`) require PATH envvar to be set.
157
159
# To address this, we automatically inject existing PATH envvar into the `env` value,
@@ -196,6 +198,7 @@ async def _create_sse_session(
196
198
session_kwargs: Additional keyword arguments to pass to the ClientSession
197
199
httpx_client_factory: Custom factory for httpx.AsyncClient (optional)
198
200
auth: httpx.Auth | None = None
201
+
199
202
"""
200
203
# Create and store the connection
201
204
kwargs = {}
@@ -233,6 +236,7 @@ async def _create_streamable_http_session(
233
236
session_kwargs: Additional keyword arguments to pass to the ClientSession
234
237
httpx_client_factory: Custom factory for httpx.AsyncClient (optional)
235
238
auth: httpx.Auth | None = None
239
+
236
240
"""
237
241
# Create and store the connection
238
242
kwargs = {}
@@ -248,9 +252,7 @@ async def _create_streamable_http_session(
248
252
249
253
@asynccontextmanager
250
254
async def _create_websocket_session (
251
- * ,
252
- url : str ,
253
- session_kwargs : dict [str , Any ] | None = None ,
255
+ * , url : str , session_kwargs : dict [str , Any ] | None = None
254
256
) -> AsyncIterator [ClientSession ]:
255
257
"""Create a new session to an MCP server using Websockets.
256
258
@@ -260,6 +262,7 @@ async def _create_websocket_session(
260
262
261
263
Raises:
262
264
ImportError: If websockets package is not installed
265
+
263
266
"""
264
267
try :
265
268
from mcp .client .websocket import websocket_client
@@ -276,9 +279,7 @@ async def _create_websocket_session(
276
279
277
280
278
281
@asynccontextmanager
279
- async def create_session (
280
- connection : Connection ,
281
- ) -> AsyncIterator [ClientSession ]:
282
+ async def create_session (connection : Connection ) -> AsyncIterator [ClientSession ]:
282
283
"""Create a new session to an MCP server.
283
284
284
285
Args:
@@ -290,8 +291,8 @@ async def create_session(
290
291
291
292
Yields:
292
293
A ClientSession
293
- """
294
294
295
+ """
295
296
if "transport" not in connection :
296
297
raise ValueError (
297
298
"Configuration error: Missing 'transport' key in server configuration. "
@@ -350,8 +351,7 @@ async def create_session(
350
351
if "url" not in connection :
351
352
raise ValueError ("'url' parameter is required for Websocket connection" )
352
353
async with _create_websocket_session (
353
- url = connection ["url" ],
354
- session_kwargs = connection .get ("session_kwargs" ),
354
+ url = connection ["url" ], session_kwargs = connection .get ("session_kwargs" )
355
355
) as session :
356
356
yield session
357
357
else :
0 commit comments