Skip to content

Commit b632337

Browse files
committed
Add basic handling for invoke.model
1 parent 9460773 commit b632337

File tree

11 files changed

+648
-147
lines changed

11 files changed

+648
-147
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import json
2+
import os
3+
4+
import boto3
5+
6+
7+
def main():
    """Invoke a Bedrock text model once and print the generated completion."""
    bedrock = boto3.client("bedrock-runtime")

    # Model can be overridden via the CHAT_MODEL environment variable.
    model_id = os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1")
    payload = {
        "inputText": "Write a short poem on OpenTelemetry.",
        "textGenerationConfig": {},
    }
    response = bedrock.invoke_model(
        modelId=model_id,
        body=json.dumps(payload),
    )

    # The response body is a StreamingBody; read and decode it before parsing.
    raw_body = response["body"].read()
    parsed = json.loads(raw_body.decode("utf-8"))
    print(parsed["results"][0]["outputText"])


if __name__ == "__main__":
    main()

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818

1919
from __future__ import annotations
2020

21+
import io
22+
import json
2123
import logging
2224
from typing import Any
2325

26+
from botocore.response import StreamingBody
27+
2428
from opentelemetry.instrumentation.botocore.extensions.types import (
2529
_AttributeMapT,
2630
_AwsSdkExtension,
@@ -58,7 +62,7 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
5862
Amazon Bedrock Runtime</a>.
5963
"""
6064

61-
_HANDLED_OPERATIONS = {"Converse"}
65+
_HANDLED_OPERATIONS = {"Converse", "InvokeModel"}
6266

6367
def extract_attributes(self, attributes: _AttributeMapT):
6468
if self._call_context.operation not in self._HANDLED_OPERATIONS:
@@ -73,6 +77,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
7377
GenAiOperationNameValues.CHAT.value
7478
)
7579

80+
# Converse
7681
if inference_config := self._call_context.params.get(
7782
"inferenceConfig"
7883
):
@@ -97,6 +102,84 @@ def extract_attributes(self, attributes: _AttributeMapT):
97102
inference_config.get("stopSequences"),
98103
)
99104

105+
# InvokeModel
106+
# Get the request body if it exists
107+
body = self._call_context.params.get("body")
108+
if body:
109+
try:
110+
request_body = json.loads(body)
111+
112+
if "amazon.titan" in model_id:
113+
# titan interface is a text completion one
114+
attributes[GEN_AI_OPERATION_NAME] = (
115+
GenAiOperationNameValues.TEXT_COMPLETION.value
116+
)
117+
self._extract_titan_attributes(
118+
attributes, request_body
119+
)
120+
elif "amazon.nova" in model_id:
121+
self._extract_nova_attributes(attributes, request_body)
122+
elif "anthropic.claude" in model_id:
123+
self._extract_claude_attributes(
124+
attributes, request_body
125+
)
126+
except json.JSONDecodeError:
127+
_logger.debug("Error: Unable to parse the body as JSON")
128+
129+
def _extract_titan_attributes(self, attributes, request_body):
    """Map Titan request parameters onto GenAI span attributes.

    Titan nests its sampling knobs under "textGenerationConfig".
    """
    generation_config = request_body.get("textGenerationConfig", {})
    for attribute_key, config_key in (
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "topP"),
        (GEN_AI_REQUEST_MAX_TOKENS, "maxTokenCount"),
        (GEN_AI_REQUEST_STOP_SEQUENCES, "stopSequences"),
    ):
        self._set_if_not_none(
            attributes, attribute_key, generation_config.get(config_key)
        )
145+
146+
def _extract_nova_attributes(self, attributes, request_body):
    """Map Nova request parameters onto GenAI span attributes.

    Nova nests its sampling knobs under "inferenceConfig"; note the
    mixed-case key "max_new_tokens" alongside camelCase keys.
    """
    inference_config = request_body.get("inferenceConfig", {})
    for attribute_key, config_key in (
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "topP"),
        (GEN_AI_REQUEST_MAX_TOKENS, "max_new_tokens"),
        (GEN_AI_REQUEST_STOP_SEQUENCES, "stopSequences"),
    ):
        self._set_if_not_none(
            attributes, attribute_key, inference_config.get(config_key)
        )
162+
163+
def _extract_claude_attributes(self, attributes, request_body):
    """Map Claude request parameters onto GenAI span attributes.

    Claude carries its sampling parameters at the top level of the body,
    using snake_case keys.
    """
    for attribute_key, body_key in (
        (GEN_AI_REQUEST_MAX_TOKENS, "max_tokens"),
        (GEN_AI_REQUEST_TEMPERATURE, "temperature"),
        (GEN_AI_REQUEST_TOP_P, "top_p"),
        (GEN_AI_REQUEST_STOP_SEQUENCES, "stop_sequences"),
    ):
        self._set_if_not_none(
            attributes, attribute_key, request_body.get(body_key)
        )
182+
100183
@staticmethod
101184
def _set_if_not_none(attributes, key, value):
102185
if value is not None:
@@ -122,6 +205,7 @@ def on_success(self, span: Span, result: dict[str, Any]):
122205
if not span.is_recording():
123206
return
124207

208+
# Converse
125209
if usage := result.get("usage"):
126210
if input_tokens := usage.get("inputTokens"):
127211
span.set_attribute(
@@ -140,6 +224,98 @@ def on_success(self, span: Span, result: dict[str, Any]):
140224
[stop_reason],
141225
)
142226

227+
model_id = self._call_context.params.get(_MODEL_ID_KEY)
228+
if not model_id:
229+
return
230+
231+
# InvokeModel
232+
if "body" in result and isinstance(result["body"], StreamingBody):
233+
original_body = None
234+
try:
235+
original_body = result["body"]
236+
body_content = original_body.read()
237+
238+
# Use one stream for telemetry
239+
stream = io.BytesIO(body_content)
240+
telemetry_content = stream.read()
241+
response_body = json.loads(telemetry_content.decode("utf-8"))
242+
if "amazon.titan" in model_id:
243+
self._handle_amazon_titan_response(span, response_body)
244+
elif "amazon.nova" in model_id:
245+
self._handle_amazon_nova_response(span, response_body)
246+
elif "anthropic.claude" in model_id:
247+
self._handle_anthropic_claude_response(span, response_body)
248+
# Replenish stream for downstream application use
249+
new_stream = io.BytesIO(body_content)
250+
result["body"] = StreamingBody(new_stream, len(body_content))
251+
252+
except json.JSONDecodeError:
253+
_logger.debug(
254+
"Error: Unable to parse the response body as JSON"
255+
)
256+
except Exception as exc: # pylint: disable=broad-exception-caught
257+
_logger.debug("Error processing response: %s", exc)
258+
finally:
259+
if original_body is not None:
260+
original_body.close()
261+
262+
# pylint: disable=no-self-use
def _handle_amazon_titan_response(
    self, span: Span, response_body: dict[str, Any]
):
    """Record Titan token usage and finish reason on the span.

    Only the first entry of "results" is inspected, mirroring the
    single-completion request shape.
    """
    if "inputTextTokenCount" in response_body:
        span.set_attribute(
            GEN_AI_USAGE_INPUT_TOKENS,
            response_body["inputTextTokenCount"],
        )
    results = response_body.get("results")
    if results:
        first_result = results[0]
        if "tokenCount" in first_result:
            span.set_attribute(
                GEN_AI_USAGE_OUTPUT_TOKENS, first_result["tokenCount"]
            )
        if "completionReason" in first_result:
            # Semconv expects a sequence of finish reasons.
            span.set_attribute(
                GEN_AI_RESPONSE_FINISH_REASONS,
                [first_result["completionReason"]],
            )
281+
282+
# pylint: disable=no-self-use
def _handle_amazon_nova_response(
    self, span: Span, response_body: dict[str, Any]
):
    """Record Nova token usage and stop reason on the span."""
    if "usage" in response_body:
        token_usage = response_body["usage"]
        if "inputTokens" in token_usage:
            span.set_attribute(
                GEN_AI_USAGE_INPUT_TOKENS, token_usage["inputTokens"]
            )
        if "outputTokens" in token_usage:
            span.set_attribute(
                GEN_AI_USAGE_OUTPUT_TOKENS, token_usage["outputTokens"]
            )
    if "stopReason" in response_body:
        # Semconv expects a sequence of finish reasons.
        span.set_attribute(
            GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]]
        )
300+
301+
# pylint: disable=no-self-use
def _handle_anthropic_claude_response(
    self, span: Span, response_body: dict[str, Any]
):
    """Record Claude token usage and stop reason on the span."""
    token_usage = response_body.get("usage")
    if token_usage:
        if "input_tokens" in token_usage:
            span.set_attribute(
                GEN_AI_USAGE_INPUT_TOKENS, token_usage["input_tokens"]
            )
        if "output_tokens" in token_usage:
            span.set_attribute(
                GEN_AI_USAGE_OUTPUT_TOKENS, token_usage["output_tokens"]
            )
    if "stop_reason" in response_body:
        # Semconv expects a sequence of finish reasons.
        span.set_attribute(
            GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
        )
318+
143319
def on_error(self, span: Span, exception: _BotoClientErrorT):
144320
if self._call_context.operation not in self._HANDLED_OPERATIONS:
145321
return

instrumentation/opentelemetry-instrumentation-botocore/tests/bedrock_utils.py

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,84 @@
1414

1515
from __future__ import annotations
1616

17+
import io
18+
import json
1719
from typing import Any
1820

21+
from botocore.response import StreamingBody
22+
1923
from opentelemetry.sdk.trace import ReadableSpan
2024
from opentelemetry.semconv._incubating.attributes import (
2125
gen_ai_attributes as GenAIAttributes,
2226
)
2327

2428

29+
def assert_completion_attributes_from_streaming_body(
    span: ReadableSpan,
    request_model: str,
    response: dict[str, Any] | None,
    operation_name: str = "chat",
    request_top_p: int | None = None,
    request_temperature: int | None = None,
    request_max_tokens: int | None = None,
    request_stop_sequences: list[str] | None = None,
):
    """Assert span attributes for an InvokeModel call with a StreamingBody response.

    ``response`` is the raw botocore response dict (its "body" entry is a
    StreamingBody), not the decoded JSON payload; pass None when no response
    was produced. Token counts and finish reason are extracted per model
    family (Titan / Nova / Claude) and compared against the span.
    """
    input_tokens = None
    output_tokens = None
    finish_reason = None
    if response:
        # Read the streaming body once and decode the JSON payload directly;
        # re-wrapping the bytes in another stream (as before) was a no-op.
        body_content = response["body"].read()
        response = json.loads(body_content.decode("utf-8"))

        if "amazon.titan" in request_model:
            input_tokens = response.get("inputTextTokenCount")
            results = response.get("results")
            if results:
                first_result = results[0]
                output_tokens = first_result.get("tokenCount")
                # Guard the key like the other model branches do, instead of
                # raising KeyError when the model omits it.
                if "completionReason" in first_result:
                    finish_reason = (first_result["completionReason"],)
        elif "amazon.nova" in request_model:
            if usage := response.get("usage"):
                input_tokens = usage["inputTokens"]
                output_tokens = usage["outputTokens"]
            if "stopReason" in response:
                finish_reason = (response["stopReason"],)
        elif "anthropic.claude" in request_model:
            if usage := response.get("usage"):
                input_tokens = usage["input_tokens"]
                output_tokens = usage["output_tokens"]
            if "stop_reason" in response:
                finish_reason = (response["stop_reason"],)

    return assert_all_attributes(
        span,
        request_model,
        input_tokens,
        output_tokens,
        finish_reason,
        operation_name,
        request_top_p,
        request_temperature,
        request_max_tokens,
        tuple(request_stop_sequences)
        if request_stop_sequences is not None
        else request_stop_sequences,
    )
93+
94+
2595
def assert_completion_attributes(
2696
span: ReadableSpan,
2797
request_model: str,
@@ -38,7 +108,7 @@ def assert_completion_attributes(
38108
else:
39109
input_tokens, output_tokens = None, None
40110

41-
if response:
111+
if response and "stopReason" in response:
42112
finish_reason = (response["stopReason"],)
43113
else:
44114
finish_reason = None
@@ -60,10 +130,10 @@ def assert_completion_attributes(
60130

61131

62132
def assert_equal_or_not_present(value, attribute_name, span):
    """Assert the span carries *value* under *attribute_name*, or that the
    attribute is absent entirely when *value* is None.

    The attribute name is attached to the absence assertion so failures
    identify which attribute leaked onto the span.
    """
    if value is None:
        assert attribute_name not in span.attributes, attribute_name
    else:
        assert value == span.attributes[attribute_name]
67137

68138

69139
def assert_all_attributes(

0 commit comments

Comments
 (0)