| 
25 | 25 | import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;  | 
26 | 26 | import reactor.core.publisher.Flux;  | 
27 | 27 | 
 
  | 
 | 28 | +import org.springframework.ai.chat.client.ChatClientMessageAggregator;  | 
28 | 29 | import org.springframework.ai.chat.client.ChatClientRequest;  | 
29 | 30 | import org.springframework.ai.chat.client.ChatClientResponse;  | 
30 | 31 | import org.springframework.ai.chat.client.advisor.api.Advisor;  | 
@@ -55,6 +56,8 @@ public class DefaultAroundAdvisorChain implements BaseAdvisorChain {  | 
55 | 56 | 
 
  | 
56 | 57 | 	public static final AdvisorObservationConvention DEFAULT_OBSERVATION_CONVENTION = new DefaultAdvisorObservationConvention();  | 
57 | 58 | 
 
  | 
 | 59 | +	private static final ChatClientMessageAggregator CHAT_CLIENT_MESSAGE_AGGREGATOR = new ChatClientMessageAggregator();  | 
 | 60 | + | 
58 | 61 | 	private final List<CallAdvisor> originalCallAdvisors;  | 
59 | 62 | 
 
  | 
60 | 63 | 	private final List<StreamAdvisor> originalStreamAdvisors;  | 
@@ -136,11 +139,13 @@ public Flux<ChatClientResponse> nextStream(ChatClientRequest chatClientRequest)  | 
136 | 139 | 			observation.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null)).start();  | 
137 | 140 | 
 
  | 
138 | 141 | 			// @formatter:off  | 
139 |  | -			return Flux.defer(() -> advisor.adviseStream(chatClientRequest, this)  | 
 | 142 | +			Flux<ChatClientResponse> chatClientResponse = Flux.defer(() -> advisor.adviseStream(chatClientRequest, this)  | 
140 | 143 | 						.doOnError(observation::error)  | 
141 | 144 | 						.doFinally(s -> observation.stop())  | 
142 | 145 | 						.contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation)));  | 
143 | 146 | 			// @formatter:on  | 
 | 147 | +			return CHAT_CLIENT_MESSAGE_AGGREGATOR.aggregateChatClientResponse(chatClientResponse,  | 
 | 148 | +					observationContext::setChatClientResponse);  | 
144 | 149 | 		});  | 
145 | 150 | 	}  | 
146 | 151 | 
 
  | 
 | 
0 commit comments