@@ -18,14 +18,11 @@
 
 from __future__ import annotations
 
-import io
 import json
 import logging
 import math
 from typing import Any
 
-from botocore.response import StreamingBody
-
 from opentelemetry.instrumentation.botocore.extensions.types import (
     _AttributeMapT,
     _AwsSdkExtension,
@@ -58,13 +55,19 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
     """
 
     def extract_attributes(self, attributes: _AttributeMapT):
-        attributes[GEN_AI_SYSTEM] = GenAiSystemValues.AWS_BEDROCK
-        attributes[GEN_AI_OPERATION_NAME] = GenAiOperationNameValues.CHAT
+        attributes[GEN_AI_SYSTEM] = GenAiSystemValues.AWS_BEDROCK.value
 
         model_id = self._call_context.params.get(_MODEL_ID_KEY)
         if model_id:
             attributes[GEN_AI_REQUEST_MODEL] = model_id
 
+            # FIXME: add other model patterns
+            text_model_patterns = ["amazon.titan-text"]
+            if any(pattern in model_id for pattern in text_model_patterns):
+                attributes[GEN_AI_OPERATION_NAME] = (
+                    GenAiOperationNameValues.CHAT.value
+                )
+
             # Get the request body if it exists
             body = self._call_context.params.get("body")
             if body:
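
Note on the hunk above: the operation name now comes from a plain substring match over the model id, and only Titan text models are mapped so far (per the FIXME). A minimal sketch of the resulting behavior, outside the diff, with illustrative model ids and assuming GenAiOperationNameValues.CHAT.value == "chat" (gen-ai semconv):

# Illustrative only, not part of this change; model ids and the "chat"
# value are assumptions based on the gen-ai semantic conventions.
text_model_patterns = ["amazon.titan-text"]
for model_id in ("amazon.titan-text-express-v1", "anthropic.claude-v2"):
    matched = any(pattern in model_id for pattern in text_model_patterns)
    print(model_id, "->", "chat" if matched else "(no gen_ai.operation.name set)")
# amazon.titan-text-express-v1 -> chat
# anthropic.claude-v2 -> (no gen_ai.operation.name set)
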
@@ -209,176 +212,38 @@ def _set_if_not_none(attributes, key, value):
         if value is not None:
             attributes[key] = value
 
+    def before_service_call(self, span: Span):
+        if not span.is_recording():
+            return
+
+        operation_name = span.attributes.get(GEN_AI_OPERATION_NAME, "")
+        request_model = span.attributes.get(GEN_AI_REQUEST_MODEL, "")
+        # avoid setting the name to an empty string if they are not available
+        if operation_name and request_model:
+            span.update_name(f"{operation_name} {request_model}")
+
     # pylint: disable=too-many-branches
     def on_success(self, span: Span, result: dict[str, Any]):
         model_id = self._call_context.params.get(_MODEL_ID_KEY)
 
         if not model_id:
             return
 
-        if "body" in result and isinstance(result["body"], StreamingBody):
-            original_body = None
-            try:
-                original_body = result["body"]
-                body_content = original_body.read()
-
-                # Use one stream for telemetry
-                stream = io.BytesIO(body_content)
-                telemetry_content = stream.read()
-                response_body = json.loads(telemetry_content.decode("utf-8"))
-                if "amazon.titan" in model_id:
-                    self._handle_amazon_titan_response(span, response_body)
-                if "amazon.nova" in model_id:
-                    self._handle_amazon_nova_response(span, response_body)
-                elif "anthropic.claude" in model_id:
-                    self._handle_anthropic_claude_response(span, response_body)
-                elif "meta.llama" in model_id:
-                    self._handle_meta_llama_response(span, response_body)
-                elif "cohere.command" in model_id:
-                    self._handle_cohere_command_response(span, response_body)
-                elif "ai21.jamba" in model_id:
-                    self._handle_ai21_jamba_response(span, response_body)
-                elif "mistral" in model_id:
-                    self._handle_mistral_mistral_response(span, response_body)
-                # Replenish stream for downstream application use
-                new_stream = io.BytesIO(body_content)
-                result["body"] = StreamingBody(new_stream, len(body_content))
-
-            except json.JSONDecodeError:
-                _logger.debug(
-                    "Error: Unable to parse the response body as JSON"
-                )
-            except Exception as e:  # pylint: disable=broad-exception-caught, invalid-name
-                _logger.debug("Error processing response: %s", e)
-            finally:
-                if original_body is not None:
-                    original_body.close()
-
-    # pylint: disable=no-self-use
-    def _handle_amazon_titan_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        if "inputTextTokenCount" in response_body:
-            span.set_attribute(
-                GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"]
-            )
-        if "results" in response_body and response_body["results"]:
-            result = response_body["results"][0]
-            if "tokenCount" in result:
-                span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"]
-                )
-            if "completionReason" in result:
-                span.set_attribute(
-                    GEN_AI_RESPONSE_FINISH_REASONS,
-                    [result["completionReason"]],
-                )
-
-    # pylint: disable=no-self-use
-    def _handle_amazon_nova_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        if "usage" in response_body:
-            usage = response_body["usage"]
-            if "inputTokens" in usage:
-                span.set_attribute(
-                    GEN_AI_USAGE_INPUT_TOKENS, usage["inputTokens"]
-                )
-            if "outputTokens" in usage:
+        # FIXME: this is tested only with titan
+        if usage := result.get("usage"):
+            if input_tokens := usage.get("inputTokens"):
                 span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS, usage["outputTokens"]
+                    GEN_AI_USAGE_INPUT_TOKENS,
+                    input_tokens,
                 )
-        if "stopReason" in response_body:
-            span.set_attribute(
-                GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]]
-            )
-
-    # pylint: disable=no-self-use
-    def _handle_anthropic_claude_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        if "usage" in response_body:
-            usage = response_body["usage"]
-            if "input_tokens" in usage:
+            if output_tokens := usage.get("outputTokens"):
                 span.set_attribute(
-                    GEN_AI_USAGE_INPUT_TOKENS, usage["input_tokens"]
-                )
-            if "output_tokens" in usage:
-                span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS, usage["output_tokens"]
+                    GEN_AI_USAGE_OUTPUT_TOKENS,
+                    output_tokens,
                 )
-        if "stop_reason" in response_body:
-            span.set_attribute(
-                GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
-            )
 
-    # pylint: disable=no-self-use
-    def _handle_cohere_command_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        # Output tokens: Approximate from the response text
-        if "text" in response_body:
-            span.set_attribute(
-                GEN_AI_USAGE_OUTPUT_TOKENS,
-                math.ceil(len(response_body["text"]) / 6),
-            )
-        if "finish_reason" in response_body:
+        if stop_reason := result.get("stopReason"):
             span.set_attribute(
                 GEN_AI_RESPONSE_FINISH_REASONS,
-                [response_body["finish_reason"]],
-            )
-
-    # pylint: disable=no-self-use
-    def _handle_ai21_jamba_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        if "usage" in response_body:
-            usage = response_body["usage"]
-            if "prompt_tokens" in usage:
-                span.set_attribute(
-                    GEN_AI_USAGE_INPUT_TOKENS, usage["prompt_tokens"]
-                )
-            if "completion_tokens" in usage:
-                span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS, usage["completion_tokens"]
-                )
-        if "choices" in response_body:
-            choices = response_body["choices"][0]
-            if "finish_reason" in choices:
-                span.set_attribute(
-                    GEN_AI_RESPONSE_FINISH_REASONS, [choices["finish_reason"]]
-                )
-
-    # pylint: disable=no-self-use
-    def _handle_meta_llama_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        if "prompt_token_count" in response_body:
-            span.set_attribute(
-                GEN_AI_USAGE_INPUT_TOKENS, response_body["prompt_token_count"]
-            )
-        if "generation_token_count" in response_body:
-            span.set_attribute(
-                GEN_AI_USAGE_OUTPUT_TOKENS,
-                response_body["generation_token_count"],
-            )
-        if "stop_reason" in response_body:
-            span.set_attribute(
-                GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
-            )
-
-    # pylint: disable=no-self-use
-    def _handle_mistral_mistral_response(
-        self, span: Span, response_body: dict[str, Any]
-    ):
-        if "outputs" in response_body:
-            outputs = response_body["outputs"][0]
-            if "text" in outputs:
-                span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS,
-                    math.ceil(len(outputs["text"]) / 6),
-                )
-            if "stop_reason" in outputs:
-                span.set_attribute(
-                    GEN_AI_RESPONSE_FINISH_REASONS, [outputs["stop_reason"]]
+                [stop_reason],
             )
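
Taken together with before_service_call above, a span for a matched model is renamed to "{operation_name} {request_model}", and on_success now reads token usage and the stop reason directly from the deserialized response dict instead of re-parsing the body stream. A minimal sketch of the shape the new code handles (field values are invented; per the FIXME this is only exercised with Titan):

# Hypothetical deserialized result, illustrating only the keys the new
# on_success reads; the values here are made up for the example.
result = {
    "usage": {"inputTokens": 12, "outputTokens": 34},
    "stopReason": "FINISH",
}
# With this result the span would get:
#   gen_ai.usage.input_tokens = 12
#   gen_ai.usage.output_tokens = 34
#   gen_ai.response.finish_reasons = ["FINISH"]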