16
16
import httpx
17
17
from anyio .abc import TaskGroup
18
18
from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
19
+ from httpx ._config import DEFAULT_TIMEOUT_CONFIG
19
20
from httpx_sse import EventSource , ServerSentEvent , aconnect_sse
20
21
21
- from mcp .shared ._httpx_utils import McpHttpClientFactory , create_mcp_http_client
22
+ from mcp .shared ._httpx_utils import create_mcp_http_client
22
23
from mcp .shared .message import ClientMessageMetadata , SessionMessage
23
24
from mcp .types import (
24
25
ErrorData ,
33
34
34
35
logger = logging .getLogger (__name__ )
35
36
37
+ HTTPX_DEFAULT_TIMEOUT = DEFAULT_TIMEOUT_CONFIG
36
38
37
39
SessionMessageOrError = SessionMessage | Exception
38
40
StreamWriter = MemoryObjectSendStream [SessionMessageOrError ]
@@ -448,8 +450,8 @@ async def streamablehttp_client(
448
450
timeout : float | timedelta = 30 ,
449
451
sse_read_timeout : float | timedelta = 60 * 5 ,
450
452
terminate_on_close : bool = True ,
451
- httpx_client_factory : McpHttpClientFactory = create_mcp_http_client ,
452
453
auth : httpx .Auth | None = None ,
454
+ httpx_client : httpx .AsyncClient | None = None ,
453
455
) -> AsyncGenerator [
454
456
tuple [
455
457
MemoryObjectReceiveStream [SessionMessage | Exception ],
@@ -464,6 +466,19 @@ async def streamablehttp_client(
464
466
`sse_read_timeout` determines how long (in seconds) the client will wait for a new
465
467
event before disconnecting. All other HTTP operations are controlled by `timeout`.
466
468
469
+ Args:
470
+ url: The StreamableHTTP endpoint URL.
471
+ headers: Optional headers to include in requests.
472
+ timeout: HTTP timeout for regular operations. Defaults to 30 seconds.
473
+ Can be specified as float (seconds) or timedelta object.
474
+ sse_read_timeout: Timeout for SSE read operations. Defaults to 300 seconds (5 minutes).
475
+ Can be specified as float (seconds) or timedelta object.
476
+ terminate_on_close: Whether to send a terminate request when closing the connection.
477
+ auth: Optional HTTPX authentication handler.
478
+ httpx_client: Optional pre-configured httpx.AsyncClient. If provided, the client's
479
+ existing configuration is preserved. Timeout is only overridden if the provided
480
+ client uses httpx's default timeout configuration.
481
+
467
482
Yields:
468
483
Tuple containing:
469
484
- read_stream: Stream for reading messages from the server
@@ -475,15 +490,30 @@ async def streamablehttp_client(
475
490
read_stream_writer , read_stream = anyio .create_memory_object_stream [SessionMessage | Exception ](0 )
476
491
write_stream , write_stream_reader = anyio .create_memory_object_stream [SessionMessage ](0 )
477
492
493
+ if httpx_client is not None :
494
+ client = httpx_client
495
+ if not getattr (client , "follow_redirects" , False ):
496
+ logger .warning ("httpx_client does not have follow_redirects=True, which is recommended for MCP" )
497
+ if headers :
498
+ existing_headers = dict (client .headers ) if client .headers else {}
499
+ existing_headers .update (transport .request_headers )
500
+ client .headers = existing_headers
501
+ if auth and not client .auth :
502
+ client .auth = auth
503
+ if client .timeout == HTTPX_DEFAULT_TIMEOUT :
504
+ client .timeout = httpx .Timeout (transport .timeout , read = transport .sse_read_timeout )
505
+ else :
506
+ client = create_mcp_http_client (
507
+ headers = transport .request_headers ,
508
+ timeout = httpx .Timeout (transport .timeout , read = transport .sse_read_timeout ),
509
+ auth = transport .auth ,
510
+ )
511
+
478
512
async with anyio .create_task_group () as tg :
479
513
try :
480
514
logger .debug (f"Connecting to StreamableHTTP endpoint: { url } " )
481
515
482
- async with httpx_client_factory (
483
- headers = transport .request_headers ,
484
- timeout = httpx .Timeout (transport .timeout , read = transport .sse_read_timeout ),
485
- auth = transport .auth ,
486
- ) as client :
516
+ async with client :
487
517
# Define callbacks that need access to tg
488
518
def start_get_stream () -> None :
489
519
tg .start_soon (transport .handle_get_stream , client , read_stream_writer )
0 commit comments