77responses, with streaming support for long-running operations.
88"""
99
10+ import asyncio
1011import json
1112import logging
1213import re
2425from starlette .responses import Response
2526from starlette .types import Receive , Scope , Send
2627
28+ from mcp .shared ._httpx_utils import create_mcp_http_client
2729from mcp .shared .message import ServerMessageMetadata , SessionMessage
2830from mcp .types import (
2931 INTERNAL_ERROR ,
3638 JSONRPCRequest ,
3739 JSONRPCResponse ,
3840 RequestId ,
41+ Webhook ,
3942)
4043
4144logger = logging .getLogger (__name__ )
@@ -136,6 +139,7 @@ def __init__(
136139 self ,
137140 mcp_session_id : str | None ,
138141 is_json_response_enabled : bool = False ,
142+ is_webhooks_supported : bool = False ,
139143 event_store : EventStore | None = None ,
140144 ) -> None :
141145 """
@@ -146,6 +150,10 @@ def __init__(
146150 Must contain only visible ASCII characters (0x21-0x7E).
147151 is_json_response_enabled: If True, return JSON responses for requests
148152 instead of SSE streams. Default is False.
153+ is_webhooks_supported: If True and if webhooks are provided in
154+ tools/call request, the client will receive an Accepted
155+ HTTP response and the CallTool response will be sent to
156+ the webhook. Default is False.
149157 event_store: Event store for resumability support. If provided,
150158 resumability will be enabled, allowing clients to
151159 reconnect and resume messages.
@@ -162,6 +170,7 @@ def __init__(
162170
163171 self .mcp_session_id = mcp_session_id
164172 self .is_json_response_enabled = is_json_response_enabled
173+ self .is_webhooks_supported = is_webhooks_supported
165174 self ._event_store = event_store
166175 self ._request_streams : dict [
167176 RequestId ,
@@ -410,9 +419,43 @@ async def _handle_post_request(
410419 ](0 )
411420 request_stream_reader = self ._request_streams [request_id ][1 ]
412421
422+ session_message = SessionMessage (message )
423+ if self ._is_call_tool_request_with_webhooks (
424+ session_message .message
425+ ):
426+ if self .is_webhooks_supported :
427+ response = self ._create_json_response (
428+ JSONRPCMessage (root = JSONRPCResponse (
429+ jsonrpc = "2.0" ,
430+ id = message .root .id ,
431+ result = {
432+ 'content' : [{
433+ 'type' : 'text' ,
434+ 'text' : 'Response will be forwarded to the webhook.'
435+ }],
436+ 'isError' : False
437+ },
438+ )),
439+ HTTPStatus .OK ,
440+ )
441+ asyncio .create_task (
442+ self ._send_response_to_webhooks (
443+ request_id , session_message , request_stream_reader
444+ )
445+ )
446+ else :
447+ logger .exception ("Webhooks not supported error" )
448+ err = "Webhooks not supported"
449+ response = self ._create_error_response (
450+ f"Validation error: { err } " ,
451+ HTTPStatus .BAD_REQUEST ,
452+ INVALID_PARAMS ,
453+ )
454+ await response (scope , receive , send )
455+ return
456+
413457 if self .is_json_response_enabled :
414458 # Process the message
415- session_message = SessionMessage (message )
416459 await writer .send (session_message )
417460 try :
418461 # Process messages from the request-specific stream
@@ -531,6 +574,115 @@ async def sse_writer():
531574 await writer .send (Exception (err ))
532575 return
533576
577+
578+ async def _send_response_to_webhooks (
579+ self ,
580+ request_id : str ,
581+ session_message : SessionMessage ,
582+ request_stream_reader : MemoryObjectReceiveStream [EventMessage ],
583+ ):
584+ webhooks : list [Webhook ] = [Webhook (** webhook ) for webhook in session_message .message .root .webhooks ]
585+ writer = self ._read_stream_writer
586+ if writer is None :
587+ raise ValueError (
588+ "No read stream writer available. Ensure connect() is called first."
589+ )
590+ await writer .send (session_message )
591+
592+ try :
593+ response_message = JSONRPCError (
594+ jsonrpc = "2.0" ,
595+ id = "server-error" , # We don't have a request ID for general errors
596+ error = ErrorData (
597+ code = INTERNAL_ERROR ,
598+ message = "Error processing request: No response received" ,
599+ ),
600+ )
601+
602+ if self .is_json_response_enabled :
603+ # Process messages from the request-specific stream
604+ # We need to collect all messages until we get a response
605+ async for event_message in request_stream_reader :
606+ # If it's a response, this is what we're waiting for
607+ if isinstance (
608+ event_message .message .root , JSONRPCResponse | JSONRPCError
609+ ):
610+ response_message = event_message .message
611+ break
612+ # For notifications and request, keep waiting
613+ else :
614+ logger .debug (
615+ f"received: { event_message .message .root .method } "
616+ )
617+
618+ await self ._send_message_to_webhooks (webhooks , response_message )
619+ else :
620+ # Send each event on the request stream as a separate message
621+ async for event_message in request_stream_reader :
622+ event_data = self ._create_event_data (event_message )
623+ await self ._send_message_to_webhooks (webhooks , event_data )
624+
625+ # If response, remove from pending streams and close
626+ if isinstance (
627+ event_message .message .root ,
628+ JSONRPCResponse | JSONRPCError ,
629+ ):
630+ break
631+
632+ except Exception as e :
633+ logger .exception (f"Error sending response to webhooks: { e } " )
634+
635+ finally :
636+ await self ._clean_up_memory_streams (request_id )
637+
638+
639+ async def _send_message_to_webhooks (
640+ self ,
641+ webhooks : list [Webhook ],
642+ message : JSONRPCMessage | JSONRPCError | dict [str , str ],
643+ ):
644+ for webhook in webhooks :
645+ headers = {"Content-Type" : CONTENT_TYPE_JSON }
646+ # Add authorization headers
647+ if webhook .authentication and webhook .authentication .credentials :
648+ if webhook .authentication .strategy == "bearer" :
649+ headers ["Authorization" ] = f"Bearer { webhook .authentication .credentials } "
650+ elif webhook .authentication .strategy == "apiKey" :
651+ headers ["X-API-Key" ] = webhook .authentication .credentials
652+ elif webhook .authentication .strategy == "basic" :
653+ try :
654+ # Try to parse as JSON
655+ creds_dict = json .loads (webhook .authentication .credentials )
656+ if "username" in creds_dict and "password" in creds_dict :
657+ # Create basic auth header from username and password
658+ import base64
659+ auth_string = f"{ creds_dict ['username' ]} :{ creds_dict ['password' ]} "
660+ credentials = base64 .b64encode (auth_string .encode ()).decode ()
661+ headers ["Authorization" ] = f"Basic { credentials } "
662+ except :
663+ # Not JSON, use as-is
664+ headers ["Authorization" ] = f"Basic { webhook .authentication .credentials } "
665+ elif webhook .authentication .strategy == "customHeader" and webhook .authentication .credentials :
666+ try :
667+ custom_headers = json .loads (webhook .authentication .credentials )
668+ headers .update (custom_headers )
669+ except :
670+ pass
671+
672+ async with create_mcp_http_client (headers = headers ) as client :
673+ try :
674+ if isinstance (message , JSONRPCMessage | JSONRPCError ):
675+ await client .post (
676+ webhook .url ,
677+ json = message .model_dump_json (by_alias = True , exclude_none = True ),
678+ )
679+ else :
680+ await client .post (webhook .url , json = message )
681+
682+ except Exception as e :
683+ logger .exception (f"Error sending response to webhook { webhook .url } : { e } " )
684+
685+
534686 async def _handle_get_request (self , request : Request , send : Send ) -> None :
535687 """
536688 Handle GET request to establish SSE.
@@ -651,6 +803,18 @@ async def _handle_delete_request(self, request: Request, send: Send) -> None:
651803 )
652804 await response (request .scope , request .receive , send )
653805
806+
807+ def _is_call_tool_request_with_webhooks (self , message : JSONRPCMessage ) -> bool :
808+ """Check if the request is a call tool request with webhooks."""
809+ return (
810+ isinstance (message .root , JSONRPCRequest )
811+ and message .root .method == "tools/call"
812+ and hasattr (message .root , "webhooks" )
813+ and message .root .webhooks is not None
814+ and len (message .root .webhooks ) > 0
815+ )
816+
817+
654818 async def _terminate_session (self ) -> None :
655819 """Terminate the current session, closing all streams.
656820
0 commit comments