@@ -83,9 +83,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
8383 }
8484 String path = request .path ();
8585 final String sessionId = request .param ("sessionId" );
86- final String ssePrefixStr = request .param ("sse_prefix" );
87- boolean ssePrefix = Optional .ofNullable (ssePrefixStr ).map (x -> Boolean .parseBoolean (ssePrefixStr )).orElse (true );
88- final StreamingRestChannelConsumer consumer = (channel ) -> prepareRequestInternal (path , ssePrefix , sessionId , channel , client );
86+ final String sAppendToBaseUrl = request .param ("append_to_base_url" );
87+ boolean appendToBaseUrl = Optional .ofNullable (sAppendToBaseUrl ).map (x -> Boolean .parseBoolean (sAppendToBaseUrl )).orElse (false );
88+ final StreamingRestChannelConsumer consumer = (channel ) -> prepareRequestInternal (
89+ path ,
90+ appendToBaseUrl ,
91+ sessionId ,
92+ channel ,
93+ client
94+ );
8995 return channel -> {
9096 if (channel instanceof StreamingRestChannel ) {
9197 consumer .accept ((StreamingRestChannel ) channel );
@@ -101,25 +107,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
101107 @ VisibleForTesting
102108 protected void prepareRequestInternal (
103109 final String path ,
104- final boolean ssePrefix ,
110+ final boolean appendToBaseUrl ,
105111 final String sessionId ,
106112 final StreamingRestChannel channel ,
107113 final NodeClient client
108114 ) {
109- channel
110- .prepareResponse (
111- RestStatus .OK ,
112- Map
113- .of (
114- "Content-Type" ,
115- List .of ("text/event-stream" ),
116- "Cache-Control" ,
117- List .of ("no-cache" ),
118- "Connection" ,
119- List .of ("keep-alive" )
120- )
121- );
122115 if (path .equals (SSE_ENDPOINT )) {
116+ channel
117+ .prepareResponse (
118+ RestStatus .OK ,
119+ Map
120+ .of (
121+ "Content-Type" ,
122+ List .of ("text/event-stream" ),
123+ "Cache-Control" ,
124+ List .of ("no-cache" ),
125+ "Connection" ,
126+ List .of ("keep-alive" )
127+ )
128+ );
123129 // The connection request doesn't have request body, but we're still reading the request body content,
124130 // The reason is that the response producer is created when http channel is been read, so here
125131 // we subscribe to channel to trigger the channel read, but ignoring the content.
@@ -129,7 +135,7 @@ protected void prepareRequestInternal(
129135 .map (HttpChunk ::content )
130136 .flatMap (
131137 x -> McpAsyncServerHolder .mcpServerTransportProvider
132- .handleSseConnection (channel , ssePrefix , clusterService .localNode ().getId (), client )
138+ .handleSseConnection (channel , appendToBaseUrl , clusterService .localNode ().getId (), client )
133139 )
134140 .flatMap (y -> Mono .fromRunnable (() -> {
135141 log .debug ("starting to send sse connection chunk result" );
@@ -145,6 +151,7 @@ protected void prepareRequestInternal(
145151 }))
146152 .subscribe ();
147153 } else if (path .equals (MESSAGE_ENDPOINT )) {
154+ channel .prepareResponse (RestStatus .OK , Map .of ("Content-Type" , List .of ("text/plain" )));
148155 if (sessionId == null ) {
149156 try {
150157 channel
@@ -183,13 +190,8 @@ protected void prepareRequestInternal(
183190 McpAsyncServerHolder .mcpServerTransportProvider
184191 .handleMessage (sessionId , requestBody )
185192 .doOnSuccess (y -> {
186- if (requestBody .contains ("notifications/initialized" )) {
187- log
188- .debug (
189- "Starting to send OK response for notifications/initialized request in local node"
190- );
191- channel .sendChunk (createInitializedNotificationRes ());
192- }
193+ log .debug ("Starting to send rest response to client in local node" );
194+ channel .sendChunk (createRestResponse ());
193195 })
194196 .onErrorResume (e -> Mono .fromRunnable (() -> {
195197 try {
@@ -207,27 +209,25 @@ protected void prepareRequestInternal(
207209 } else {
208210 ActionListener <AcknowledgedResponse > actionListener = ActionListener .wrap (y -> {
209211 if (y .isAcknowledged ()) {
210- log
211- .debug (
212- "MCP request has been dispatched to corresponding node and handled successfully!"
213- );
214- if (requestBody .contains ("notifications/initialized" )) {
215- log
216- .debug (
217- "Starting to send OK response for notifications/initialized request in coordinator node"
218- );
219- channel .sendChunk (createInitializedNotificationRes ());
220- }
212+ log .debug ("Starting to send rest response to client as peer node returns successfully" );
213+ channel .sendChunk (createRestResponse ());
221214 }
222- },
223- e -> {
215+ }, e -> {
216+ log
217+ .error (
218+ "MCP request has been dispatched to corresponding node but peer node failed to handle it" ,
219+ e
220+ );
221+ try {
222+ channel .sendResponse (new BytesRestResponse (channel , e ));
223+ } catch (IOException ex ) {
224224 log
225225 .error (
226- "MCP request has been dispatched to corresponding node but peer node failed to handle it " ,
227- e
226+ "Failed to send exception response to client during message handling in remote node due to IOException, nodeId: {} " ,
227+ nodeId
228228 );
229229 }
230- );
230+ } );
231231 client
232232 .execute (
233233 MLMcpMessageAction .INSTANCE ,
@@ -279,7 +279,7 @@ protected void prepareRequestInternal(
279279 }
280280 }
281281
282- private HttpChunk createInitializedNotificationRes () {
282+ private HttpChunk createRestResponse () {
283283 return new HttpChunk () {
284284 @ Override
285285 public boolean isLast () {
0 commit comments