Skip to content

Commit 15bb3f9

Browse files
committed
wip
1 parent b4fef52 commit 15bb3f9

11 files changed

+304
-59
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@
1111
import java.util.Map;
1212
import java.util.function.Consumer;
1313
import java.util.function.Function;
14+
import java.util.function.Supplier;
1415

16+
import io.modelcontextprotocol.server.McpTransportContext;
1517
import io.modelcontextprotocol.spec.McpClientTransport;
1618
import io.modelcontextprotocol.spec.McpSchema;
17-
import io.modelcontextprotocol.spec.McpTransport;
1819
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
1920
import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest;
2021
import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult;
2122
import io.modelcontextprotocol.spec.McpSchema.ElicitRequest;
2223
import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
2324
import io.modelcontextprotocol.spec.McpSchema.Implementation;
2425
import io.modelcontextprotocol.spec.McpSchema.Root;
26+
import io.modelcontextprotocol.spec.McpTransport;
2527
import io.modelcontextprotocol.util.Assert;
2628
import reactor.core.publisher.Mono;
2729

@@ -183,6 +185,8 @@ class SyncSpec {
183185

184186
private Function<ElicitRequest, ElicitResult> elicitationHandler;
185187

188+
private Supplier<McpTransportContext> contextProvider = McpTransportContext.EMPTY::copy;
189+
186190
private SyncSpec(McpClientTransport transport) {
187191
Assert.notNull(transport, "Transport must not be null");
188192
this.transport = transport;
@@ -409,6 +413,18 @@ public SyncSpec progressConsumers(List<Consumer<McpSchema.ProgressNotification>>
409413
return this;
410414
}
411415

416+
/**
417+
* Add a provider of {@link McpTransportContext}, providing a context before
418+
* calling any client operation. This allows to extract thread-locals and hand
419+
* them over to the underlying transport.
420+
* @param contextProvider A supplier to create a context
421+
* @return This builder for method chaining
422+
*/
423+
public SyncSpec transportContextProvider(Supplier<McpTransportContext> contextProvider) {
424+
this.contextProvider = contextProvider;
425+
return this;
426+
}
427+
412428
/**
413429
* Create an instance of {@link McpSyncClient} with the provided configurations or
414430
* sensible defaults.
@@ -423,7 +439,8 @@ public McpSyncClient build() {
423439
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
424440

425441
return new McpSyncClient(
426-
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
442+
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures),
443+
this.contextProvider);
427444
}
428445

429446
}

mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package io.modelcontextprotocol.client;
66

77
import java.time.Duration;
8+
import java.util.function.Supplier;
89

910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

13+
import io.modelcontextprotocol.server.McpTransportContext;
1214
import io.modelcontextprotocol.spec.McpSchema;
1315
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
1416
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
@@ -63,14 +65,20 @@ public class McpSyncClient implements AutoCloseable {
6365

6466
private final McpAsyncClient delegate;
6567

68+
private final Supplier<McpTransportContext> contextProvider;
69+
6670
/**
6771
* Create a new McpSyncClient with the given delegate.
6872
* @param delegate the asynchronous kernel on top of which this synchronous client
6973
* provides a blocking API.
74+
* @param contextProvider the supplier of context before calling any non-blocking
75+
* operation on underlying delegate
7076
*/
71-
McpSyncClient(McpAsyncClient delegate) {
77+
McpSyncClient(McpAsyncClient delegate, Supplier<McpTransportContext> contextProvider) {
7278
Assert.notNull(delegate, "The delegate can not be null");
79+
Assert.notNull(contextProvider, "The contextProvider can not be null");
7380
this.delegate = delegate;
81+
this.contextProvider = contextProvider;
7482
}
7583

7684
/**
@@ -177,36 +185,43 @@ public boolean closeGracefully() {
177185
public McpSchema.InitializeResult initialize() {
178186
// TODO: block takes no argument here as we assume the async client is
179187
// configured with a requestTimeout at all times
180-
return this.delegate.initialize().block();
188+
var context = this.contextProvider.get();
189+
return this.delegate.initialize().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
181190
}
182191

183192
/**
184193
* Send a roots/list_changed notification.
185194
*/
186195
public void rootsListChangedNotification() {
187-
this.delegate.rootsListChangedNotification().block();
196+
var context = this.contextProvider.get();
197+
this.delegate.rootsListChangedNotification()
198+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
199+
.block();
188200
}
189201

190202
/**
191203
* Add a roots dynamically.
192204
*/
193205
public void addRoot(McpSchema.Root root) {
194-
this.delegate.addRoot(root).block();
206+
var context = this.contextProvider.get();
207+
this.delegate.addRoot(root).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
195208
}
196209

197210
/**
198211
* Remove a root dynamically.
199212
*/
200213
public void removeRoot(String rootUri) {
201-
this.delegate.removeRoot(rootUri).block();
214+
var context = this.contextProvider.get();
215+
this.delegate.removeRoot(rootUri).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
202216
}
203217

204218
/**
205219
* Send a synchronous ping request.
206220
* @return
207221
*/
208222
public Object ping() {
209-
return this.delegate.ping().block();
223+
var context = this.contextProvider.get();
224+
return this.delegate.ping().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
210225
}
211226

212227
// --------------------------
@@ -224,7 +239,11 @@ public Object ping() {
224239
* Boolean indicating if the execution failed (true) or succeeded (false/absent)
225240
*/
226241
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
227-
return this.delegate.callTool(callToolRequest).block();
242+
var context = this.contextProvider.get();
243+
return this.delegate.callTool(callToolRequest)
244+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
245+
.block();
246+
228247
}
229248

230249
/**
@@ -234,7 +253,8 @@ public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolReque
234253
* pagination if more tools are available
235254
*/
236255
public McpSchema.ListToolsResult listTools() {
237-
return this.delegate.listTools().block();
256+
var context = this.contextProvider.get();
257+
return this.delegate.listTools().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
238258
}
239259

240260
/**
@@ -245,7 +265,9 @@ public McpSchema.ListToolsResult listTools() {
245265
* pagination if more tools are available
246266
*/
247267
public McpSchema.ListToolsResult listTools(String cursor) {
248-
return this.delegate.listTools(cursor).block();
268+
var context = this.contextProvider.get();
269+
return this.delegate.listTools(cursor).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
270+
249271
}
250272

251273
// --------------------------
@@ -257,7 +279,9 @@ public McpSchema.ListToolsResult listTools(String cursor) {
257279
* @return The list of all resources result
258280
*/
259281
public McpSchema.ListResourcesResult listResources() {
260-
return this.delegate.listResources().block();
282+
var context = this.contextProvider.get();
283+
return this.delegate.listResources().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
284+
261285
}
262286

263287
/**
@@ -266,7 +290,11 @@ public McpSchema.ListResourcesResult listResources() {
266290
* @return The list of resources result
267291
*/
268292
public McpSchema.ListResourcesResult listResources(String cursor) {
269-
return this.delegate.listResources(cursor).block();
293+
var context = this.contextProvider.get();
294+
return this.delegate.listResources(cursor)
295+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
296+
.block();
297+
270298
}
271299

272300
/**
@@ -275,7 +303,11 @@ public McpSchema.ListResourcesResult listResources(String cursor) {
275303
* @return the resource content.
276304
*/
277305
public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
278-
return this.delegate.readResource(resource).block();
306+
var context = this.contextProvider.get();
307+
return this.delegate.readResource(resource)
308+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
309+
.block();
310+
279311
}
280312

281313
/**
@@ -284,15 +316,23 @@ public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
284316
* @return the resource content.
285317
*/
286318
public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
287-
return this.delegate.readResource(readResourceRequest).block();
319+
var context = this.contextProvider.get();
320+
return this.delegate.readResource(readResourceRequest)
321+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
322+
.block();
323+
288324
}
289325

290326
/**
291327
* Retrieves the list of all resource templates provided by the server.
292328
* @return The list of all resource templates result.
293329
*/
294330
public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
295-
return this.delegate.listResourceTemplates().block();
331+
var context = this.contextProvider.get();
332+
return this.delegate.listResourceTemplates()
333+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
334+
.block();
335+
296336
}
297337

298338
/**
@@ -304,7 +344,11 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
304344
* @return The list of resource templates result.
305345
*/
306346
public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
307-
return this.delegate.listResourceTemplates(cursor).block();
347+
var context = this.contextProvider.get();
348+
return this.delegate.listResourceTemplates(cursor)
349+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
350+
.block();
351+
308352
}
309353

310354
/**
@@ -317,7 +361,11 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor
317361
* subscribe to.
318362
*/
319363
public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
320-
this.delegate.subscribeResource(subscribeRequest).block();
364+
var context = this.contextProvider.get();
365+
this.delegate.subscribeResource(subscribeRequest)
366+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
367+
.block();
368+
321369
}
322370

323371
/**
@@ -326,7 +374,11 @@ public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
326374
* to unsubscribe from.
327375
*/
328376
public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
329-
this.delegate.unsubscribeResource(unsubscribeRequest).block();
377+
var context = this.contextProvider.get();
378+
this.delegate.unsubscribeResource(unsubscribeRequest)
379+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
380+
.block();
381+
330382
}
331383

332384
// --------------------------
@@ -338,7 +390,8 @@ public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest)
338390
* @return The list of all prompts result.
339391
*/
340392
public ListPromptsResult listPrompts() {
341-
return this.delegate.listPrompts().block();
393+
var context = this.contextProvider.get();
394+
return this.delegate.listPrompts().contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
342395
}
343396

344397
/**
@@ -347,19 +400,29 @@ public ListPromptsResult listPrompts() {
347400
* @return The list of prompts result.
348401
*/
349402
public ListPromptsResult listPrompts(String cursor) {
350-
return this.delegate.listPrompts(cursor).block();
403+
var context = this.contextProvider.get();
404+
return this.delegate.listPrompts(cursor).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context)).block();
405+
351406
}
352407

353408
public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
354-
return this.delegate.getPrompt(getPromptRequest).block();
409+
var context = this.contextProvider.get();
410+
return this.delegate.getPrompt(getPromptRequest)
411+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
412+
.block();
413+
355414
}
356415

357416
/**
358417
* Client can set the minimum logging level it wants to receive from the server.
359418
* @param loggingLevel the min logging level
360419
*/
361420
public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
362-
this.delegate.setLoggingLevel(loggingLevel).block();
421+
var context = this.contextProvider.get();
422+
this.delegate.setLoggingLevel(loggingLevel)
423+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
424+
.block();
425+
363426
}
364427

365428
/**
@@ -369,7 +432,11 @@ public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
369432
* @return the completion result containing suggested values.
370433
*/
371434
public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) {
372-
return this.delegate.completeCompletion(completeRequest).block();
435+
var context = this.contextProvider.get();
436+
return this.delegate.completeCompletion(completeRequest)
437+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, context))
438+
.block();
439+
373440
}
374441

375442
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/AsyncHttpRequestCustomizer.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66

77
import java.net.URI;
88
import java.net.http.HttpRequest;
9+
910
import org.reactivestreams.Publisher;
1011
import reactor.core.publisher.Mono;
1112
import reactor.core.scheduler.Schedulers;
1213
import reactor.util.annotation.Nullable;
1314

15+
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
16+
import io.modelcontextprotocol.server.McpTransportContext;
17+
1418
/**
1519
* Customize {@link HttpRequest.Builder} before executing the request, in either SSE or
1620
* Streamable HTTP transport.
@@ -21,8 +25,14 @@
2125
*/
2226
public interface AsyncHttpRequestCustomizer {
2327

28+
@Deprecated
29+
default Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
30+
@Nullable String body) {
31+
return customize(builder, method, endpoint, body, new DefaultMcpTransportContext());
32+
}
33+
2434
Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
25-
@Nullable String body);
35+
@Nullable String body, McpTransportContext context);
2636

2737
AsyncHttpRequestCustomizer NOOP = new Noop();
2838

@@ -33,8 +43,8 @@ Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String met
3343
* blocking implementation, consider using {@link Schedulers#boundedElastic()}.
3444
*/
3545
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
36-
return (builder, method, uri, body) -> Mono.fromSupplier(() -> {
37-
customizer.customize(builder, method, uri, body);
46+
return (builder, method, uri, body, context) -> Mono.fromSupplier(() -> {
47+
customizer.customize(builder, method, uri, body, context);
3848
return builder;
3949
});
4050
}
@@ -43,7 +53,7 @@ class Noop implements AsyncHttpRequestCustomizer {
4353

4454
@Override
4555
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
46-
String body) {
56+
String body, McpTransportContext context) {
4757
return Mono.just(builder);
4858
}
4959

0 commit comments

Comments
 (0)