Skip to content

Commit 1d335ed

Browse files
committed
fix: Implement proper streaming support in PromptChatMemoryAdvisor
This commit fixes a test failure in the streaming chat memory functionality by: 1. Implementing a dedicated adviseStream method in PromptChatMemoryAdvisor that properly handles streaming responses using MessageAggregator
1 parent 4bcfbe0 commit 1d335ed

File tree

1 file changed

+36
-7
lines changed

1 file changed

+36
-7
lines changed

spring-ai-client-chat/src/main/java/org/springframework/ai/chat/client/advisor/PromptChatMemoryAdvisor.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,19 @@
2828
import org.springframework.ai.chat.client.ChatClientResponse;
2929
import org.springframework.ai.chat.client.advisor.api.Advisor;
3030
import org.springframework.ai.chat.client.advisor.api.AdvisorChain;
31+
import org.springframework.ai.chat.client.advisor.api.StreamAdvisorChain;
3132
import org.springframework.ai.chat.memory.ChatMemory;
3233
import org.springframework.ai.chat.messages.Message;
3334
import org.springframework.ai.chat.messages.MessageType;
3435
import org.springframework.ai.chat.messages.SystemMessage;
3536
import org.springframework.ai.chat.messages.UserMessage;
37+
import org.springframework.ai.chat.model.MessageAggregator;
3638
import org.springframework.ai.chat.prompt.PromptTemplate;
3739

40+
import reactor.core.publisher.Flux;
41+
import reactor.core.publisher.Mono;
42+
import reactor.core.scheduler.Scheduler;
43+
3844
/**
3945
* Memory is retrieved added into the prompt's system text.
4046
*
@@ -137,16 +143,39 @@ public ChatClientResponse after(ChatClientResponse chatClientResponse, AdvisorCh
137143
.map(g -> (Message) g.getOutput())
138144
.toList();
139145
}
140-
this.getChatMemoryStore().add(this.doGetConversationId(chatClientResponse.context()), assistantMessages);
141-
logger.debug("[PromptChatMemoryAdvisor.after] Added ASSISTANT messages to memory for conversationId={}: {}",
142-
this.doGetConversationId(chatClientResponse.context()), assistantMessages);
143-
List<Message> memoryMessages = this.getChatMemoryStore()
144-
.get(this.doGetConversationId(chatClientResponse.context()));
145-
logger.debug("[PromptChatMemoryAdvisor.after] Memory after ASSISTANT add for conversationId={}: {}",
146-
this.doGetConversationId(chatClientResponse.context()), memoryMessages);
146+
// Handle streaming case where we have a single result
147+
else if (chatClientResponse.chatResponse() != null && chatClientResponse.chatResponse().getResult() != null
148+
&& chatClientResponse.chatResponse().getResult().getOutput() != null) {
149+
assistantMessages = List.of((Message) chatClientResponse.chatResponse().getResult().getOutput());
150+
}
151+
152+
if (!assistantMessages.isEmpty()) {
153+
this.getChatMemoryStore().add(this.doGetConversationId(chatClientResponse.context()), assistantMessages);
154+
logger.debug("[PromptChatMemoryAdvisor.after] Added ASSISTANT messages to memory for conversationId={}: {}",
155+
this.doGetConversationId(chatClientResponse.context()), assistantMessages);
156+
List<Message> memoryMessages = this.getChatMemoryStore()
157+
.get(this.doGetConversationId(chatClientResponse.context()));
158+
logger.debug("[PromptChatMemoryAdvisor.after] Memory after ASSISTANT add for conversationId={}: {}",
159+
this.doGetConversationId(chatClientResponse.context()), memoryMessages);
160+
}
147161
return chatClientResponse;
148162
}
149163

164+
@Override
165+
public Flux<ChatClientResponse> adviseStream(ChatClientRequest chatClientRequest,
166+
StreamAdvisorChain streamAdvisorChain) {
167+
// Get the scheduler from BaseAdvisor
168+
Scheduler scheduler = this.getScheduler();
169+
170+
// Process the request with the before method
171+
return Mono.just(chatClientRequest)
172+
.publishOn(scheduler)
173+
.map(request -> this.before(request, streamAdvisorChain))
174+
.flatMapMany(streamAdvisorChain::nextStream)
175+
.transform(flux -> new MessageAggregator().aggregateChatClientResponse(flux,
176+
response -> this.after(response, streamAdvisorChain)));
177+
}
178+
150179
public static class Builder extends AbstractChatMemoryAdvisor.AbstractBuilder<ChatMemory, Builder> {
151180

152181
private PromptTemplate systemPromptTemplate = DEFAULT_SYSTEM_PROMPT_TEMPLATE;

0 commit comments

Comments
 (0)