6363EventId = str
6464
6565
66+ class EventMessage :
67+ """
68+ A JSONRPCMessage with an optional event ID for stream resumability.
69+ """
70+
71+ message : JSONRPCMessage
72+ event_id : str | None
73+
74+ def __init__ (self , message : JSONRPCMessage , event_id : str | None = None ):
75+ self .message = message
76+ self .event_id = event_id
77+
78+
79+ EventCallback = Callable [[EventMessage ], Awaitable [None ]]
80+
81+
6682class EventStore (ABC ):
6783 """
6884 Interface for resumability support via event storage.
@@ -88,8 +104,8 @@ async def store_event(
88104 async def replay_events_after (
89105 self ,
90106 last_event_id : EventId ,
91- send_callback : Callable [[ EventId , JSONRPCMessage ], Awaitable [ None ]] ,
92- ) -> StreamId :
107+ send_callback : EventCallback ,
108+ ) -> StreamId | None :
93109 """
94110 Replays events that occurred after the specified event ID.
95111
@@ -149,7 +165,7 @@ def __init__(
149165 self .is_json_response_enabled = is_json_response_enabled
150166 self ._event_store = event_store
151167 self ._request_streams : dict [
152- RequestId , MemoryObjectSendStream [tuple [ JSONRPCMessage , str | None ] ]
168+ RequestId , MemoryObjectSendStream [EventMessage ]
153169 ] = {}
154170 self ._terminated = False
155171
@@ -358,7 +374,7 @@ async def _handle_post_request(
358374 request_id = str (message .root .id )
359375 # Create promise stream for getting response
360376 request_stream_writer , request_stream_reader = (
361- anyio .create_memory_object_stream [tuple [ JSONRPCMessage , str | None ] ](0 )
377+ anyio .create_memory_object_stream [EventMessage ](0 )
362378 )
363379
364380 # Register this stream for the request ID
@@ -373,16 +389,18 @@ async def _handle_post_request(
373389 response_message = None
374390
375391 # Use similar approach to SSE writer for consistency
376- async for received_message , _ in request_stream_reader :
392+ async for event_message in request_stream_reader :
377393 # If it's a response, this is what we're waiting for
378394 if isinstance (
379- received_message .root , JSONRPCResponse | JSONRPCError
395+ event_message . message .root , JSONRPCResponse | JSONRPCError
380396 ):
381- response_message = received_message
397+ response_message = event_message . message
382398 break
383399 # For notifications and request, keep waiting
384400 else :
385- logger .debug (f"received: { received_message .root .method } " )
401+ logger .debug (
402+ f"received: { event_message .message .root .method } "
403+ )
386404
387405 # At this point we should have a response
388406 if response_message :
@@ -424,27 +442,24 @@ async def sse_writer():
424442 try :
425443 async with sse_stream_writer , request_stream_reader :
426444 # Process messages from the request-specific stream
427- async for (
428- received_message ,
429- event_id ,
430- ) in request_stream_reader :
445+ async for event_message in request_stream_reader :
431446 # Build the event data
432447 event_data = {
433448 "event" : "message" ,
434- "data" : received_message .model_dump_json (
449+ "data" : event_message . message .model_dump_json (
435450 by_alias = True , exclude_none = True
436451 ),
437452 }
438453
439454 # If an event ID was provided, include it
440- if event_id :
441- event_data ["id" ] = event_id
455+ if event_message . event_id :
456+ event_data ["id" ] = event_message . event_id
442457
443458 await sse_stream_writer .send (event_data )
444459
445460 # If response, remove from pending streams and close
446461 if isinstance (
447- received_message .root ,
462+ event_message . message .root ,
448463 JSONRPCResponse | JSONRPCError ,
449464 ):
450465 if request_id :
@@ -563,20 +578,15 @@ async def standalone_sse_writer():
563578 try :
564579 # Create a standalone message stream for server-initiated messages
565580 standalone_stream_writer , standalone_stream_reader = (
566- anyio .create_memory_object_stream [
567- tuple [JSONRPCMessage , str | None ]
568- ](0 )
581+ anyio .create_memory_object_stream [EventMessage ](0 )
569582 )
570583
571584 # Register this stream using the special key
572585 self ._request_streams [GET_STREAM_KEY ] = standalone_stream_writer
573586
574587 async with sse_stream_writer , standalone_stream_reader :
575588 # Process messages from the standalone stream
576- async for item in standalone_stream_reader :
577- # The message router always sends a tuple of (message, event_id)
578- received_message , event_id = item
579-
589+ async for event_message in standalone_stream_reader :
580590 # For the standalone stream, we handle:
581591 # - JSONRPCNotification (server sends notifications to client)
582592 # - JSONRPCRequest (server sends requests to client)
@@ -585,14 +595,14 @@ async def standalone_sse_writer():
585595 # Send the message via SSE
586596 event_data = {
587597 "event" : "message" ,
588- "data" : received_message .model_dump_json (
598+ "data" : event_message . message .model_dump_json (
589599 by_alias = True , exclude_none = True
590600 ),
591601 }
592602
593603 # If an event ID was provided, include it in the SSE stream
594- if event_id :
595- event_data ["id" ] = event_id
604+ if event_message . event_id :
605+ event_data ["id" ] = event_message . event_id
596606
597607 await sse_stream_writer .send (event_data )
598608 except Exception as e :
@@ -741,14 +751,12 @@ async def replay_sender():
741751 try :
742752 async with sse_stream_writer :
743753 # Define an async callback for sending events
744- async def send_event (
745- event_id : EventId , message : JSONRPCMessage
746- ) -> None :
754+ async def send_event (event_message : EventMessage ) -> None :
747755 await sse_stream_writer .send (
748756 {
749757 "event" : "message" ,
750- "id" : event_id ,
751- "data" : message .model_dump_json (
758+ "id" : event_message . event_id ,
759+ "data" : event_message . message .model_dump_json (
752760 by_alias = True , exclude_none = True
753761 ),
754762 }
@@ -762,22 +770,21 @@ async def send_event(
762770 # If stream ID not in mapping, create it
763771 if stream_id and stream_id not in self ._request_streams :
764772 msg_writer , msg_reader = anyio .create_memory_object_stream [
765- tuple [ JSONRPCMessage , str | None ]
773+ EventMessage
766774 ](0 )
767775 self ._request_streams [stream_id ] = msg_writer
768776
769777 # Forward messages to SSE
770778 async with msg_reader :
771- async for item in msg_reader :
772- message , event_id = item
773-
779+ async for event_message in msg_reader :
780+ event_data = event_message .message .model_dump_json (
781+ by_alias = True , exclude_none = True
782+ )
774783 await sse_stream_writer .send (
775784 {
776785 "event" : "message" ,
777- "id" : event_id ,
778- "data" : message .model_dump_json (
779- by_alias = True , exclude_none = True
780- ),
786+ "id" : event_message .event_id ,
787+ "data" : event_data ,
781788 }
782789 )
783790 except Exception as e :
@@ -871,7 +878,7 @@ async def message_router():
871878 try :
872879 # Send both the message and the event ID
873880 await self ._request_streams [request_stream_id ].send (
874- (message , event_id )
881+ EventMessage (message , event_id )
875882 )
876883 except (
877884 anyio .BrokenResourceError ,
0 commit comments