33using  System . Text . Json ; 
44using  System . Threading . Channels ; 
55using  ModelContextProtocol . Protocol . Messages ; 
6- using  ModelContextProtocol . Protocol . Transport ; 
76using  ModelContextProtocol . Utils . Json ; 
87
9- namespace  AspNetCoreSseServer ; 
8+ namespace  ModelContextProtocol . Protocol . Transport ; 
109
11- public  class  SseServerStreamTransport ( Stream  sseResponseStream )  :  ITransport 
10+ /// <summary> 
11+ /// Implements the MCP SSE server transport protocol using the SSE response <see cref="Stream"/>. 
12+ /// </summary> 
13+ /// <param name="sseResponseStream">The stream to write the SSE response body to.</param> 
14+ public  sealed  class  SseResponseStreamTransport ( Stream  sseResponseStream )  :  ITransport 
1215{ 
1316    private  readonly  Channel < IJsonRpcMessage >  _incomingChannel  =  CreateSingleItemChannel < IJsonRpcMessage > ( ) ; 
1417    private  readonly  Channel < SseItem < IJsonRpcMessage ? > >  _outgoingSseChannel  =  CreateSingleItemChannel < SseItem < IJsonRpcMessage ? > > ( ) ; 
1518
1619    private  Task ?  _sseWriteTask ; 
1720    private  Utf8JsonWriter ?  _jsonWriter ; 
1821
22+     /// <inherityydoc/> 
1923    public  bool  IsConnected  =>  _sseWriteTask ? . IsCompleted  ==  false ; 
2024
25+     /// <summary> 
26+     /// Starts the transport and writes the JSON-RPC messages sent via <see cref="SendMessageAsync(IJsonRpcMessage, CancellationToken)"/> 
27+     /// to the SSE response stream until cancelled or disposed. 
28+     /// </summary> 
29+     /// <param name="cancellationToken">A token to cancel writing to the SSE response stream.</param> 
30+     /// <returns>A task representing the send loop that writes JSON-RPC messages to the SSE response stream.</returns> 
2131    public  Task  RunAsync ( CancellationToken  cancellationToken ) 
2232    { 
2333        void  WriteJsonRpcMessageToBuffer ( SseItem < IJsonRpcMessage ? >  item ,  IBufferWriter < byte >  writer ) 
@@ -28,7 +38,7 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
2838                return ; 
2939            } 
3040
31-             JsonSerializer . Serialize ( GetUtf8JsonWriter ( writer ) ,  item . Data ,  McpJsonUtilities . DefaultOptions ) ; 
41+             JsonSerializer . Serialize ( GetUtf8JsonWriter ( writer ) ,  item . Data ,  McpJsonUtilities . DefaultOptions . GetTypeInfo < IJsonRpcMessage ? > ( ) ) ; 
3242        } 
3343
3444        // The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type, 
@@ -39,23 +49,40 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
3949        return  _sseWriteTask  =  SseFormatter . WriteAsync ( sseItems ,  sseResponseStream ,  WriteJsonRpcMessageToBuffer ,  cancellationToken ) ; 
4050    } 
4151
52+     /// <inheritdoc/> 
4253    public  ChannelReader < IJsonRpcMessage >  MessageReader  =>  _incomingChannel . Reader ; 
4354
55+     /// <inheritdoc/> 
4456    public  ValueTask  DisposeAsync ( ) 
4557    { 
4658        _incomingChannel . Writer . TryComplete ( ) ; 
4759        _outgoingSseChannel . Writer . TryComplete ( ) ; 
4860        return  new  ValueTask ( _sseWriteTask  ??  Task . CompletedTask ) ; 
4961    } 
5062
51-     public  Task  SendMessageAsync ( IJsonRpcMessage  message ,  CancellationToken  cancellationToken  =  default )  => 
52-         _outgoingSseChannel . Writer . WriteAsync ( new  SseItem < IJsonRpcMessage ? > ( message ) ,  cancellationToken ) . AsTask ( ) ; 
63+     /// <inheritdoc/> 
64+     public  Task  SendMessageAsync ( IJsonRpcMessage  message ,  CancellationToken  cancellationToken  =  default ) 
65+     { 
66+         if  ( _sseWriteTask  is  null ) 
67+         { 
68+             throw  new  InvalidOperationException ( $ "Transport is not connected. Make sure to call { nameof ( RunAsync ) }  first.") ; 
69+         } 
70+ 
71+         return  _outgoingSseChannel . Writer . WriteAsync ( new  SseItem < IJsonRpcMessage ? > ( message ) ,  cancellationToken ) . AsTask ( ) ; 
72+     } 
5373
74+     /// <summary> 
75+     /// Handles incoming JSON-RPC messages received on the /message endpoint. 
76+     /// </summary> 
77+     /// <param name="message">The JSON-RPC message received.</param> 
78+     /// <param name="cancellationToken">A token to cancel the operation.</param> 
79+     /// <returns>A task representing the potentially asynchronous operation to buffer or process the JSON-RPC message.</returns> 
80+     /// <exception cref="InvalidOperationException">Thrown when there is an attempt to process a message before calling <see cref="RunAsync(CancellationToken)"/>.</exception> 
5481    public  Task  OnMessageReceivedAsync ( IJsonRpcMessage  message ,  CancellationToken  cancellationToken ) 
5582    { 
56-         if  ( ! IsConnected ) 
83+         if  ( _sseWriteTask   is   null ) 
5784        { 
58-             throw  new  McpTransportException ( "Transport is not connected" ) ; 
85+             throw  new  InvalidOperationException ( $ "Transport is not connected. Make sure to call  { nameof ( RunAsync ) }  first. ") ; 
5986        } 
6087
6188        return  _incomingChannel . Writer . WriteAsync ( message ,  cancellationToken ) . AsTask ( ) ; 
0 commit comments