# SPDX-License-Identifier: Apache-2.0
import abc
import inspect
- from typing import Dict, Optional
+ import io
+ import json
+ import math
+ from typing import Any, Dict, Optional
+
+ from botocore.response import StreamingBody

from amazon.opentelemetry.distro._aws_attribute_keys import (
    AWS_BEDROCK_AGENT_ID,
    AWS_BEDROCK_GUARDRAIL_ID,
    AWS_BEDROCK_KNOWLEDGE_BASE_ID,
)
- from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM
+ from amazon.opentelemetry.distro._aws_span_processing_util import (
+     GEN_AI_REQUEST_MAX_TOKENS,
+     GEN_AI_REQUEST_MODEL,
+     GEN_AI_REQUEST_TEMPERATURE,
+     GEN_AI_REQUEST_TOP_P,
+     GEN_AI_RESPONSE_FINISH_REASONS,
+     GEN_AI_SYSTEM,
+     GEN_AI_USAGE_INPUT_TOKENS,
+     GEN_AI_USAGE_OUTPUT_TOKENS,
+ )
from opentelemetry.instrumentation.botocore.extensions.types import (
    _AttributeMapT,
    _AwsSdkCallContext,
@@ -238,5 +241,164 @@ def extract_attributes(self, attributes: _AttributeMapT):
        attributes[GEN_AI_SYSTEM] = _AWS_BEDROCK_SYSTEM

        model_id = self._call_context.params.get(_MODEL_ID)
        if model_id:
-             attributes[GEN_AI_REQUEST_MODEL] = model_id
+             attributes[GEN_AI_REQUEST_MODEL] = model_id
+
+         # Parse the request body, if present, to pick up inference parameters.
+         body = self._call_context.params.get('body')
+         if body:
+             try:
+                 request_body = json.loads(body)
+
+                 # Each model family uses its own request schema, so route on the model ID.
+                 if 'amazon.titan' in model_id:
+                     self._extract_titan_attributes(attributes, request_body)
+                 elif 'anthropic.claude' in model_id:
+                     self._extract_claude_attributes(attributes, request_body)
+                 elif 'meta.llama' in model_id:
+                     self._extract_llama_attributes(attributes, request_body)
+                 elif 'cohere.command' in model_id:
+                     self._extract_cohere_attributes(attributes, request_body)
+                 elif 'ai21.jamba' in model_id:
+                     self._extract_ai21_attributes(attributes, request_body)
+                 elif 'mistral' in model_id:
+                     self._extract_mistral_attributes(attributes, request_body)
+             except json.JSONDecodeError:
+                 print("Error: Unable to parse the request body as JSON")
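+
+     # The _extract_* helpers below map one model family's request fields onto
+     # the gen_ai.* request attributes; fields absent from the body are skipped.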
+     def _extract_titan_attributes(self, attributes, request_body):
+         # Titan nests its inference parameters under 'textGenerationConfig'.
+         config = request_body.get('textGenerationConfig', {})
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, config.get('temperature'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, config.get('topP'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get('maxTokenCount'))
+
+     def _extract_claude_attributes(self, attributes, request_body):
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))
+
+     def _extract_cohere_attributes(self, attributes, request_body):
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('p'))
+
+     def _extract_ai21_attributes(self, attributes, request_body):
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))
+
+     def _extract_llama_attributes(self, attributes, request_body):
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_gen_len'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))
+
+     def _extract_mistral_attributes(self, attributes, request_body):
+         prompt = request_body.get('prompt')
+         if prompt:
+             # Mistral does not report input token counts, so estimate them at
+             # roughly six characters per token.
+             attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get('max_tokens'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get('temperature'))
+         self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get('top_p'))
+
+     @staticmethod
+     def _set_if_not_none(attributes, key, value):
+         if value is not None:
+             attributes[key] = value
+
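+     # on_success parses the InvokeModel response body after the SDK call
+     # returns, recording token usage and finish reasons on the span.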
+     def on_success(self, span: Span, result: Dict[str, Any]):
+         super().on_success(span, result)
+
+         model_id = self._call_context.params.get(_MODEL_ID)
+         if not model_id:
+             return
+
+         if 'body' in result and isinstance(result['body'], StreamingBody):
+             original_body = None
+             try:
+                 original_body = result['body']
+                 body_content = original_body.read()
+                 # Reading fully consumes the stream, so hand the caller a fresh
+                 # StreamingBody over the same bytes.
+                 result['body'] = StreamingBody(io.BytesIO(body_content), len(body_content))
+                 response_body = json.loads(body_content.decode('utf-8'))
+
+                 if 'amazon.titan' in model_id:
+                     self._handle_amazon_titan_response(span, response_body)
+                 elif 'anthropic.claude' in model_id:
+                     self._handle_anthropic_claude_response(span, response_body)
+                 elif 'meta.llama' in model_id:
+                     self._handle_meta_llama_response(span, response_body)
+                 elif 'cohere.command' in model_id:
+                     self._handle_cohere_command_response(span, response_body)
+                 elif 'ai21.jamba' in model_id:
+                     self._handle_ai21_jamba_response(span, response_body)
+                 elif 'mistral' in model_id:
+                     self._handle_mistral_mistral_response(span, response_body)
+             except json.JSONDecodeError:
+                 print("Error: Unable to parse the response body as JSON")
+             except Exception as error:
+                 print(f"Error processing response: {error}")
+             finally:
+                 # Close only the original, already-consumed stream.
+                 if original_body is not None:
+                     original_body.close()
+
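+     # Each _handle_* helper maps one model family's response schema onto span
+     # attributes. Titan reports exact token counts directly.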
+     def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):
+         if 'inputTextTokenCount' in response_body:
+             span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body['inputTextTokenCount'])
+         if 'results' in response_body and response_body['results']:
+             result = response_body['results'][0]
+             if 'tokenCount' in result:
+                 span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result['tokenCount'])
+             if 'completionReason' in result:
+                 span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result['completionReason']])
+
+     def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]):
+         if 'usage' in response_body:
+             usage = response_body['usage']
+             if 'input_tokens' in usage:
+                 span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage['input_tokens'])
+             if 'output_tokens' in usage:
+                 span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage['output_tokens'])
+         if 'stop_reason' in response_body:
+             span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body['stop_reason']])
+
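+     # Cohere Command responses carry no token counts, so both sides are
+     # estimated at roughly six characters per token.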
+     def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]):
+         # Input tokens: approximate from the user's messages in the chat history.
+         if 'chat_history' in response_body:
+             user_messages = [msg.get('message', '') for msg in response_body['chat_history'] if msg.get('role') == 'USER']
+             input_text = ' '.join(user_messages)
+             span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, math.ceil(len(input_text) / 6))
+         # Output tokens: approximate from the response text.
+         if 'text' in response_body:
+             span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body['text']) / 6))
+         if 'finish_reason' in response_body:
+             span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body['finish_reason']])
+
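+     # Jamba reports OpenAI-style 'usage' and 'choices' fields directly.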
+     def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]):
+         if 'usage' in response_body:
+             usage = response_body['usage']
+             if 'prompt_tokens' in usage:
+                 span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage['prompt_tokens'])
+             if 'completion_tokens' in usage:
+                 span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage['completion_tokens'])
+         if 'choices' in response_body and response_body['choices']:
+             choice = response_body['choices'][0]
+             if 'finish_reason' in choice:
+                 span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [choice['finish_reason']])
+
+     def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]):
+         if 'prompt_token_count' in response_body:
+             span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body['prompt_token_count'])
+         if 'generation_token_count' in response_body:
+             span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, response_body['generation_token_count'])
+         if 'stop_reason' in response_body:
+             # Finish reasons are recorded as a list, matching the other handlers.
+             span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body['stop_reason']])
+
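+     # Mistral also omits token counts; estimate output tokens from the text length.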
+     def _handle_mistral_mistral_response(self, span: Span, response_body: Dict[str, Any]):
+         if 'outputs' in response_body:
+             outputs = response_body['outputs'][0]
+             if 'text' in outputs:
+                 span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs['text']) / 6))
+             if 'stop_reason' in outputs:
+                 span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs['stop_reason']])