Skip to content

Commit 150b758

Browse files
committed
Fix HttpClient resource leak with ExecutorService-based cleanup
Implement automatic ExecutorService shutdown for internal HttpClient and add support for external HttpClient injection to prevent thread accumulation. Related to #610
1 parent 993628c commit 150b758

File tree

5 files changed

+409
-159
lines changed

5 files changed

+409
-159
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 70 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
package io.modelcontextprotocol.client.transport;
66

77
import java.io.IOException;
8-
import java.lang.reflect.Field;
9-
import java.lang.reflect.Method;
108
import java.net.URI;
119
import java.net.http.HttpClient;
1210
import java.net.http.HttpRequest;
1311
import java.net.http.HttpResponse;
1412
import java.time.Duration;
1513
import java.util.List;
1614
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.atomic.AtomicReference;
1819
import java.util.function.Consumer;
1920
import java.util.function.Function;
@@ -38,7 +39,6 @@
3839
import reactor.core.publisher.Flux;
3940
import reactor.core.publisher.Mono;
4041
import reactor.core.publisher.Sinks;
41-
import sun.misc.Unsafe;
4242

4343
/**
4444
* Server-Sent Events (SSE) implementation of the
@@ -146,6 +146,7 @@ public class HttpClientSseClientTransport implements McpClientTransport {
146146
Assert.notNull(httpClient, "httpClient must not be null");
147147
Assert.notNull(requestBuilder, "requestBuilder must not be null");
148148
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
149+
Assert.notNull(onCloseClient, "onCloseClient must not be null");
149150
this.baseUri = URI.create(baseUri);
150151
this.sseEndpoint = sseEndpoint;
151152
this.jsonMapper = jsonMapper;
@@ -178,8 +179,6 @@ public static class Builder {
178179

179180
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
180181

181-
private HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1);
182-
183182
private HttpClient externalHttpClient;
184183

185184
private McpJsonMapper jsonMapper;
@@ -190,7 +189,8 @@ public static class Builder {
190189

191190
private Duration connectTimeout = Duration.ofSeconds(10);
192191

193-
private Consumer<HttpClient> onCloseClient;
192+
private Consumer<HttpClient> onCloseClient = (HttpClient client) -> {
193+
};
194194

195195
/**
196196
* Creates a new builder instance.
@@ -235,13 +235,17 @@ public Builder sseEndpoint(String sseEndpoint) {
235235
}
236236

237237
/**
238-
* Sets an external HttpClient instance to use instead of creating a new one. When
239-
* an external HttpClient is provided, the transport will not attempt to close it
240-
* during graceful shutdown, leaving resource management to the caller.
238+
* Provides an external HttpClient instance to use instead of creating a new one.
239+
* When an external HttpClient is provided, the transport will not attempt to
240+
* close it during graceful shutdown, leaving resource management to the caller.
241+
* <p>
242+
* Use this method when you want to share a single HttpClient instance across
243+
* multiple transports or when you need fine-grained control over HttpClient
244+
* lifecycle.
241245
* @param httpClient the HttpClient instance to use
242246
* @return this builder
243247
*/
244-
public Builder httpClient(HttpClient httpClient) {
248+
public Builder withExternalHttpClient(HttpClient httpClient) {
245249
Assert.notNull(httpClient, "httpClient must not be null");
246250
this.externalHttpClient = httpClient;
247251
return this;
@@ -317,11 +321,15 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
317321

318322
/**
319323
* Sets a custom consumer to handle HttpClient closure when the transport is
320-
* closed.
324+
* closed. This allows for custom cleanup logic beyond the default behavior.
325+
* <p>
326+
* Note: This is typically used for advanced use cases. The default behavior
327+
* (shutting down the internal ExecutorService) is sufficient for most scenarios.
321328
* @param onCloseClient the consumer to handle HttpClient closure
322329
* @return this builder
323330
*/
324-
public Builder onCloseClient(Consumer<HttpClient> onCloseClient) {
331+
public Builder onHttpClientClose(Consumer<HttpClient> onCloseClient) {
332+
Assert.notNull(onCloseClient, "onCloseClient must not be null");
325333
this.onCloseClient = onCloseClient;
326334
return this;
327335
}
@@ -337,13 +345,29 @@ public HttpClientSseClientTransport build() {
337345
if (externalHttpClient != null) {
338346
// Use external HttpClient, use custom close handler or no-op
339347
httpClient = externalHttpClient;
340-
closeHandler = onCloseClient; // null means no cleanup
348+
closeHandler = onCloseClient;
341349
}
342350
else {
343-
// Create new HttpClient, use custom close handler or default cleanup
344-
httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
345-
closeHandler = onCloseClient != null ? onCloseClient
346-
: HttpClientSseClientTransport::closeHttpClientResourcesStatic;
351+
// Create internal HttpClient with custom ExecutorService
352+
// Create a custom ExecutorService with meaningful thread names
353+
ExecutorService internalExecutor = Executors.newCachedThreadPool(runnable -> {
354+
Thread thread = new Thread(runnable);
355+
thread.setName("MCP-HttpClient-" + thread.getId());
356+
thread.setDaemon(true);
357+
return thread;
358+
});
359+
360+
httpClient = HttpClient.newBuilder()
361+
.version(HttpClient.Version.HTTP_1_1)
362+
.connectTimeout(this.connectTimeout)
363+
.executor(internalExecutor)
364+
.build();
365+
366+
// Combine default cleanup (shutdown executor) with custom handler if
367+
// provided
368+
closeHandler = (client) -> shutdownHttpClientExecutor(internalExecutor);
369+
closeHandler = closeHandler.andThen(onCloseClient);
370+
347371
}
348372

349373
return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint,
@@ -519,53 +543,43 @@ public Mono<Void> closeGracefully() {
519543
}
520544

521545
/**
522-
* Static method to close HttpClient resources using reflection.
546+
* Closes HttpClient resources by shutting down its associated ExecutorService. This
547+
* allows the GC to reclaim HttpClient-related threads (including SelectorManager) on
548+
* the next garbage collection cycle.
549+
* <p>
550+
* This approach avoids using reflection, Unsafe, or Java 21+ specific APIs, making it
551+
* compatible with Java 17+.
552+
* @param executor the ExecutorService to shutdown
523553
*/
524-
private static void closeHttpClientResourcesStatic(HttpClient httpClient) {
554+
private static void shutdownHttpClientExecutor(ExecutorService executor) {
555+
if (executor == null) {
556+
return;
557+
}
558+
525559
try {
526-
// unsafe
527-
Class<?> UnsafeClass = Class.forName("sun.misc.Unsafe");
528-
Field unsafeField = UnsafeClass.getDeclaredField("theUnsafe");
529-
unsafeField.setAccessible(true);
530-
Unsafe unsafe = (Unsafe) unsafeField.get(null);
531-
Module ObjectModule = Object.class.getModule();
532-
Class<HttpClientSseClientTransport> currentClass = HttpClientSseClientTransport.class;
533-
long addr = unsafe.objectFieldOffset(Class.class.getDeclaredField("module"));
534-
unsafe.getAndSetObject(currentClass, addr, ObjectModule);
560+
logger.debug("Shutting down HttpClient ExecutorService");
561+
executor.shutdown();
535562

536-
try {
537-
Method closeMethod = httpClient.getClass().getMethod("close");
538-
closeMethod.invoke(httpClient);
539-
logger.debug("Successfully used JDK 21+ close() method to close HttpClient");
540-
return;
541-
}
542-
catch (NoSuchMethodException e) {
543-
logger.debug("JDK 21+ close() method not available, falling back to internal reflection");
544-
}
545-
// This prevents the accumulation of HttpClient-xxx-SelectorManager threads
546-
try {
547-
java.lang.reflect.Field implField = httpClient.getClass().getDeclaredField("impl");
548-
implField.setAccessible(true);
549-
Object implObj = implField.get(httpClient);
550-
java.lang.reflect.Field selmgrField = implObj.getClass().getDeclaredField("selmgr");
551-
selmgrField.setAccessible(true);
552-
Object selmgrObj = selmgrField.get(implObj);
553-
554-
if (selmgrObj != null) {
555-
Method shutDownMethod = selmgrObj.getClass().getDeclaredMethod("shutdown");
556-
shutDownMethod.setAccessible(true);
557-
shutDownMethod.invoke(selmgrObj);
558-
logger.debug("HttpClient SelectorManager shutdown completed via reflection");
563+
// Wait for graceful shutdown
564+
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
565+
logger.debug("ExecutorService did not terminate in time, forcing shutdown");
566+
executor.shutdownNow();
567+
568+
// Wait a bit more after forced shutdown
569+
if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
570+
logger.warn("ExecutorService did not terminate even after shutdownNow()");
559571
}
560572
}
561-
catch (NoSuchFieldException | NoSuchMethodException e) {
562-
// Field/method structure might differ across JDK versions
563-
logger.debug("SelectorManager field/method not found, skipping internal cleanup: {}", e.getMessage());
564-
}
565573

574+
logger.debug("HttpClient ExecutorService shutdown completed");
575+
}
576+
catch (InterruptedException e) {
577+
logger.warn("Interrupted while shutting down HttpClient ExecutorService");
578+
executor.shutdownNow();
579+
Thread.currentThread().interrupt();
566580
}
567581
catch (Exception e) {
568-
logger.warn("Failed to close HttpClient resources cleanly: {}", e.getMessage());
582+
logger.warn("Failed to shutdown HttpClient ExecutorService cleanly: {}", e.getMessage());
569583
}
570584
}
571585

0 commit comments

Comments
 (0)