2929from  opentelemetry .instrumentation .botocore .extensions .bedrock_utils  import  (
3030    ConverseStreamWrapper ,
3131    InvokeModelWithResponseStreamWrapper ,
32+     _Choice ,
3233    genai_capture_message_content ,
3334    message_to_event ,
3435)
@@ -242,12 +243,12 @@ def before_service_call(
242243        if  self ._call_context .operation  not  in self ._HANDLED_OPERATIONS :
243244            return 
244245
245-         _capture_content  =  genai_capture_message_content ()
246+         capture_content  =  genai_capture_message_content ()
246247
247248        messages  =  self ._get_request_messages ()
248249        for  message  in  messages :
249250            event_logger  =  instrumentor_context .event_logger 
250-             event_logger .emit (message_to_event (message , _capture_content ))
251+             event_logger .emit (message_to_event (message , capture_content ))
251252
252253        if  not  span .is_recording ():
253254            return 
@@ -259,27 +260,52 @@ def before_service_call(
259260            span .update_name (f"{ operation_name } { request_model }  )
260261
261262    # pylint: disable=no-self-use 
262-     def  _converse_on_success (self , span : Span , result : dict [str , Any ]):
263-         if  usage  :=  result .get ("usage" ):
264-             if  input_tokens  :=  usage .get ("inputTokens" ):
265-                 span .set_attribute (
266-                     GEN_AI_USAGE_INPUT_TOKENS ,
267-                     input_tokens ,
268-                 )
269-             if  output_tokens  :=  usage .get ("outputTokens" ):
263+     def  _converse_on_success (
264+         self ,
265+         span : Span ,
266+         result : dict [str , Any ],
267+         instrumentor_context : _BotocoreInstrumentorContext ,
268+         capture_content ,
269+     ):
270+         if  span .is_recording ():
271+             if  usage  :=  result .get ("usage" ):
272+                 if  input_tokens  :=  usage .get ("inputTokens" ):
273+                     span .set_attribute (
274+                         GEN_AI_USAGE_INPUT_TOKENS ,
275+                         input_tokens ,
276+                     )
277+                 if  output_tokens  :=  usage .get ("outputTokens" ):
278+                     span .set_attribute (
279+                         GEN_AI_USAGE_OUTPUT_TOKENS ,
280+                         output_tokens ,
281+                     )
282+ 
283+             if  stop_reason  :=  result .get ("stopReason" ):
270284                span .set_attribute (
271-                     GEN_AI_USAGE_OUTPUT_TOKENS ,
272-                     output_tokens ,
285+                     GEN_AI_RESPONSE_FINISH_REASONS ,
286+                     [ stop_reason ] ,
273287                )
274288
275-         if  stop_reason  :=  result .get ("stopReason" ):
276-             span .set_attribute (
277-                 GEN_AI_RESPONSE_FINISH_REASONS ,
278-                 [stop_reason ],
289+         event_logger  =  instrumentor_context .event_logger 
290+         choice  =  _Choice .from_converse (result , capture_content )
291+         # this path is used by streaming apis, in that case we are already out of the span 
292+         # context so need to add the span context manually 
293+         span_ctx  =  span .get_span_context ()
294+         event_logger .emit (
295+             choice .to_choice_event (
296+                 trace_id = span_ctx .trace_id ,
297+                 span_id = span_ctx .span_id ,
298+                 trace_flags = span_ctx .trace_flags ,
279299            )
300+         )
280301
281302    def  _invoke_model_on_success (
282-         self , span : Span , result : dict [str , Any ], model_id : str 
303+         self ,
304+         span : Span ,
305+         result : dict [str , Any ],
306+         model_id : str ,
307+         instrumentor_context : _BotocoreInstrumentorContext ,
308+         capture_content ,
283309    ):
284310        original_body  =  None 
285311        try :
@@ -292,12 +318,17 @@ def _invoke_model_on_success(
292318
293319            response_body  =  json .loads (body_content .decode ("utf-8" ))
294320            if  "amazon.titan"  in  model_id :
295-                 self ._handle_amazon_titan_response (span , response_body )
321+                 self ._handle_amazon_titan_response (
322+                     span , response_body , instrumentor_context , capture_content 
323+                 )
296324            elif  "amazon.nova"  in  model_id :
297-                 self ._handle_amazon_nova_response (span , response_body )
325+                 self ._handle_amazon_nova_response (
326+                     span , response_body , instrumentor_context , capture_content 
327+                 )
298328            elif  "anthropic.claude"  in  model_id :
299-                 self ._handle_anthropic_claude_response (span , response_body )
300- 
329+                 self ._handle_anthropic_claude_response (
330+                     span , response_body , instrumentor_context , capture_content 
331+                 )
301332        except  json .JSONDecodeError :
302333            _logger .debug ("Error: Unable to parse the response body as JSON" )
303334        except  Exception  as  exc :  # pylint: disable=broad-exception-caught 
@@ -321,80 +352,105 @@ def on_success(
321352        if  self ._call_context .operation  not  in self ._HANDLED_OPERATIONS :
322353            return 
323354
324-         if  not  span .is_recording ():
325-             if  not  self .should_end_span_on_exit ():
326-                 span .end ()
327-             return 
355+         capture_content  =  genai_capture_message_content ()
328356
329-         # ConverseStream 
330-         if  "stream"  in  result  and  isinstance (result ["stream" ], EventStream ):
357+         if  self ._call_context .operation  ==  "ConverseStream" :
358+             if  "stream"  in  result  and  isinstance (
359+                 result ["stream" ], EventStream 
360+             ):
331361
332-             def  stream_done_callback (response ):
333-                 self ._converse_on_success (span , response )
334-                 span .end ()
362+                 def  stream_done_callback (response ):
363+                     self ._converse_on_success (
364+                         span , response , instrumentor_context , capture_content 
365+                     )
366+                     span .end ()
335367
336-             def  stream_error_callback (exception ):
337-                 self ._on_stream_error_callback (span , exception )
368+                  def  stream_error_callback (exception ):
369+                      self ._on_stream_error_callback (span , exception )
338370
339-             result ["stream" ] =  ConverseStreamWrapper (
340-                 result ["stream" ], stream_done_callback , stream_error_callback 
371+                 result ["stream" ] =  ConverseStreamWrapper (
372+                     result ["stream" ],
373+                     stream_done_callback ,
374+                     stream_error_callback ,
375+                 )
376+                 return 
377+         elif  self ._call_context .operation  ==  "Converse" :
378+             self ._converse_on_success (
379+                 span , result , instrumentor_context , capture_content 
341380            )
342-             return 
343- 
344-         # Converse 
345-         self ._converse_on_success (span , result )
346381
347382        model_id  =  self ._call_context .params .get (_MODEL_ID_KEY )
348383        if  not  model_id :
349384            return 
350385
351-         # InvokeModel 
352-         if  "body"  in  result  and  isinstance (result ["body" ], StreamingBody ):
353-             self ._invoke_model_on_success (span , result , model_id )
354-             return 
355- 
356-         # InvokeModelWithResponseStream 
357-         if  "body"  in  result  and  isinstance (result ["body" ], EventStream ):
358- 
359-             def  invoke_model_stream_done_callback (response ):
360-                 # the callback gets data formatted as the simpler converse API 
361-                 self ._converse_on_success (span , response )
362-                 span .end ()
386+         if  self ._call_context .operation  ==  "InvokeModel" :
387+             if  "body"  in  result  and  isinstance (result ["body" ], StreamingBody ):
388+                 self ._invoke_model_on_success (
389+                     span ,
390+                     result ,
391+                     model_id ,
392+                     instrumentor_context ,
393+                     capture_content ,
394+                 )
395+                 return 
396+         elif  self ._call_context .operation  ==  "InvokeModelWithResponseStream" :
397+             if  "body"  in  result  and  isinstance (result ["body" ], EventStream ):
398+ 
399+                 def  invoke_model_stream_done_callback (response ):
400+                     # the callback gets data formatted as the simpler converse API 
401+                     self ._converse_on_success (
402+                         span , response , instrumentor_context , capture_content 
403+                     )
404+                     span .end ()
363405
364-             def  invoke_model_stream_error_callback (exception ):
365-                 self ._on_stream_error_callback (span , exception )
406+                  def  invoke_model_stream_error_callback (exception ):
407+                      self ._on_stream_error_callback (span , exception )
366408
367-             result ["body" ] =  InvokeModelWithResponseStreamWrapper (
368-                 result ["body" ],
369-                 invoke_model_stream_done_callback ,
370-                 invoke_model_stream_error_callback ,
371-                 model_id ,
372-             )
373-             return 
409+                  result ["body" ] =  InvokeModelWithResponseStreamWrapper (
410+                      result ["body" ],
411+                      invoke_model_stream_done_callback ,
412+                      invoke_model_stream_error_callback ,
413+                      model_id ,
414+                  )
415+                  return 
374416
375417    # pylint: disable=no-self-use 
376418    def  _handle_amazon_titan_response (
377-         self , span : Span , response_body : dict [str , Any ]
419+         self ,
420+         span : Span ,
421+         response_body : dict [str , Any ],
422+         instrumentor_context : _BotocoreInstrumentorContext ,
423+         capture_content : bool ,
378424    ):
379425        if  "inputTextTokenCount"  in  response_body :
380426            span .set_attribute (
381427                GEN_AI_USAGE_INPUT_TOKENS , response_body ["inputTextTokenCount" ]
382428            )
383-             if  "results"  in  response_body  and  response_body ["results" ]:
384-                 result  =  response_body ["results" ][0 ]
385-                 if  "tokenCount"  in  result :
386-                     span .set_attribute (
387-                         GEN_AI_USAGE_OUTPUT_TOKENS , result ["tokenCount" ]
388-                     )
389-                 if  "completionReason"  in  result :
390-                     span .set_attribute (
391-                         GEN_AI_RESPONSE_FINISH_REASONS ,
392-                         [result ["completionReason" ]],
393-                     )
429+         if  "results"  in  response_body  and  response_body ["results" ]:
430+             result  =  response_body ["results" ][0 ]
431+             if  "tokenCount"  in  result :
432+                 span .set_attribute (
433+                     GEN_AI_USAGE_OUTPUT_TOKENS , result ["tokenCount" ]
434+                 )
435+             if  "completionReason"  in  result :
436+                 span .set_attribute (
437+                     GEN_AI_RESPONSE_FINISH_REASONS ,
438+                     [result ["completionReason" ]],
439+                 )
440+ 
441+             event_logger  =  instrumentor_context .event_logger 
442+             choice  =  _Choice .from_invoke_amazon_titan (
443+                 response_body , capture_content 
444+             )
445+             event_logger .emit (choice .to_choice_event ())
394446
395447    # pylint: disable=no-self-use 
396448    def  _handle_amazon_nova_response (
397-         self , span : Span , response_body : dict [str , Any ]
449+         self ,
450+         span : Span ,
451+         response_body : dict [str , Any ],
452+         instrumentor_context : _BotocoreInstrumentorContext ,
453+         capture_content : bool ,
398454    ):
399455        if  "usage"  in  response_body :
400456            usage  =  response_body ["usage" ]
@@ -411,9 +467,17 @@ def _handle_amazon_nova_response(
411467                GEN_AI_RESPONSE_FINISH_REASONS , [response_body ["stopReason" ]]
412468            )
413469
470+         event_logger  =  instrumentor_context .event_logger 
471+         choice  =  _Choice .from_converse (response_body , capture_content )
472+         event_logger .emit (choice .to_choice_event ())
473+ 
414474    # pylint: disable=no-self-use 
415475    def  _handle_anthropic_claude_response (
416-         self , span : Span , response_body : dict [str , Any ]
476+         self ,
477+         span : Span ,
478+         response_body : dict [str , Any ],
479+         instrumentor_context : _BotocoreInstrumentorContext ,
480+         capture_content : bool ,
417481    ):
418482        if  usage  :=  response_body .get ("usage" ):
419483            if  "input_tokens"  in  usage :
@@ -429,6 +493,12 @@ def _handle_anthropic_claude_response(
429493                GEN_AI_RESPONSE_FINISH_REASONS , [response_body ["stop_reason" ]]
430494            )
431495
496+         event_logger  =  instrumentor_context .event_logger 
497+         choice  =  _Choice .from_invoke_anthropic_claude (
498+             response_body , capture_content 
499+         )
500+         event_logger .emit (choice .to_choice_event ())
501+ 
432502    def  on_error (
433503        self ,
434504        span : Span ,
0 commit comments