Skip to content

Commit c9bd70b

Browse files
committed
feat(client): handle better sse + batch in response
1 parent eb6895a commit c9bd70b

File tree

1 file changed

+40
-14
lines changed

1 file changed

+40
-14
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/transport/StreamableHttpClientTransport.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.LoggerFactory;
2525

2626
import com.fasterxml.jackson.core.type.TypeReference;
27+
import com.fasterxml.jackson.databind.JsonNode;
2728
import com.fasterxml.jackson.databind.ObjectMapper;
2829

2930
import io.modelcontextprotocol.spec.McpClientTransport;
@@ -314,29 +315,54 @@ private Mono<Void> handleJsonStream(final HttpResponse<InputStream> response,
314315
private Mono<Void> handleSseStream(final HttpResponse<InputStream> response,
315316
final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
316317
return Flux.fromStream(new BufferedReader(new InputStreamReader(response.body())).lines())
317-
.scan(new FlowSseClient.SseEvent("", "", ""), (acc, line) -> {
318-
String event = acc.type();
319-
String data = acc.data();
320-
String id = acc.id();
321-
322-
if (line.startsWith("event: "))
323-
event = line.substring(7).trim();
324-
else if (line.startsWith("data: "))
325-
data = line.substring(6).trim();
326-
else if (line.startsWith("id: "))
327-
id = line.substring(4).trim();
318+
.map(String::trim)
319+
.bufferUntil(String::isEmpty)
320+
.map(eventLines -> {
321+
String event = "";
322+
String data = "";
323+
String id = "";
324+
325+
for (String line : eventLines) {
326+
if (line.startsWith("event: "))
327+
event = line.substring(7).trim();
328+
else if (line.startsWith("data: "))
329+
data += line.substring(6).trim() + "\n";
330+
else if (line.startsWith("id: "))
331+
id = line.substring(4).trim();
332+
}
333+
334+
if (data.endsWith("\n")) {
335+
data = data.substring(0, data.length() - 1);
336+
}
328337

329338
return new FlowSseClient.SseEvent(event, data, id);
330339
})
331340
.filter(sseEvent -> "message".equals(sseEvent.type()))
332341
.doOnNext(sseEvent -> {
333342
lastEventId.set(sseEvent.id());
334343
try {
335-
McpSchema.JSONRPCMessage msg = McpSchema.deserializeJsonRpcMessage(objectMapper, sseEvent.data());
336-
handler.apply(Mono.just(msg)).subscribe();
344+
String rawData = sseEvent.data().trim();
345+
JsonNode node = objectMapper.readTree(rawData);
346+
347+
if (node.isArray()) {
348+
for (JsonNode item : node) {
349+
String rawMessage = objectMapper.writeValueAsString(item);
350+
McpSchema.JSONRPCMessage msg = McpSchema.deserializeJsonRpcMessage(objectMapper,
351+
rawMessage);
352+
handler.apply(Mono.just(msg)).subscribe();
353+
}
354+
}
355+
else if (node.isObject()) {
356+
String rawMessage = objectMapper.writeValueAsString(node);
357+
McpSchema.JSONRPCMessage msg = McpSchema.deserializeJsonRpcMessage(objectMapper, rawMessage);
358+
handler.apply(Mono.just(msg)).subscribe();
359+
}
360+
else {
361+
LOGGER.warn("Unexpected JSON in SSE data: {}", rawData);
362+
}
337363
}
338364
catch (IOException e) {
339-
LOGGER.error("Error processing SSE event", e);
365+
LOGGER.error("Error processing SSE event: {}", sseEvent.data(), e);
340366
}
341367
})
342368
.then();

0 commit comments

Comments
 (0)