2929from opentelemetry .instrumentation .botocore .extensions .bedrock_utils import (
3030 ConverseStreamWrapper ,
3131 InvokeModelWithResponseStreamWrapper ,
32+ _Choice ,
3233 genai_capture_message_content ,
3334 message_to_event ,
3435)
@@ -242,12 +243,12 @@ def before_service_call(
242243 if self ._call_context .operation not in self ._HANDLED_OPERATIONS :
243244 return
244245
245- _capture_content = genai_capture_message_content ()
246+ capture_content = genai_capture_message_content ()
246247
247248 messages = self ._get_request_messages ()
248249 for message in messages :
249250 event_logger = instrumentor_context .event_logger
250- event_logger .emit (message_to_event (message , _capture_content ))
251+ event_logger .emit (message_to_event (message , capture_content ))
251252
252253 if not span .is_recording ():
253254 return
@@ -259,27 +260,52 @@ def before_service_call(
259260 span .update_name (f"{ operation_name } { request_model } " )
260261
261262 # pylint: disable=no-self-use
262- def _converse_on_success (self , span : Span , result : dict [str , Any ]):
263- if usage := result .get ("usage" ):
264- if input_tokens := usage .get ("inputTokens" ):
265- span .set_attribute (
266- GEN_AI_USAGE_INPUT_TOKENS ,
267- input_tokens ,
268- )
269- if output_tokens := usage .get ("outputTokens" ):
263+ def _converse_on_success (
264+ self ,
265+ span : Span ,
266+ result : dict [str , Any ],
267+ instrumentor_context : _BotocoreInstrumentorContext ,
268+ capture_content ,
269+ ):
270+ if span .is_recording ():
271+ if usage := result .get ("usage" ):
272+ if input_tokens := usage .get ("inputTokens" ):
273+ span .set_attribute (
274+ GEN_AI_USAGE_INPUT_TOKENS ,
275+ input_tokens ,
276+ )
277+ if output_tokens := usage .get ("outputTokens" ):
278+ span .set_attribute (
279+ GEN_AI_USAGE_OUTPUT_TOKENS ,
280+ output_tokens ,
281+ )
282+
283+ if stop_reason := result .get ("stopReason" ):
270284 span .set_attribute (
271- GEN_AI_USAGE_OUTPUT_TOKENS ,
272- output_tokens ,
285+ GEN_AI_RESPONSE_FINISH_REASONS ,
286+ [ stop_reason ] ,
273287 )
274288
275- if stop_reason := result .get ("stopReason" ):
276- span .set_attribute (
277- GEN_AI_RESPONSE_FINISH_REASONS ,
278- [stop_reason ],
289+ event_logger = instrumentor_context .event_logger
290+ choice = _Choice .from_converse (result , capture_content )
291+ # this path is used by streaming apis, in that case we are already out of the span
292+ # context so need to add the span context manually
293+ span_ctx = span .get_span_context ()
294+ event_logger .emit (
295+ choice .to_choice_event (
296+ trace_id = span_ctx .trace_id ,
297+ span_id = span_ctx .span_id ,
298+ trace_flags = span_ctx .trace_flags ,
279299 )
300+ )
280301
281302 def _invoke_model_on_success (
282- self , span : Span , result : dict [str , Any ], model_id : str
303+ self ,
304+ span : Span ,
305+ result : dict [str , Any ],
306+ model_id : str ,
307+ instrumentor_context : _BotocoreInstrumentorContext ,
308+ capture_content ,
283309 ):
284310 original_body = None
285311 try :
@@ -292,12 +318,17 @@ def _invoke_model_on_success(
292318
293319 response_body = json .loads (body_content .decode ("utf-8" ))
294320 if "amazon.titan" in model_id :
295- self ._handle_amazon_titan_response (span , response_body )
321+ self ._handle_amazon_titan_response (
322+ span , response_body , instrumentor_context , capture_content
323+ )
296324 elif "amazon.nova" in model_id :
297- self ._handle_amazon_nova_response (span , response_body )
325+ self ._handle_amazon_nova_response (
326+ span , response_body , instrumentor_context , capture_content
327+ )
298328 elif "anthropic.claude" in model_id :
299- self ._handle_anthropic_claude_response (span , response_body )
300-
329+ self ._handle_anthropic_claude_response (
330+ span , response_body , instrumentor_context , capture_content
331+ )
301332 except json .JSONDecodeError :
302333 _logger .debug ("Error: Unable to parse the response body as JSON" )
303334 except Exception as exc : # pylint: disable=broad-exception-caught
@@ -321,80 +352,105 @@ def on_success(
321352 if self ._call_context .operation not in self ._HANDLED_OPERATIONS :
322353 return
323354
324- if not span .is_recording ():
325- if not self .should_end_span_on_exit ():
326- span .end ()
327- return
355+ capture_content = genai_capture_message_content ()
328356
329- # ConverseStream
330- if "stream" in result and isinstance (result ["stream" ], EventStream ):
357+ if self ._call_context .operation == "ConverseStream" :
358+ if "stream" in result and isinstance (
359+ result ["stream" ], EventStream
360+ ):
331361
332- def stream_done_callback (response ):
333- self ._converse_on_success (span , response )
334- span .end ()
362+ def stream_done_callback (response ):
363+ self ._converse_on_success (
364+ span , response , instrumentor_context , capture_content
365+ )
366+ span .end ()
335367
336- def stream_error_callback (exception ):
337- self ._on_stream_error_callback (span , exception )
368+ def stream_error_callback (exception ):
369+ self ._on_stream_error_callback (span , exception )
338370
339- result ["stream" ] = ConverseStreamWrapper (
340- result ["stream" ], stream_done_callback , stream_error_callback
371+ result ["stream" ] = ConverseStreamWrapper (
372+ result ["stream" ],
373+ stream_done_callback ,
374+ stream_error_callback ,
375+ )
376+ return
377+ elif self ._call_context .operation == "Converse" :
378+ self ._converse_on_success (
379+ span , result , instrumentor_context , capture_content
341380 )
342- return
343-
344- # Converse
345- self ._converse_on_success (span , result )
346381
347382 model_id = self ._call_context .params .get (_MODEL_ID_KEY )
348383 if not model_id :
349384 return
350385
351- # InvokeModel
352- if "body" in result and isinstance (result ["body" ], StreamingBody ):
353- self ._invoke_model_on_success (span , result , model_id )
354- return
355-
356- # InvokeModelWithResponseStream
357- if "body" in result and isinstance (result ["body" ], EventStream ):
358-
359- def invoke_model_stream_done_callback (response ):
360- # the callback gets data formatted as the simpler converse API
361- self ._converse_on_success (span , response )
362- span .end ()
386+ if self ._call_context .operation == "InvokeModel" :
387+ if "body" in result and isinstance (result ["body" ], StreamingBody ):
388+ self ._invoke_model_on_success (
389+ span ,
390+ result ,
391+ model_id ,
392+ instrumentor_context ,
393+ capture_content ,
394+ )
395+ return
396+ elif self ._call_context .operation == "InvokeModelWithResponseStream" :
397+ if "body" in result and isinstance (result ["body" ], EventStream ):
398+
399+ def invoke_model_stream_done_callback (response ):
400+ # the callback gets data formatted as the simpler converse API
401+ self ._converse_on_success (
402+ span , response , instrumentor_context , capture_content
403+ )
404+ span .end ()
363405
364- def invoke_model_stream_error_callback (exception ):
365- self ._on_stream_error_callback (span , exception )
406+ def invoke_model_stream_error_callback (exception ):
407+ self ._on_stream_error_callback (span , exception )
366408
367- result ["body" ] = InvokeModelWithResponseStreamWrapper (
368- result ["body" ],
369- invoke_model_stream_done_callback ,
370- invoke_model_stream_error_callback ,
371- model_id ,
372- )
373- return
409+ result ["body" ] = InvokeModelWithResponseStreamWrapper (
410+ result ["body" ],
411+ invoke_model_stream_done_callback ,
412+ invoke_model_stream_error_callback ,
413+ model_id ,
414+ )
415+ return
374416
375417 # pylint: disable=no-self-use
376418 def _handle_amazon_titan_response (
377- self , span : Span , response_body : dict [str , Any ]
419+ self ,
420+ span : Span ,
421+ response_body : dict [str , Any ],
422+ instrumentor_context : _BotocoreInstrumentorContext ,
423+ capture_content : bool ,
378424 ):
379425 if "inputTextTokenCount" in response_body :
380426 span .set_attribute (
381427 GEN_AI_USAGE_INPUT_TOKENS , response_body ["inputTextTokenCount" ]
382428 )
383- if "results" in response_body and response_body ["results" ]:
384- result = response_body ["results" ][0 ]
385- if "tokenCount" in result :
386- span .set_attribute (
387- GEN_AI_USAGE_OUTPUT_TOKENS , result ["tokenCount" ]
388- )
389- if "completionReason" in result :
390- span .set_attribute (
391- GEN_AI_RESPONSE_FINISH_REASONS ,
392- [result ["completionReason" ]],
393- )
429+ if "results" in response_body and response_body ["results" ]:
430+ result = response_body ["results" ][0 ]
431+ if "tokenCount" in result :
432+ span .set_attribute (
433+ GEN_AI_USAGE_OUTPUT_TOKENS , result ["tokenCount" ]
434+ )
435+ if "completionReason" in result :
436+ span .set_attribute (
437+ GEN_AI_RESPONSE_FINISH_REASONS ,
438+ [result ["completionReason" ]],
439+ )
440+
441+ event_logger = instrumentor_context .event_logger
442+ choice = _Choice .from_invoke_amazon_titan (
443+ response_body , capture_content
444+ )
445+ event_logger .emit (choice .to_choice_event ())
394446
395447 # pylint: disable=no-self-use
396448 def _handle_amazon_nova_response (
397- self , span : Span , response_body : dict [str , Any ]
449+ self ,
450+ span : Span ,
451+ response_body : dict [str , Any ],
452+ instrumentor_context : _BotocoreInstrumentorContext ,
453+ capture_content : bool ,
398454 ):
399455 if "usage" in response_body :
400456 usage = response_body ["usage" ]
@@ -411,9 +467,17 @@ def _handle_amazon_nova_response(
411467 GEN_AI_RESPONSE_FINISH_REASONS , [response_body ["stopReason" ]]
412468 )
413469
470+ event_logger = instrumentor_context .event_logger
471+ choice = _Choice .from_converse (response_body , capture_content )
472+ event_logger .emit (choice .to_choice_event ())
473+
414474 # pylint: disable=no-self-use
415475 def _handle_anthropic_claude_response (
416- self , span : Span , response_body : dict [str , Any ]
476+ self ,
477+ span : Span ,
478+ response_body : dict [str , Any ],
479+ instrumentor_context : _BotocoreInstrumentorContext ,
480+ capture_content : bool ,
417481 ):
418482 if usage := response_body .get ("usage" ):
419483 if "input_tokens" in usage :
@@ -429,6 +493,12 @@ def _handle_anthropic_claude_response(
429493 GEN_AI_RESPONSE_FINISH_REASONS , [response_body ["stop_reason" ]]
430494 )
431495
496+ event_logger = instrumentor_context .event_logger
497+ choice = _Choice .from_invoke_anthropic_claude (
498+ response_body , capture_content
499+ )
500+ event_logger .emit (choice .to_choice_event ())
501+
432502 def on_error (
433503 self ,
434504 span : Span ,
0 commit comments