CHANGELOG.md (2 additions, 0 deletions)
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3385](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3385))
- `opentelemetry-instrumentation` Make auto instrumentation use the same dependency resolver as manual instrumentation does
([#3202](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3202))
- `opentelemetry-instrumentation-botocore` Add GenAI instrumentation for additional Bedrock models for InvokeModel API
([#3419](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3419))

### Fixed

@@ -21,6 +21,7 @@
import io
import json
import logging
import math
from timeit import default_timer
from typing import Any

@@ -223,6 +224,23 @@ def extract_attributes(self, attributes: _AttributeMapT):
self._extract_claude_attributes(
attributes, request_body
)
elif "cohere.command-r" in model_id:
self._extract_command_r_attributes(
attributes, request_body
)
elif "cohere.command" in model_id:
self._extract_command_attributes(
attributes, request_body
)
elif "meta.llama" in model_id:
self._extract_llama_attributes(
attributes, request_body
)
elif "mistral" in model_id:
self._extract_mistral_attributes(
attributes, request_body
)

except json.JSONDecodeError:
_logger.debug("Error: Unable to parse the body as JSON")

@@ -238,9 +256,7 @@ def _extract_titan_attributes(self, attributes, request_body):
attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, config.get("stopSequences")
)

def _extract_nova_attributes(self, attributes, request_body):
@@ -255,29 +271,88 @@ def _extract_nova_attributes(self, attributes, request_body):
attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("max_new_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, config.get("stopSequences")
)

def _extract_claude_attributes(self, attributes, request_body):
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop_sequences")
)

def _extract_command_r_attributes(self, attributes, request_body):
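# the request body carries no token counts, so input tokens are roughly
# estimated from the prompt length, assuming about 6 characters per token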
prompt = request_body.get("message")
self._set_if_not_none(
attributes, GEN_AI_USAGE_INPUT_TOKENS, math.ceil(len(prompt) / 6)
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop_sequences")
)

def _extract_command_attributes(self, attributes, request_body):
prompt = request_body.get("prompt")
self._set_if_not_none(
attributes, GEN_AI_USAGE_INPUT_TOKENS, math.ceil(len(prompt) / 6)
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop_sequences")
)

def _extract_llama_attributes(self, attributes, request_body):
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
)
# the request body for meta llama models does not contain a stop_sequences field

def _extract_mistral_attributes(self, attributes, request_body):
prompt = request_body.get("prompt")
if prompt:
self._set_if_not_none(
attributes, GEN_AI_USAGE_INPUT_TOKENS, math.ceil(len(prompt) / 6)
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
)
self._set_if_not_none(
attributes, GEN_AI_REQUEST_STOP_SEQUENCES, request_body.get("stop")
)

@staticmethod
@@ -305,11 +380,25 @@ def _get_request_messages(self):

messages = decoded_body.get("messages", [])
if not messages:
model_id = self._call_context.params.get(_MODEL_ID_KEY)
if "amazon.titan" in model_id:
# transform old school amazon titan invokeModel api to messages
if input_text := decoded_body.get("inputText"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]
elif "cohere.command-r" in model_id:
# chat_history can be converted to messages; for now, just use message
if input_text := decoded_body.get("message"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]
elif "cohere.command" in model_id or "meta.llama" in model_id or "mistral.mistral" in model_id:
# transform old school prompt-based invokeModel apis (cohere command, meta llama, mistral) to messages
if input_text := decoded_body.get("prompt"):
messages = [
{"role": "user", "content": [{"text": input_text}]}
]
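# e.g. a body of {"prompt": "Hello"} becomes
# [{"role": "user", "content": [{"text": "Hello"}]}]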

return system_messages + messages

@@ -439,6 +528,22 @@ def _invoke_model_on_success(
self._handle_anthropic_claude_response(
span, response_body, instrumentor_context, capture_content
)
elif "cohere.command-r" in model_id:
self._handle_cohere_command_r_response(
span, response_body, instrumentor_context, capture_content
)
elif "cohere.command" in model_id:
self._handle_cohere_command_response(
span, response_body, instrumentor_context, capture_content
)
elif "meta.llama" in model_id:
self._handle_meta_llama_response(
span, response_body, instrumentor_context, capture_content
)
elif "mistral" in model_id:
self._handle_mistral_ai_response(
span, response_body, instrumentor_context, capture_content
)
except json.JSONDecodeError:
_logger.debug("Error: Unable to parse the response body as JSON")
except Exception as exc: # pylint: disable=broad-exception-caught
@@ -724,6 +829,98 @@ def _handle_anthropic_claude_response(
token_usage_histogram.record(
output_tokens, output_attributes
)

def _handle_cohere_command_r_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
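# output tokens are approximated from the generated text (roughly 6 characters
# per token) rather than read from a usage field in the response body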
if "text" in response_body:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body["text"]) / 6)
)
if "finish_reason" in response_body:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_cohere_command_r(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def _handle_cohere_command_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "generations" in response_body and response_body["generations"]:
generations = response_body["generations"][0]
if "text" in generations:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(generations["text"]) / 6)
)
if "finish_reason" in generations:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS, [generations["finish_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_cohere_command(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def _handle_meta_llama_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
if "prompt_token_count" in response_body:
span.set_attribute(
GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"]
)
if "generation_token_count" in response_body:
span.set_attribute(
GEN_AI_USAGE_OUTPUT_TOKENS, response_body["generation_token_count"]
)
if "stop_reason" in response_body:
span.set_attribute(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_meta_llama(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def _handle_mistral_ai_response(
self,
span: Span,
response_body: dict[str, Any],
instrumentor_context: _BotocoreInstrumentorContext,
capture_content: bool,
):
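# assumed response shape, as read below: {"outputs": [{"text": ..., "stop_reason": ...}]}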
if "outputs" in response_body:
outputs = response_body["outputs"][0]
if "text" in outputs:
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs["text"]) / 6))
if "stop_reason" in outputs:
span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]])

event_logger = instrumentor_context.event_logger
choice = _Choice.from_invoke_mistral_mistral(
response_body, capture_content
)
event_logger.emit(choice.to_choice_event())

def on_error(
self,
@@ -519,6 +519,48 @@ def from_invoke_anthropic_claude(
message["content"] = response["content"]
return cls(message, response["stop_reason"], index=0)

@classmethod
def from_invoke_cohere_command_r(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
if capture_content:
message = {"content": response["text"]}
else:
message = {}
return cls(message, response["finish_reason"], index=0)

@classmethod
def from_invoke_cohere_command(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
result = response["generations"][0]
if capture_content:
message = {"content": result["text"]}
else:
message = {}
return cls(message, result["finish_reason"], index=0)

@classmethod
def from_invoke_meta_llama(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
if capture_content:
message = {"content": response["generation"]}
else:
message = {}
return cls(message, response["stop_reason"], index=0)

@classmethod
def from_invoke_mistral_mistral(
cls, response: dict[str, Any], capture_content: bool
) -> _Choice:
result = response["outputs"][0]
if capture_content:
message = {"content": result["text"]}
else:
message = {}
return cls(message, result["stop_reason"], index=0)

def _to_body_dict(self) -> dict[str, Any]:
return {
"finish_reason": self.finish_reason,