@@ -88,12 +88,16 @@ public class McpAsyncServer {
8888
8989 private final McpSchema .ServerCapabilities serverCapabilities ;
9090
91+ private final boolean isStreamableHttp ;
92+
9193 private final McpSchema .Implementation serverInfo ;
9294
9395 private final String instructions ;
9496
9597 private final CopyOnWriteArrayList <McpServerFeatures .AsyncToolSpecification > tools = new CopyOnWriteArrayList <>();
9698
99+ private final CopyOnWriteArrayList <McpServerFeatures .AsyncStreamingToolSpecification > streamTools = new CopyOnWriteArrayList <>();
100+
97101 private final CopyOnWriteArrayList <McpSchema .ResourceTemplate > resourceTemplates = new CopyOnWriteArrayList <>();
98102
99103 private final ConcurrentHashMap <String , McpServerFeatures .AsyncResourceSpecification > resources = new ConcurrentHashMap <>();
@@ -119,7 +123,7 @@ public class McpAsyncServer {
119123 */
120124 McpAsyncServer (McpServerTransportProvider mcpTransportProvider , ObjectMapper objectMapper ,
121125 McpServerFeatures .Async features , Duration requestTimeout ,
122- McpUriTemplateManagerFactory uriTemplateManagerFactory ) {
126+ McpUriTemplateManagerFactory uriTemplateManagerFactory , boolean isStreamableHttp ) {
123127 this .mcpTransportProvider = mcpTransportProvider ;
124128 this .objectMapper = objectMapper ;
125129 this .serverInfo = features .serverInfo ();
@@ -131,6 +135,7 @@ public class McpAsyncServer {
131135 this .prompts .putAll (features .prompts ());
132136 this .completions .putAll (features .completions ());
133137 this .uriTemplateManagerFactory = uriTemplateManagerFactory ;
138+ this .isStreamableHttp = isStreamableHttp ;
134139
135140 Map <String , McpServerSession .RequestHandler <?>> requestHandlers = new HashMap <>();
136141
@@ -188,6 +193,13 @@ public class McpAsyncServer {
188193 notificationHandlers ));
189194 }
190195
196+ // Alternate constructor for HTTP+SSE servers (past spec)
197+ McpAsyncServer (McpServerTransportProvider mcpTransportProvider , ObjectMapper objectMapper ,
198+ McpServerFeatures .Async features , Duration requestTimeout ,
199+ McpUriTemplateManagerFactory uriTemplateManagerFactory ) {
200+ this (mcpTransportProvider , objectMapper , features , requestTimeout , uriTemplateManagerFactory , false );
201+ }
202+
191203 // ---------------------------------------
192204 // Lifecycle Management
193205 // ---------------------------------------
@@ -329,6 +341,69 @@ public Mono<Void> removeTool(String toolName) {
329341 });
330342 }
331343
344+ /**
345+ * Add a new tool specification at runtime.
346+ * @param toolSpecification The tool specification to add
347+ * @return Mono that completes when clients have been notified of the change
348+ */
349+ public Mono <Void > addStreamTool (McpServerFeatures .AsyncStreamingToolSpecification toolSpecification ) {
350+ if (toolSpecification == null ) {
351+ return Mono .error (new McpError ("Tool specification must not be null" ));
352+ }
353+ if (toolSpecification .tool () == null ) {
354+ return Mono .error (new McpError ("Tool must not be null" ));
355+ }
356+ if (toolSpecification .call () == null ) {
357+ return Mono .error (new McpError ("Tool call handler must not be null" ));
358+ }
359+ if (this .serverCapabilities .tools () == null ) {
360+ return Mono .error (new McpError ("Server must be configured with tool capabilities" ));
361+ }
362+
363+ return Mono .defer (() -> {
364+ // Check for duplicate tool names
365+ if (this .streamTools .stream ().anyMatch (th -> th .tool ().name ().equals (toolSpecification .tool ().name ()))) {
366+ return Mono
367+ .error (new McpError ("Tool with name '" + toolSpecification .tool ().name () + "' already exists" ));
368+ }
369+
370+ this .streamTools .add (toolSpecification );
371+ logger .debug ("Added tool handler: {}" , toolSpecification .tool ().name ());
372+
373+ if (this .serverCapabilities .tools ().listChanged ()) {
374+ return notifyToolsListChanged ();
375+ }
376+ return Mono .empty ();
377+ });
378+ }
379+
380+ /**
381+ * Remove a tool handler at runtime.
382+ * @param toolName The name of the tool handler to remove
383+ * @return Mono that completes when clients have been notified of the change
384+ */
385+ public Mono <Void > removeStreamTool (String toolName ) {
386+ if (toolName == null ) {
387+ return Mono .error (new McpError ("Tool name must not be null" ));
388+ }
389+ if (this .serverCapabilities .tools () == null ) {
390+ return Mono .error (new McpError ("Server must be configured with tool capabilities" ));
391+ }
392+
393+ return Mono .defer (() -> {
394+ boolean removed = this .tools
395+ .removeIf (toolSpecification -> toolSpecification .tool ().name ().equals (toolName ));
396+ if (removed ) {
397+ logger .debug ("Removed tool handler: {}" , toolName );
398+ if (this .serverCapabilities .tools ().listChanged ()) {
399+ return notifyToolsListChanged ();
400+ }
401+ return Mono .empty ();
402+ }
403+ return Mono .error (new McpError ("Tool with name '" + toolName + "' not found" ));
404+ });
405+ }
406+
332407 /**
333408 * Notifies clients that the list of available tools has changed.
334409 * @return A Mono that completes when all clients have been notified
@@ -339,29 +414,97 @@ public Mono<Void> notifyToolsListChanged() {
339414
340415 private McpServerSession .RequestHandler <McpSchema .ListToolsResult > toolsListRequestHandler () {
341416 return (exchange , params ) -> {
342- List <Tool > tools = this .tools .stream ().map (McpServerFeatures .AsyncToolSpecification ::tool ).toList ();
417+ List <Tool > tools = new ArrayList <>();
418+ tools .addAll (this .tools .stream ().map (McpServerFeatures .AsyncToolSpecification ::tool ).toList ());
419+ tools .addAll (
420+ this .streamTools .stream ().map (McpServerFeatures .AsyncStreamingToolSpecification ::tool ).toList ());
343421
344422 return Mono .just (new McpSchema .ListToolsResult (tools , null ));
345423 };
346424 }
347425
348426 private McpServerSession .RequestHandler <CallToolResult > toolsCallRequestHandler () {
349- return (exchange , params ) -> {
350- McpSchema .CallToolRequest callToolRequest = objectMapper .convertValue (params ,
351- new TypeReference <McpSchema .CallToolRequest >() {
352- });
427+ if (isStreamableHttp ) {
428+ return new McpServerSession .StreamingRequestHandler <CallToolResult >() {
429+ @ Override
430+ public Mono <CallToolResult > handle (McpAsyncServerExchange exchange , Object params ) {
431+ var callToolRequest = objectMapper .convertValue (params , McpSchema .CallToolRequest .class );
432+
433+ // Check regular tools first
434+ var regularTool = tools .stream ()
435+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
436+ .findFirst ();
437+
438+ if (regularTool .isPresent ()) {
439+ return regularTool .get ().call ().apply (exchange , callToolRequest .arguments ());
440+ }
441+
442+ // Check streaming tools (take first result)
443+ var streamingTool = streamTools .stream ()
444+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
445+ .findFirst ();
446+
447+ if (streamingTool .isPresent ()) {
448+ return streamingTool .get ().call ().apply (exchange , callToolRequest .arguments ()).next ();
449+ }
450+
451+ return Mono .error (new McpError ("Tool not found: " + callToolRequest .name ()));
452+ }
353453
354- Optional < McpServerFeatures . AsyncToolSpecification > toolSpecification = this . tools . stream ()
355- . filter ( tr -> callToolRequest . name (). equals ( tr . tool (). name ()))
356- . findAny ( );
454+ @ Override
455+ public Flux < CallToolResult > handleStreaming ( McpAsyncServerExchange exchange , Object params ) {
456+ var callToolRequest = objectMapper . convertValue ( params , McpSchema . CallToolRequest . class );
357457
358- if (toolSpecification .isEmpty ()) {
359- return Mono .error (new McpError ("Tool not found: " + callToolRequest .name ()));
360- }
458+ // Check streaming tools first (preferred for streaming)
459+ var streamingTool = streamTools .stream ()
460+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
461+ .findFirst ();
361462
362- return toolSpecification .map (tool -> tool .call ().apply (exchange , callToolRequest .arguments ()))
363- .orElse (Mono .error (new McpError ("Tool not found: " + callToolRequest .name ())));
364- };
463+ if (streamingTool .isPresent ()) {
464+ return streamingTool .get ().call ().apply (exchange , callToolRequest .arguments ());
465+ }
466+
467+ // Fallback to regular tools (convert Mono to Flux)
468+ var regularTool = tools .stream ()
469+ .filter (tool -> callToolRequest .name ().equals (tool .tool ().name ()))
470+ .findFirst ();
471+
472+ if (regularTool .isPresent ()) {
473+ return regularTool .get ().call ().apply (exchange , callToolRequest .arguments ()).flux ();
474+ }
475+
476+ return Flux .error (new McpError ("Tool not found: " + callToolRequest .name ()));
477+ }
478+ };
479+ }
480+ else {
481+ return (exchange , params ) -> {
482+ McpSchema .CallToolRequest callToolRequest = objectMapper .convertValue (params ,
483+ new TypeReference <McpSchema .CallToolRequest >() {
484+ });
485+
486+ // Check regular tools first
487+ Optional <McpServerFeatures .AsyncToolSpecification > toolSpecification = this .tools .stream ()
488+ .filter (tr -> callToolRequest .name ().equals (tr .tool ().name ()))
489+ .findAny ();
490+
491+ if (toolSpecification .isPresent ()) {
492+ return toolSpecification .get ().call ().apply (exchange , callToolRequest .arguments ());
493+ }
494+
495+ // Check streaming tools (take first result)
496+ Optional <McpServerFeatures .AsyncStreamingToolSpecification > streamToolSpecification = this .streamTools
497+ .stream ()
498+ .filter (tr -> callToolRequest .name ().equals (tr .tool ().name ()))
499+ .findAny ();
500+
501+ if (streamToolSpecification .isPresent ()) {
502+ return streamToolSpecification .get ().call ().apply (exchange , callToolRequest .arguments ()).next ();
503+ }
504+
505+ return Mono .error (new McpError ("Tool not found: " + callToolRequest .name ()));
506+ };
507+ }
365508 }
366509
367510 // ---------------------------------------
0 commit comments