Skip to content

Commit 91e27a7

Browse files
committed
[anthropic] Replace switchMap -> flatMap to avoid cancellation of the original request
Previously, internalStream used switchMap to process ChatCompletionResponses, which caused the active stream (including potential recursive calls) to be canceled whenever a new response arrived. This led to incomplete processing of streaming tool calls and unexpected behavior when handling tool_use events. Replaced switchMap with flatMap to ensure that each response is fully processed without being interrupted, allowing recursive internalStream calls to complete as expected. Without this fix during the stream event handling when `EventType.MESSAGE_STOP` occurs, the latest content block was resent again and it caused to the additional tool call(if it was the latest event) Signed-off-by: Mikhail Mazurkevich <[email protected]>
1 parent f3adcae commit 91e27a7

File tree

1 file changed

+1
-1
lines changed

1 file changed

+1
-1
lines changed

models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/AnthropicChatModel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public Flux<ChatResponse> internalStream(Prompt prompt, ChatResponse previousCha
237237
this.getAdditionalHttpHeaders(prompt));
238238

239239
// @formatter:off
240-
Flux<ChatResponse> chatResponseFlux = response.switchMap(chatCompletionResponse -> {
240+
Flux<ChatResponse> chatResponseFlux = response.flatMap(chatCompletionResponse -> {
241241
AnthropicApi.Usage usage = chatCompletionResponse.usage();
242242
Usage currentChatResponseUsage = usage != null ? this.getDefaultUsage(chatCompletionResponse.usage()) : new EmptyUsage();
243243
Usage accumulatedUsage = UsageUtils.getCumulativeUsage(currentChatResponseUsage, previousChatResponse);

0 commit comments

Comments
 (0)