diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 945aeef78..29dc23c35 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -223,6 +223,8 @@ static class AggregateSubscriber extends BaseSubscriber { */ private ResponseInfo responseInfo; + volatile boolean hasRequestedDemand = false; + /** * Creates a new JsonLineSubscriber that will emit parsed JSON-RPC messages. * @param sink the {@link FluxSink} to emit parsed {@link ResponseEvent} objects @@ -236,7 +238,13 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink si @Override protected void hookOnSubscribe(Subscription subscription) { - sink.onRequest(subscription::request); + + sink.onRequest(n -> { + if (!hasRequestedDemand) { + subscription.request(Long.MAX_VALUE); + } + hasRequestedDemand = true; + }); // Register disposal callback to cancel subscription when Flux is disposed sink.onDispose(subscription::cancel); @@ -249,8 +257,11 @@ protected void hookOnNext(String line) { @Override protected void hookOnComplete() { - String data = this.eventBuilder.toString(); - this.sink.next(new AggregateResponseEvent(responseInfo, data)); + + if (hasRequestedDemand) { + String data = this.eventBuilder.toString(); + this.sink.next(new AggregateResponseEvent(responseInfo, data)); + } this.sink.complete(); } @@ -271,6 +282,8 @@ static class BodilessResponseLineSubscriber extends BaseSubscriber { private final ResponseInfo responseInfo; + volatile boolean hasRequestedDemand = false; + public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink sink) { this.sink = sink; this.responseInfo = responseInfo; @@ -280,7 +293,10 @@ public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink { - subscription.request(n); + if (!hasRequestedDemand) { + subscription.request(Long.MAX_VALUE); + } + hasRequestedDemand = true; }); // Register disposal callback to cancel subscription when Flux is disposed @@ -291,11 +307,13 @@ protected void hookOnSubscribe(Subscription subscription) { @Override protected void hookOnComplete() { - // emit dummy event to be able to inspect the response info - // this is a shortcut allowing for a more streamlined processing using - // operator composition instead of having to deal with the CompletableFuture - // along the Subscriber for inspecting the result - this.sink.next(new DummyEvent(responseInfo)); + if (hasRequestedDemand) { + // emit dummy event to be able to inspect the response info + // this is a shortcut allowing for a more streamlined processing using + // operator composition instead of having to deal with the + // CompletableFuture along the Subscriber for inspecting the result + this.sink.next(new DummyEvent(responseInfo)); + } this.sink.complete(); }