diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 93c28422a..6308eb6d7 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -422,17 +422,19 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody)); }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { - // Create the async request with proper body subscriber selection - Mono.fromFuture(this.httpClient - .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) - .whenComplete((response, throwable) -> { - if (throwable != null) { - responseEventSink.error(throwable); - } - else { - logger.debug("SSE connection established successfully"); - } - })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); + // Create the async request with proper error handling and timeout + Mono.fromFuture(() -> this.httpClient.sendAsync(requestBuilder.build(), + this.toSendMessageBodySubscriber(responseEventSink))) + .doOnSuccess(response -> { + logger.debug("Success: " + response.statusCode()); + }) + .doOnError(throwable -> { + logger.error("HTTP request failed with message {}", throwable.getMessage(), throwable); + // Ensure the sink gets the error if it hasn't been completed yet + responseEventSink.error(throwable); + }) + .onErrorMap(CompletionException.class, Throwable::getCause) + .subscribe(); })).flatMap(responseEvent -> { if (transportSession.markInitialized( @@ -500,6 +502,11 @@ else if (contentType.contains(APPLICATION_JSON)) { return Mono.empty(); } + if (!Utils.hasText(data)) { + deliveredSink.success(); + return Mono.empty(); + } + try { return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 296d1a17d..5fc84f85f 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -187,7 +187,7 @@ else if (line.startsWith(":")) { @Override protected void hookOnComplete() { - if (this.eventBuilder.length() > 0) { + if (!this.eventBuilder.isEmpty()) { String eventData = this.eventBuilder.toString(); SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim()); this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); @@ -233,9 +233,24 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink si @Override protected void hookOnSubscribe(Subscription subscription) { - sink.onRequest(subscription::request); + var contentLength = responseInfo.headers().firstValue("Content-Length"); + var useUnbounded = false; + if (contentLength.isPresent()) { + useUnbounded = Long.parseLong(contentLength.get()) > 0; + } + if (useUnbounded) { + sink.onRequest(n -> { + // Don't forward downstream requests directly - we manage our own + // requests + }); + + // Request unbounded items to consume the entire response body + subscription.request(Long.MAX_VALUE); + } + else { + sink.onRequest(subscription::request); + } - // Register disposal callback to cancel subscription when Flux is disposed sink.onDispose(subscription::cancel); }