@@ -29,6 +29,12 @@
# TODO: Use Semantic Conventions once upgraded to 0.47b0
GEN_AI_REQUEST_MODEL: str = "gen_ai.request.model"
GEN_AI_SYSTEM: str = "gen_ai.system"
GEN_AI_REQUEST_MAX_TOKENS: str = "gen_ai.request.max_tokens"
GEN_AI_REQUEST_TEMPERATURE: str = "gen_ai.request.temperature"
GEN_AI_REQUEST_TOP_P: str = "gen_ai.request.top_p"
GEN_AI_RESPONSE_FINISH_REASONS: str = "gen_ai.response.finish_reasons"
GEN_AI_USAGE_INPUT_TOKENS: str = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS: str = "gen_ai.usage.output_tokens"


# Dialect keywords retrieved from the dialect_keywords.json file.
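# A possible follow-up for the TODO above: once the distro is on
# opentelemetry-semantic-conventions 0.47b0 or later, these hand-rolled string
# constants could come from the incubating attribute definitions instead
# (module path assumes the incubating layout shipped in recent semconv releases):
#
#     from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
#         GEN_AI_REQUEST_MAX_TOKENS,
#         GEN_AI_REQUEST_MODEL,
#         GEN_AI_REQUEST_TEMPERATURE,
#         GEN_AI_REQUEST_TOP_P,
#         GEN_AI_RESPONSE_FINISH_REASONS,
#         GEN_AI_SYSTEM,
#         GEN_AI_USAGE_INPUT_TOKENS,
#         GEN_AI_USAGE_OUTPUT_TOKENS,
#     )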
@@ -2,7 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
import abc
import inspect
from typing import Dict, Optional
import io
import json
import logging
import math
from typing import Any, Dict, Optional

from botocore.response import StreamingBody

from amazon.opentelemetry.distro._aws_attribute_keys import (
AWS_BEDROCK_AGENT_ID,
@@ -11,7 +15,16 @@
AWS_BEDROCK_GUARDRAIL_ID,
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
)
from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM
from amazon.opentelemetry.distro._aws_span_processing_util import (
GEN_AI_REQUEST_MAX_TOKENS,
GEN_AI_REQUEST_MODEL,
GEN_AI_REQUEST_TEMPERATURE,
GEN_AI_REQUEST_TOP_P,
GEN_AI_RESPONSE_FINISH_REASONS,
GEN_AI_SYSTEM,
GEN_AI_USAGE_INPUT_TOKENS,
GEN_AI_USAGE_OUTPUT_TOKENS,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkCallContext,
@@ -238,5 +251,163 @@ def extract_attributes(self, attributes: _AttributeMapT):
        attributes[GEN_AI_SYSTEM] = _AWS_BEDROCK_SYSTEM

        model_id = self._call_context.params.get(_MODEL_ID)
        if model_id:
            attributes[GEN_AI_REQUEST_MODEL] = model_id

            # Get the request body if it exists
            body = self._call_context.params.get("body")
            if body:
                try:
                    request_body = json.loads(body)

                    if "amazon.titan" in model_id:
                        self._extract_titan_attributes(attributes, request_body)
                    elif "anthropic.claude" in model_id:
                        self._extract_claude_attributes(attributes, request_body)
                    elif "meta.llama" in model_id:
                        self._extract_llama_attributes(attributes, request_body)
                    elif "cohere.command" in model_id:
                        self._extract_cohere_attributes(attributes, request_body)
                    elif "ai21.jamba" in model_id:
                        self._extract_ai21_attributes(attributes, request_body)
                    elif "mistral" in model_id:
                        self._extract_mistral_attributes(attributes, request_body)
                except json.JSONDecodeError:
                    logging.getLogger(__name__).debug("Error: Unable to parse the request body as JSON")

    def _extract_titan_attributes(self, attributes, request_body):
        config = request_body.get("textGenerationConfig", {})
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, config.get("temperature"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, config.get("topP"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount"))

    def _extract_claude_attributes(self, attributes, request_body):
        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

    def _extract_cohere_attributes(self, attributes, request_body):
        prompt = request_body.get("message")
        if prompt:
            # Cohere Command does not echo an input token count, so estimate it
            # from the prompt length (roughly six characters per token).
            attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p"))

    def _extract_ai21_attributes(self, attributes, request_body):
        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

    def _extract_llama_attributes(self, attributes, request_body):
        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

    def _extract_mistral_attributes(self, attributes, request_body):
        prompt = request_body.get("prompt")
        if prompt:
            # Mistral does not echo an input token count, so estimate it from
            # the prompt length (roughly six characters per token).
            attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))

    @staticmethod
    def _set_if_not_none(attributes, key, value):
        if value is not None:
            attributes[key] = value

    def on_success(self, span: Span, result: Dict[str, Any]):
        super().on_success(span, result)

        model_id = self._call_context.params.get(_MODEL_ID)
        if not model_id:
            return

        if "body" in result and isinstance(result["body"], StreamingBody):
            original_body = None
            try:
                original_body = result["body"]
                # Read the entire content of the StreamingBody
                body_content = original_body.read()
                # Replace the consumed stream so the application can still read the body
                result["body"] = StreamingBody(io.BytesIO(body_content), len(body_content))
                # Decode the bytes to string and parse as JSON
                response_body = json.loads(body_content.decode("utf-8"))

                if "amazon.titan" in model_id:
                    self._handle_amazon_titan_response(span, response_body)
                elif "anthropic.claude" in model_id:
                    self._handle_anthropic_claude_response(span, response_body)
                elif "meta.llama" in model_id:
                    self._handle_meta_llama_response(span, response_body)
                elif "cohere.command" in model_id:
                    self._handle_cohere_command_response(span, response_body)
                elif "ai21.jamba" in model_id:
                    self._handle_ai21_jamba_response(span, response_body)
                elif "mistral" in model_id:
                    self._handle_mistral_response(span, response_body)
            except json.JSONDecodeError:
                logging.getLogger(__name__).debug("Error: Unable to parse the response body as JSON")
            except Exception as exc:  # pylint: disable=broad-except
                logging.getLogger(__name__).debug("Error processing response: %s", exc)
            finally:
                if original_body is not None:
                    # Close the original stream; callers read from the replacement
                    original_body.close()

    def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):
        if "inputTextTokenCount" in response_body:
            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"])
        if "results" in response_body and response_body["results"]:
            result = response_body["results"][0]
            if "tokenCount" in result:
                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"])
            if "completionReason" in result:
                span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result["completionReason"]])

    def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]):
        if "usage" in response_body:
            usage = response_body["usage"]
            if "input_tokens" in usage:
                span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"])
            if "output_tokens" in usage:
                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"])
        if "stop_reason" in response_body:
            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])

    def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]):
        # Output tokens: approximate from the response text (roughly six characters per token)
        if "text" in response_body:
            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body["text"]) / 6))
        if "finish_reason" in response_body:
            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]])

    def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]):
        if "usage" in response_body:
            usage = response_body["usage"]
            if "prompt_tokens" in usage:
                span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["prompt_tokens"])
            if "completion_tokens" in usage:
                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["completion_tokens"])
        if "choices" in response_body and response_body["choices"]:
            choice = response_body["choices"][0]
            if "finish_reason" in choice:
                span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [choice["finish_reason"]])

    def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]):
        if "prompt_token_count" in response_body:
            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"])
        if "generation_token_count" in response_body:
            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, response_body["generation_token_count"])
        if "stop_reason" in response_body:
            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])

    def _handle_mistral_response(self, span: Span, response_body: Dict[str, Any]):
        if "outputs" in response_body and response_body["outputs"]:
            outputs = response_body["outputs"][0]
            if "text" in outputs:
                # Output tokens: approximate from the response text (roughly six characters per token)
                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs["text"]) / 6))
            if "stop_reason" in outputs:
                span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]])
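# A minimal, self-contained sketch of the request/response mapping this change
# implements, using a hypothetical anthropic.claude invocation (the model ID,
# the bodies, and the plain-dict stand-in for span attributes below are
# illustrative, not the distro's real test fixtures):

import json
import math

model_id = "anthropic.claude-3-sonnet-20240229-v1:0"  # hypothetical
request_body = json.dumps({"max_tokens": 512, "temperature": 0.5, "top_p": 0.9})
response_body = {"usage": {"input_tokens": 23, "output_tokens": 36}, "stop_reason": "end_turn"}

attributes = {"gen_ai.request.model": model_id}

# Request side: mirrors _extract_claude_attributes
parsed = json.loads(request_body)
for body_key, attr_key in [
    ("max_tokens", "gen_ai.request.max_tokens"),
    ("temperature", "gen_ai.request.temperature"),
    ("top_p", "gen_ai.request.top_p"),
]:
    if parsed.get(body_key) is not None:
        attributes[attr_key] = parsed[body_key]

# Response side: mirrors _handle_anthropic_claude_response
usage = response_body.get("usage", {})
if "input_tokens" in usage:
    attributes["gen_ai.usage.input_tokens"] = usage["input_tokens"]
if "output_tokens" in usage:
    attributes["gen_ai.usage.output_tokens"] = usage["output_tokens"]
if "stop_reason" in response_body:
    attributes["gen_ai.response.finish_reasons"] = [response_body["stop_reason"]]

print(attributes)
# {'gen_ai.request.model': 'anthropic.claude-3-sonnet-20240229-v1:0',
#  'gen_ai.request.max_tokens': 512, 'gen_ai.request.temperature': 0.5,
#  'gen_ai.request.top_p': 0.9, 'gen_ai.usage.input_tokens': 23,
#  'gen_ai.usage.output_tokens': 36,
#  'gen_ai.response.finish_reasons': ['end_turn']}

# For model families whose responses carry no token counts (cohere.command,
# mistral), the change falls back to a length heuristic of roughly six
# characters per token, e.g. a 250-character prompt:
assert math.ceil(250 / 6) == 42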