|
1 | 1 | /* |
2 | | - * Copyright 2024-2024 the original author or authors. |
| 2 | + * Copyright 2024-2025 the original author or authors. |
3 | 3 | */ |
4 | 4 |
|
5 | 5 | package io.modelcontextprotocol.server; |
@@ -122,28 +122,30 @@ public Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s |
122 | 122 | * blocking code offloading to prevent accidental blocking of the non-blocking |
123 | 123 | * transport. |
124 | 124 | * @param syncSpec a potentially blocking, synchronous specification. |
| 125 | + * @param immediateExecution when true, do not offload. Do NOT set to true when |
| 126 | + * using a non-blocking transport. |
125 | 127 | * @return a specification which is protected from blocking calls specified by the |
126 | 128 | * user. |
127 | 129 | */ |
128 | | - static Async fromSync(Sync syncSpec) { |
| 130 | + static Async fromSync(Sync syncSpec, boolean immediateExecution) { |
129 | 131 | List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>(); |
130 | 132 | for (var tool : syncSpec.tools()) { |
131 | | - tools.add(AsyncToolSpecification.fromSync(tool)); |
| 133 | + tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution)); |
132 | 134 | } |
133 | 135 |
|
134 | 136 | Map<String, AsyncResourceSpecification> resources = new HashMap<>(); |
135 | 137 | syncSpec.resources().forEach((key, resource) -> { |
136 | | - resources.put(key, AsyncResourceSpecification.fromSync(resource)); |
| 138 | + resources.put(key, AsyncResourceSpecification.fromSync(resource, immediateExecution)); |
137 | 139 | }); |
138 | 140 |
|
139 | 141 | Map<String, AsyncPromptSpecification> prompts = new HashMap<>(); |
140 | 142 | syncSpec.prompts().forEach((key, prompt) -> { |
141 | | - prompts.put(key, AsyncPromptSpecification.fromSync(prompt)); |
| 143 | + prompts.put(key, AsyncPromptSpecification.fromSync(prompt, immediateExecution)); |
142 | 144 | }); |
143 | 145 |
|
144 | 146 | Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new HashMap<>(); |
145 | 147 | syncSpec.completions().forEach((key, completion) -> { |
146 | | - completions.put(key, AsyncCompletionSpecification.fromSync(completion)); |
| 148 | + completions.put(key, AsyncCompletionSpecification.fromSync(completion, immediateExecution)); |
147 | 149 | }); |
148 | 150 |
|
149 | 151 | List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootChangeConsumers = new ArrayList<>(); |
@@ -267,15 +269,15 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se |
267 | 269 | public record AsyncToolSpecification(McpSchema.Tool tool, |
268 | 270 | BiFunction<McpAsyncServerExchange, Map<String, Object>, Mono<McpSchema.CallToolResult>> call) { |
269 | 271 |
|
270 | | - static AsyncToolSpecification fromSync(SyncToolSpecification tool) { |
| 272 | + static AsyncToolSpecification fromSync(SyncToolSpecification tool, boolean immediate) { |
271 | 273 | // FIXME: This is temporary, proper validation should be implemented |
272 | 274 | if (tool == null) { |
273 | 275 | return null; |
274 | 276 | } |
275 | | - return new AsyncToolSpecification(tool.tool(), |
276 | | - (exchange, map) -> Mono |
277 | | - .fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map)) |
278 | | - .subscribeOn(Schedulers.boundedElastic())); |
| 277 | + return new AsyncToolSpecification(tool.tool(), (exchange, map) -> { |
| 278 | + var toolResult = Mono.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map)); |
| 279 | + return immediate ? toolResult : toolResult.subscribeOn(Schedulers.boundedElastic()); |
| 280 | + }); |
279 | 281 | } |
280 | 282 | } |
281 | 283 |
|
@@ -343,15 +345,16 @@ public record AsyncStreamingToolSpecification(McpSchema.Tool tool, |
343 | 345 | public record AsyncResourceSpecification(McpSchema.Resource resource, |
344 | 346 | BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) { |
345 | 347 |
|
346 | | - static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) { |
| 348 | + static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) { |
347 | 349 | // FIXME: This is temporary, proper validation should be implemented |
348 | 350 | if (resource == null) { |
349 | 351 | return null; |
350 | 352 | } |
351 | | - return new AsyncResourceSpecification(resource.resource(), |
352 | | - (exchange, req) -> Mono |
353 | | - .fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req)) |
354 | | - .subscribeOn(Schedulers.boundedElastic())); |
| 353 | + return new AsyncResourceSpecification(resource.resource(), (exchange, req) -> { |
| 354 | + var resourceResult = Mono |
| 355 | + .fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req)); |
| 356 | + return immediateExecution ? resourceResult : resourceResult.subscribeOn(Schedulers.boundedElastic()); |
| 357 | + }); |
355 | 358 | } |
356 | 359 | } |
357 | 360 |
|
@@ -389,15 +392,16 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) { |
389 | 392 | public record AsyncPromptSpecification(McpSchema.Prompt prompt, |
390 | 393 | BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) { |
391 | 394 |
|
392 | | - static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt) { |
| 395 | + static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) { |
393 | 396 | // FIXME: This is temporary, proper validation should be implemented |
394 | 397 | if (prompt == null) { |
395 | 398 | return null; |
396 | 399 | } |
397 | | - return new AsyncPromptSpecification(prompt.prompt(), |
398 | | - (exchange, req) -> Mono |
399 | | - .fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req)) |
400 | | - .subscribeOn(Schedulers.boundedElastic())); |
| 400 | + return new AsyncPromptSpecification(prompt.prompt(), (exchange, req) -> { |
| 401 | + var promptResult = Mono |
| 402 | + .fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req)); |
| 403 | + return immediateExecution ? promptResult : promptResult.subscribeOn(Schedulers.boundedElastic()); |
| 404 | + }); |
401 | 405 | } |
402 | 406 | } |
403 | 407 |
|
@@ -428,14 +432,17 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference |
428 | 432 | * @return an asynchronous wrapper of the provided sync specification, or |
429 | 433 | * {@code null} if input is null |
430 | 434 | */ |
431 | | - static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion) { |
| 435 | + static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion, |
| 436 | + boolean immediateExecution) { |
432 | 437 | if (completion == null) { |
433 | 438 | return null; |
434 | 439 | } |
435 | | - return new AsyncCompletionSpecification(completion.referenceKey(), |
436 | | - (exchange, request) -> Mono.fromCallable( |
437 | | - () -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request)) |
438 | | - .subscribeOn(Schedulers.boundedElastic())); |
| 440 | + return new AsyncCompletionSpecification(completion.referenceKey(), (exchange, request) -> { |
| 441 | + var completionResult = Mono.fromCallable( |
| 442 | + () -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request)); |
| 443 | + return immediateExecution ? completionResult |
| 444 | + : completionResult.subscribeOn(Schedulers.boundedElastic()); |
| 445 | + }); |
439 | 446 | } |
440 | 447 | } |
441 | 448 |
|
|
0 commit comments