Skip to content

Commit eb6895a

Browse files
committed
feat(client): adds sendMessages for batching
1 parent 4ecf709 commit eb6895a

File tree

1 file changed

+45
-4
lines changed

1 file changed

+45
-4
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/transport/StreamableHttpClientTransport.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.net.http.HttpResponse;
1515
import java.nio.charset.StandardCharsets;
1616
import java.time.Duration;
17+
import java.util.List;
1718
import java.util.concurrent.atomic.AtomicBoolean;
1819
import java.util.concurrent.atomic.AtomicReference;
1920
import java.util.function.Consumer;
@@ -195,10 +196,43 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message) {
195196

196197
try {
197198
String json = objectMapper.writeValueAsString(message);
198-
HttpRequest request = requestBuilder.copy().POST(HttpRequest.BodyPublishers.ofString(json)).build();
199-
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()))
200-
.flatMap(response -> handleStreamingResponse(msg -> msg, response))
201-
.then();
199+
return sentPost(json);
200+
}
201+
catch (Exception e) {
202+
return Mono.error(e);
203+
}
204+
}
205+
206+
/**
207+
* Sends a list of messages to the server.
208+
* @param messages the list of messages to send
209+
* @return a Mono that completes when all messages have been sent
210+
*/
211+
public Mono<Void> sendMessages(final List<McpSchema.JSONRPCMessage> messages) {
212+
if (state.get() == TransportState.CLOSED) {
213+
return Mono.empty();
214+
}
215+
216+
if (fallbackToSse.get()) {
217+
return Flux.fromIterable(messages).flatMap(this::sendMessage).then();
218+
}
219+
220+
if (state.get() == TransportState.DISCONNECTED) {
221+
state.set(TransportState.CONNECTING);
222+
223+
return sendInitialHandshake().doOnSuccess(v -> state.set(TransportState.CONNECTED)).onErrorResume(e -> {
224+
if (e instanceof UnsupportedOperationException) {
225+
LOGGER.warn("Streamable transport failed, falling back to SSE.", e);
226+
fallbackToSse.set(true);
227+
return Mono.empty();
228+
}
229+
return Mono.error(e);
230+
}).then(sendMessages(messages));
231+
}
232+
233+
try {
234+
String json = objectMapper.writeValueAsString(messages);
235+
return sentPost(json);
202236
}
203237
catch (Exception e) {
204238
return Mono.error(e);
@@ -229,6 +263,13 @@ else if (code >= 400 && code < 500) {
229263
}
230264
}
231265

266+
private Mono<Void> sentPost(String json) {
267+
HttpRequest request = requestBuilder.copy().POST(HttpRequest.BodyPublishers.ofString(json)).build();
268+
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()))
269+
.flatMap(response -> handleStreamingResponse(msg -> msg, response))
270+
.then();
271+
}
272+
232273
private Mono<Void> handleStreamingResponse(
233274
final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler,
234275
final HttpResponse<InputStream> response) {

0 commit comments

Comments
 (0)