 # SPDX-License-Identifier: Apache-2.0
 import abc
 import inspect
-from typing import Dict, Optional
+import io
+import json
+import logging
+import math
+from typing import Any, Dict, Optional
+
+from botocore.response import StreamingBody

 from amazon.opentelemetry.distro._aws_attribute_keys import (
     AWS_BEDROCK_AGENT_ID,
     AWS_BEDROCK_GUARDRAIL_ID,
     AWS_BEDROCK_KNOWLEDGE_BASE_ID,
 )
-from amazon.opentelemetry.distro._aws_span_processing_util import GEN_AI_REQUEST_MODEL, GEN_AI_SYSTEM
+from amazon.opentelemetry.distro._aws_span_processing_util import (
+    GEN_AI_REQUEST_MAX_TOKENS,
+    GEN_AI_REQUEST_MODEL,
+    GEN_AI_REQUEST_TEMPERATURE,
+    GEN_AI_REQUEST_TOP_P,
+    GEN_AI_RESPONSE_FINISH_REASONS,
+    GEN_AI_SYSTEM,
+    GEN_AI_USAGE_INPUT_TOKENS,
+    GEN_AI_USAGE_OUTPUT_TOKENS,
+)
 from opentelemetry.instrumentation.botocore.extensions.types import (
     _AttributeMapT,
     _AwsSdkCallContext,
 _MODEL_ID: str = "modelId"
 _AWS_BEDROCK_SYSTEM: str = "aws_bedrock"

+_logger = logging.getLogger(__name__)
+# Set logger level to DEBUG
+_logger.setLevel(logging.DEBUG)
+

 class _BedrockAgentOperation(abc.ABC):
     """
@@ -240,3 +259,168 @@ def extract_attributes(self, attributes: _AttributeMapT):
         model_id = self._call_context.params.get(_MODEL_ID)
         if model_id:
             attributes[GEN_AI_REQUEST_MODEL] = model_id
+
+            # Get the request body if it exists
+            body = self._call_context.params.get("body")
+            if body:
+                try:
+                    request_body = json.loads(body)
+
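+                    # Request body fields differ across model providers, so branch on the model ID.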
+                    if "amazon.titan" in model_id:
+                        self._extract_titan_attributes(attributes, request_body)
+                    elif "anthropic.claude" in model_id:
+                        self._extract_claude_attributes(attributes, request_body)
+                    elif "meta.llama" in model_id:
+                        self._extract_llama_attributes(attributes, request_body)
+                    elif "cohere.command" in model_id:
+                        self._extract_cohere_attributes(attributes, request_body)
+                    elif "ai21.jamba" in model_id:
+                        self._extract_ai21_attributes(attributes, request_body)
+                    elif "mistral" in model_id:
+                        self._extract_mistral_attributes(attributes, request_body)
+
+                except json.JSONDecodeError:
+                    _logger.debug("Error: Unable to parse the body as JSON")
+
+    def _extract_titan_attributes(self, attributes, request_body):
+        config = request_body.get("textGenerationConfig", {})
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, config.get("temperature"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, config.get("topP"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount"))
+
+    def _extract_claude_attributes(self, attributes, request_body):
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))
+
+    def _extract_cohere_attributes(self, attributes, request_body):
+        prompt = request_body.get("message")
+        if prompt:
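+            # Approximate input tokens from the prompt length (~6 characters per token).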
+            attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("p"))
+
+    def _extract_ai21_attributes(self, attributes, request_body):
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))
+
+    def _extract_llama_attributes(self, attributes, request_body):
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))
+
+    def _extract_mistral_attributes(self, attributes, request_body):
+        prompt = request_body.get("prompt")
+        if prompt:
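+            # Same approximation as Cohere: ~6 characters per token of prompt text.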
+            attributes[GEN_AI_USAGE_INPUT_TOKENS] = math.ceil(len(prompt) / 6)
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature"))
+        self._set_if_not_none(attributes, GEN_AI_REQUEST_TOP_P, request_body.get("top_p"))
+
+    @staticmethod
+    def _set_if_not_none(attributes, key, value):
+        if value is not None:
+            attributes[key] = value
+
+    def on_success(self, span: Span, result: Dict[str, Any]):
+        model_id = self._call_context.params.get(_MODEL_ID)
+
+        if not model_id:
+            return
+
+        if "body" in result and isinstance(result["body"], StreamingBody):
+            original_body = None
+            try:
+                original_body = result["body"]
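+                # A StreamingBody can only be read once, so buffer its bytes before parsing.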
+                body_content = original_body.read()
+
+                # Use one stream for telemetry
+                stream = io.BytesIO(body_content)
+                telemetry_content = stream.read()
+                response_body = json.loads(telemetry_content.decode("utf-8"))
+                if "amazon.titan" in model_id:
+                    self._handle_amazon_titan_response(span, response_body)
+                elif "anthropic.claude" in model_id:
+                    self._handle_anthropic_claude_response(span, response_body)
+                elif "meta.llama" in model_id:
+                    self._handle_meta_llama_response(span, response_body)
+                elif "cohere.command" in model_id:
+                    self._handle_cohere_command_response(span, response_body)
+                elif "ai21.jamba" in model_id:
+                    self._handle_ai21_jamba_response(span, response_body)
+                elif "mistral" in model_id:
+                    self._handle_mistral_mistral_response(span, response_body)
+                # Replenish stream for downstream application use
+                new_stream = io.BytesIO(body_content)
+                result["body"] = StreamingBody(new_stream, len(body_content))
+
+            except json.JSONDecodeError:
+                _logger.debug("Error: Unable to parse the response body as JSON")
+            except Exception as e:  # pylint: disable=broad-exception-caught, invalid-name
+                _logger.debug("Error processing response: %s", e)
+            finally:
+                if original_body is not None:
+                    original_body.close()
+
+    # pylint: disable=no-self-use
+    def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):
+        if "inputTextTokenCount" in response_body:
+            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"])
+        if "results" in response_body and response_body["results"]:
+            result = response_body["results"][0]
+            if "tokenCount" in result:
+                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"])
+            if "completionReason" in result:
+                span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [result["completionReason"]])
+
+    # pylint: disable=no-self-use
+    def _handle_anthropic_claude_response(self, span: Span, response_body: Dict[str, Any]):
+        if "usage" in response_body:
+            usage = response_body["usage"]
+            if "input_tokens" in usage:
+                span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"])
+            if "output_tokens" in usage:
+                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"])
+        if "stop_reason" in response_body:
+            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])
+
+    # pylint: disable=no-self-use
+    def _handle_cohere_command_response(self, span: Span, response_body: Dict[str, Any]):
+        # Output tokens: Approximate from the response text
+        if "text" in response_body:
+            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(response_body["text"]) / 6))
+        if "finish_reason" in response_body:
+            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["finish_reason"]])
+
+    # pylint: disable=no-self-use
+    def _handle_ai21_jamba_response(self, span: Span, response_body: Dict[str, Any]):
+        if "usage" in response_body:
+            usage = response_body["usage"]
+            if "prompt_tokens" in usage:
+                span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage["prompt_tokens"])
+            if "completion_tokens" in usage:
+                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, usage["completion_tokens"])
+        if "choices" in response_body:
+            choices = response_body["choices"][0]
+            if "finish_reason" in choices:
+                span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [choices["finish_reason"]])
+
+    # pylint: disable=no-self-use
+    def _handle_meta_llama_response(self, span: Span, response_body: Dict[str, Any]):
+        if "prompt_token_count" in response_body:
+            span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"])
+        if "generation_token_count" in response_body:
+            span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, response_body["generation_token_count"])
+        if "stop_reason" in response_body:
+            span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]])
+
+    # pylint: disable=no-self-use
+    def _handle_mistral_mistral_response(self, span: Span, response_body: Dict[str, Any]):
+        if "outputs" in response_body:
+            outputs = response_body["outputs"][0]
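+            # Approximate output tokens from the generated text (~6 characters per token).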
+            if "text" in outputs:
+                span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, math.ceil(len(outputs["text"]) / 6))
+            if "stop_reason" in outputs:
+                span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]])