import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;

import com.azure.ai.openai.OpenAIAsyncClient;
import com.azure.ai.openai.OpenAIClient;
import org.springframework.ai.chat.observation.ChatModelObservationContext;
import org.springframework.ai.chat.observation.ChatModelObservationConvention;
import org.springframework.ai.chat.observation.ChatModelObservationDocumentation;
+import org.springframework.ai.chat.observation.ChatModelObservationSupport;
import org.springframework.ai.chat.observation.DefaultChatModelObservationConvention;
import org.springframework.ai.chat.prompt.ChatOptions;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.model.function.FunctionCallbackContext;
import org.springframework.ai.model.function.FunctionCallingOptions;
import org.springframework.ai.observation.conventions.AiProvider;
+import org.springframework.ai.util.ValueUtils;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

@@ -195,24 +199,24 @@ public AzureOpenAiChatOptions getDefaultOptions() {
	@Override
	public ChatResponse call(Prompt prompt) {

-		ChatModelObservationContext observationContext = ChatModelObservationContext.builder()
+		Supplier<ChatModelObservationContext> observationContext = () -> ChatModelObservationContext.builder()
			.prompt(prompt)
			.provider(AiProvider.AZURE_OPENAI.value())
			.requestOptions(prompt.getOptions() != null ? prompt.getOptions() : this.defaultOptions)
			.build();

-		ChatResponse response = ChatModelObservationDocumentation.CHAT_MODEL_OPERATION
-			.observation(this.observationConvention, DEFAULT_OBSERVATION_CONVENTION, () -> observationContext,
-					this.observationRegistry)
-			.observe(() -> {
-				ChatCompletionsOptions options = toAzureChatCompletionsOptions(prompt);
-				options.setStream(false);
-
-				ChatCompletions chatCompletions = this.openAIClient.getChatCompletions(options.getModel(), options);
-				ChatResponse chatResponse = toChatResponse(chatCompletions);
-				observationContext.setResponse(chatResponse);
-				return chatResponse;
-			});
+		Observation observation = ChatModelObservationDocumentation.CHAT_MODEL_OPERATION.observation(
+				this.observationConvention, DEFAULT_OBSERVATION_CONVENTION, observationContext,
+				this.observationRegistry);
+
+		ChatResponse response = observation.observe(() -> {
+			ChatCompletionsOptions options = toAzureChatCompletionsOptions(prompt).setStream(false);
+			ChatCompletions chatCompletions = this.openAIClient.getChatCompletions(options.getModel(), options);
+			ChatResponse chatResponse = toChatResponse(chatCompletions);
+			ChatModelObservationSupport.getObservationContext(observation)
+				.ifPresent(context -> context.setResponse(chatResponse));
+			return chatResponse;
+		});

		if (!isProxyToolCalls(prompt, this.defaultOptions)
				&& isToolCall(response, Set.of(String.valueOf(CompletionsFinishReason.TOOL_CALLS).toLowerCase()))) {
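
In the synchronous `call` path above, the observation context is now built lazily through a `Supplier` (the `ObservationDocumentation.observation(...)` overload used here accepts a context supplier directly) and is looked up again from the started `Observation` when the response is recorded. A minimal sketch of what `ChatModelObservationSupport.getObservationContext` presumably does, inferred only from its call sites in this diff rather than from the actual class:

```java
// Hypothetical sketch inferred from usage above; the real ChatModelObservationSupport may differ.
public static Optional<ChatModelObservationContext> getObservationContext(Observation observation) {
	return Optional.ofNullable(observation)
		.map(Observation::getContext)
		.filter(ChatModelObservationContext.class::isInstance) // no-op observations carry a different context type
		.map(ChatModelObservationContext.class::cast);
}
```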
@@ -229,24 +233,28 @@ && isToolCall(response, Set.of(String.valueOf(CompletionsFinishReason.TOOL_CALLS
	public Flux<ChatResponse> stream(Prompt prompt) {

		return Flux.deferContextual(contextView -> {
-			ChatCompletionsOptions options = toAzureChatCompletionsOptions(prompt);
-			options.setStream(true);
+
+			ChatCompletionsOptions options = toAzureChatCompletionsOptions(prompt).setStream(true);

			Flux<ChatCompletions> chatCompletionsStream = this.openAIAsyncClient
				.getChatCompletionsStream(options.getModel(), options);

+			// @formatter:off
			// For chunked responses, only the first chunk contains the choice role.
			// The rest of the chunks with same ID share the same role.
-			ConcurrentHashMap<String, String> roleMap = new ConcurrentHashMap<>();
+			// TODO: Why is roleMap not used? I am guessing it should have served the same
+			//  purpose as the roleMap in OpenAiChatModel.stream(:Prompt)
+			// @formatter:on
+			ConcurrentMap<String, String> roleMap = new ConcurrentHashMap<>();

-			ChatModelObservationContext observationContext = ChatModelObservationContext.builder()
-				.prompt(prompt)
+			Supplier<ChatModelObservationContext> observationContext = () -> ChatModelObservationContext.builder()
+				.requestOptions(ValueUtils.defaultIfNull(prompt.getOptions(), this.defaultOptions))
				.provider(AiProvider.AZURE_OPENAI.value())
-				.requestOptions(prompt.getOptions() != null ? prompt.getOptions() : this.defaultOptions)
+				.prompt(prompt)
				.build();

			Observation observation = ChatModelObservationDocumentation.CHAT_MODEL_OPERATION.observation(
-					this.observationConvention, DEFAULT_OBSERVATION_CONVENTION, () -> observationContext,
+					this.observationConvention, DEFAULT_OBSERVATION_CONVENTION, observationContext,
					this.observationRegistry);

			observation.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null)).start();
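
In the streaming path, the explicit null check on the prompt options is replaced by `ValueUtils.defaultIfNull`. Judging purely from the ternary it replaces, the utility presumably behaves like the following sketch (the actual class imported from `org.springframework.ai.util` may be implemented differently):

```java
// Hypothetical sketch, equivalent to the replaced `prompt.getOptions() != null ? ... : this.defaultOptions` ternary.
public static <T> T defaultIfNull(T value, T defaultValue) {
	return value != null ? value : defaultValue;
}
```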
@@ -295,7 +303,8 @@ public Flux<ChatResponse> stream(Prompt prompt) {
					.doFinally(s -> observation.stop())
					.contextWrite(ctx -> ctx.put(ObservationThreadLocalAccessor.KEY, observation));

-				return new MessageAggregator().aggregate(flux, observationContext::setResponse);
+				return new MessageAggregator().aggregate(flux,
+						ChatModelObservationSupport.setChatResponseInObservationContext(observation));
			});

		});
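
The aggregated streaming response is recorded through `ChatModelObservationSupport.setChatResponseInObservationContext(observation)`, which has to produce the `Consumer<ChatResponse>` that `MessageAggregator.aggregate` expects (the previous code passed `observationContext::setResponse` in that position). A sketch consistent with that usage, again inferred from the diff rather than taken from the actual helper:

```java
// Hypothetical sketch inferred from MessageAggregator.aggregate(Flux<ChatResponse>, Consumer<ChatResponse>) usage above.
public static Consumer<ChatResponse> setChatResponseInObservationContext(Observation observation) {
	return chatResponse -> getObservationContext(observation)
		.ifPresent(context -> context.setResponse(chatResponse));
}
```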