
Commit 44754e2

botocore: handle exceptions when consuming EventStream in bedrock extension (#3211)
1 parent: 731054f
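For context, this change makes the Bedrock stream wrappers accept an error callback and report stream failures on the instrumentation span. A minimal sketch of how that surfaces to application code, assuming the botocore instrumentation is active and `client` is an already-configured bedrock-runtime client (both illustrative, not part of this commit):

from botocore.eventstream import EventStreamError

response = client.converse_stream(
    modelId="amazon.titan-text-lite-v1",
    messages=[{"role": "user", "content": [{"text": "Say this is a test"}]}],
)
try:
    for event in response["stream"]:
        pass  # consume events as usual
except EventStreamError:
    # With this commit the instrumentation ends the streaming span here with
    # StatusCode.ERROR and records the exception type in the `error.type`
    # attribute; previously the span could be left open when the stream failed.
    pass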

5 files changed (+286, -11 lines)

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py

Lines changed: 17 additions & 2 deletions
@@ -266,6 +266,12 @@ def _invoke_model_on_success(
         if original_body is not None:
             original_body.close()

+    def _on_stream_error_callback(self, span: Span, exception):
+        span.set_status(Status(StatusCode.ERROR, str(exception)))
+        if span.is_recording():
+            span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
+        span.end()
+
     def on_success(self, span: Span, result: dict[str, Any]):
         if self._call_context.operation not in self._HANDLED_OPERATIONS:
             return
@@ -282,8 +288,11 @@ def stream_done_callback(response):
             self._converse_on_success(span, response)
             span.end()

+        def stream_error_callback(exception):
+            self._on_stream_error_callback(span, exception)
+
         result["stream"] = ConverseStreamWrapper(
-            result["stream"], stream_done_callback
+            result["stream"], stream_done_callback, stream_error_callback
         )
         return

@@ -307,8 +316,14 @@ def invoke_model_stream_done_callback(response):
             self._converse_on_success(span, response)
             span.end()

+        def invoke_model_stream_error_callback(exception):
+            self._on_stream_error_callback(span, exception)
+
         result["body"] = InvokeModelWithResponseStreamWrapper(
-            result["body"], invoke_model_stream_done_callback, model_id
+            result["body"],
+            invoke_model_stream_done_callback,
+            invoke_model_stream_error_callback,
+            model_id,
         )
         return
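The new `_on_stream_error_callback` relies on OpenTelemetry span APIs that the extension already imports elsewhere in this module. A standalone sketch of that error path, with an illustrative tracer name and a `RuntimeError` standing in for the botocore exception:

from opentelemetry import trace
from opentelemetry.semconv._incubating.attributes.error_attributes import ERROR_TYPE
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer("bedrock-error-path-sketch")
span = tracer.start_span("ConverseStream")
try:
    raise RuntimeError("stream interrupted")  # stand-in for EventStreamError
except RuntimeError as exc:
    # Mirror of the helper added above: mark the span as failed, record the
    # exception type, and end it so the streaming span is never left open.
    span.set_status(Status(StatusCode.ERROR, str(exc)))
    if span.is_recording():
        span.set_attribute(ERROR_TYPE, type(exc).__qualname__)
    span.end()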

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py

Lines changed: 25 additions & 9 deletions
@@ -19,10 +19,14 @@
 from __future__ import annotations

 import json
+from typing import Callable, Dict, Union

-from botocore.eventstream import EventStream
+from botocore.eventstream import EventStream, EventStreamError
 from wrapt import ObjectProxy

+_StreamDoneCallableT = Callable[[Dict[str, Union[int, str]]], None]
+_StreamErrorCallableT = Callable[[Exception], None]
+

 # pylint: disable=abstract-method
 class ConverseStreamWrapper(ObjectProxy):
@@ -31,19 +35,25 @@ class ConverseStreamWrapper(ObjectProxy):
     def __init__(
         self,
         stream: EventStream,
-        stream_done_callback,
+        stream_done_callback: _StreamDoneCallableT,
+        stream_error_callback: _StreamErrorCallableT,
     ):
         super().__init__(stream)

         self._stream_done_callback = stream_done_callback
+        self._stream_error_callback = stream_error_callback
         # accumulating things in the same shape of non-streaming version
         # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
         self._response = {}

     def __iter__(self):
-        for event in self.__wrapped__:
-            self._process_event(event)
-            yield event
+        try:
+            for event in self.__wrapped__:
+                self._process_event(event)
+                yield event
+        except EventStreamError as exc:
+            self._stream_error_callback(exc)
+            raise

     def _process_event(self, event):
         if "messageStart" in event:
@@ -85,22 +95,28 @@ class InvokeModelWithResponseStreamWrapper(ObjectProxy):
     def __init__(
         self,
         stream: EventStream,
-        stream_done_callback,
+        stream_done_callback: _StreamDoneCallableT,
+        stream_error_callback: _StreamErrorCallableT,
         model_id: str,
     ):
         super().__init__(stream)

         self._stream_done_callback = stream_done_callback
+        self._stream_error_callback = stream_error_callback
         self._model_id = model_id

         # accumulating things in the same shape of the Converse API
         # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
         self._response = {}

     def __iter__(self):
-        for event in self.__wrapped__:
-            self._process_event(event)
-            yield event
+        try:
+            for event in self.__wrapped__:
+                self._process_event(event)
+                yield event
+        except EventStreamError as exc:
+            self._stream_error_callback(exc)
+            raise

     def _process_event(self, event):
         if "chunk" not in event:
Lines changed: 71 additions & 0 deletions (new file)

interactions:
- request:
    body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}],
      "inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences":
      ["|"]}}'
    headers:
      Content-Length:
      - '170'
      Content-Type:
      - !!binary |
        YXBwbGljYXRpb24vanNvbg==
      User-Agent:
      - !!binary |
        Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
        MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
        aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
      X-Amz-Date:
      - !!binary |
        MjAyNTAxMjdUMTE0NjAyWg==
      X-Amz-Security-Token:
      - test_aws_security_token
      X-Amzn-Trace-Id:
      - !!binary |
        Um9vdD0xLWI5YzVlMjRlLWRmYzBjYTYyMmFiYjA2ZWEyMjAzZDZkYjtQYXJlbnQ9NDE0MWM4NWIx
        ODkzMmI3OTtTYW1wbGVkPTE=
      amz-sdk-invocation-id:
      - !!binary |
        YjA0ZTAzYWEtMDg2MS00NGIzLTk3NmMtMWZjOGE5MzY5YTFl
      amz-sdk-request:
      - !!binary |
        YXR0ZW1wdD0x
      authorization:
      - Bearer test_aws_authorization
    method: POST
    uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
  response:
    body:
      string: !!binary |
        AAAAswAAAFK3IJ11CzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
        cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
        b3BxcnN0dXZ3eHl6QUJDREVGR0hJSktMTU5PUFFSU1RVVldYWVowMSIsInJvbGUiOiJhc3Npc3Rh
        bnQifRl7p7oAAAC3AAAAVzLKzzoLOmV2ZW50LXR5cGUHABFjb250ZW50QmxvY2tEZWx0YQ06Y29u
        dGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRl
        bnRCbG9ja0luZGV4IjowLCJkZWx0YSI6eyJ0ZXh0IjoiSGkhIEknbSBhbiBBSSBsYW5ndWFnZSJ9
        LCJwIjoiYWJjZGVmZ2gifUn9+AsAAACUAAAAVsOsqngLOmV2ZW50LXR5cGUHABBjb250ZW50Qmxv
        Y2tTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
        dmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsInAiOiJhYmNkZWZnaGlqa2xtbm9wIn3KsHRKAAAA
        pgAAAFGGKdQ9CzpldmVudC10eXBlBwALbWVzc2FnZVN0b3ANOmNvbnRlbnQtdHlwZQcAEGFwcGxp
        Y2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJwIjoiYWJjZGVmZ2hpamtsbW5vcHFy
        c3R1dnd4eXpBQkNERUZHSEkiLCJzdG9wUmVhc29uIjoibWF4X3Rva2VucyJ9eRUDZQAAAPUAAABO
        dJJs0ws6ZXZlbnQtdHlwZQcACG1ldGFkYXRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9q
        c29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsibWV0cmljcyI6eyJsYXRlbmN5TXMiOjY2NH0sInAi
        OiJhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0RFRkdISUpLTE1OT1BRUlNUVVZXWFlaMDEi
        LCJ1c2FnZSI6eyJpbnB1dFRva2VucyI6OCwib3V0cHV0VG9rZW5zIjoxMCwidG90YWxUb2tlbnMi
        OjE4fX3B+Dpy
    headers:
      Connection:
      - keep-alive
      Content-Type:
      - application/vnd.amazon.eventstream
      Date:
      - Mon, 27 Jan 2025 11:46:02 GMT
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      x-amzn-RequestId:
      - 657e0bef-5ebb-4387-be65-d3ceafd53dea
    status:
      code: 200
      message: OK
version: 1
Lines changed: 62 additions & 0 deletions (new file)

interactions:
- request:
    body: '{"inputText": "Say this is a test", "textGenerationConfig": {"maxTokenCount":
      10, "temperature": 0.8, "topP": 1, "stopSequences": ["|"]}}'
    headers:
      Content-Length:
      - '137'
      User-Agent:
      - !!binary |
        Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
        MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
        aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
      X-Amz-Date:
      - !!binary |
        MjAyNTAxMjdUMTIwMTU0Wg==
      X-Amz-Security-Token:
      - test_aws_security_token
      X-Amzn-Trace-Id:
      - !!binary |
        Um9vdD0xLWJhYTFjOTdhLTI3M2UxYTlhYjIyMTM1NGQwN2JjNGNhYztQYXJlbnQ9OTVhNmQzZGEx
        YTZkZjM4ZjtTYW1wbGVkPTE=
      amz-sdk-invocation-id:
      - !!binary |
        ZWQxZGViZmQtZTE5NS00N2RiLWIyMzItMTY1MzJhYjQzZTM0
      amz-sdk-request:
      - !!binary |
        YXR0ZW1wdD0x
      authorization:
      - Bearer test_aws_authorization
    method: POST
    uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/invoke-with-response-stream
  response:
    body:
      string: !!binary |
        AAACBAAAAEs8ZEC6CzpldmVudC10eXBlBwAFY2h1bmsNOmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0
        aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJieXRlcyI6ImV5SnZkWFJ3ZFhSVVpYaDBJ
        am9pSUdOdmJXMWxiblJjYmtobGJHeHZJU0JKSUdGdElHRWdZMjl0Y0hWMFpYSWdjSEp2WjNKaGJT
        QmtaWE5wWjI1bFpDSXNJbWx1WkdWNElqb3dMQ0owYjNSaGJFOTFkSEIxZEZSbGVIUlViMnRsYmtO
        dmRXNTBJam94TUN3aVkyOXRjR3hsZEdsdmJsSmxZWE52YmlJNklreEZUa2RVU0NJc0ltbHVjSFYw
        VkdWNGRGUnZhMlZ1UTI5MWJuUWlPalVzSW1GdFlYcHZiaTFpWldSeWIyTnJMV2x1ZG05allYUnBi
        MjVOWlhSeWFXTnpJanA3SW1sdWNIVjBWRzlyWlc1RGIzVnVkQ0k2TlN3aWIzVjBjSFYwVkc5clpX
        NURiM1Z1ZENJNk1UQXNJbWx1ZG05allYUnBiMjVNWVhSbGJtTjVJam8yTnpRc0ltWnBjbk4wUW5s
        MFpVeGhkR1Z1WTNraU9qWTNNMzE5IiwicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6In2J
        Hw51
    headers:
      Connection:
      - keep-alive
      Content-Type:
      - application/vnd.amazon.eventstream
      Date:
      - Mon, 27 Jan 2025 12:01:55 GMT
      Set-Cookie: test_set_cookie
      Transfer-Encoding:
      - chunked
      X-Amzn-Bedrock-Content-Type:
      - application/json
      x-amzn-RequestId:
      - 1eb1af77-fb2f-400f-9bf8-049e38b90f02
    status:
      code: 200
      message: OK
version: 1

instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_bedrock.py

Lines changed: 111 additions & 0 deletions
@@ -15,9 +15,11 @@
 from __future__ import annotations

 import json
+from unittest import mock

 import boto3
 import pytest
+from botocore.eventstream import EventStream, EventStreamError

 from opentelemetry.semconv._incubating.attributes.error_attributes import (
     ERROR_TYPE,
@@ -171,6 +173,65 @@ def test_converse_stream_with_content(
     assert len(logs) == 0


+@pytest.mark.skipif(
+    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
+)
+@pytest.mark.vcr()
+def test_converse_stream_handles_event_stream_error(
+    span_exporter,
+    log_exporter,
+    bedrock_runtime_client,
+    instrument_with_content,
+):
+    # pylint:disable=too-many-locals
+    messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}]
+
+    llm_model_value = "amazon.titan-text-lite-v1"
+    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
+    response = bedrock_runtime_client.converse_stream(
+        messages=messages,
+        modelId=llm_model_value,
+        inferenceConfig={
+            "maxTokens": max_tokens,
+            "temperature": temperature,
+            "topP": top_p,
+            "stopSequences": stop_sequences,
+        },
+    )
+
+    with mock.patch.object(
+        EventStream,
+        "_parse_event",
+        side_effect=EventStreamError(
+            {"modelStreamErrorException": {}}, "ConverseStream"
+        ),
+    ):
+        with pytest.raises(EventStreamError):
+            for _event in response["stream"]:
+                pass
+
+    (span,) = span_exporter.get_finished_spans()
+    input_tokens, output_tokens, finish_reason = None, None, None
+    assert_stream_completion_attributes(
+        span,
+        llm_model_value,
+        input_tokens,
+        output_tokens,
+        finish_reason,
+        "chat",
+        top_p,
+        temperature,
+        max_tokens,
+        stop_sequences,
+    )
+
+    assert span.status.status_code == StatusCode.ERROR
+    assert span.attributes[ERROR_TYPE] == "EventStreamError"
+
+    logs = log_exporter.get_finished_logs()
+    assert len(logs) == 0
+
+
 @pytest.mark.skipif(
     BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
 )
@@ -413,6 +474,56 @@ def test_invoke_model_with_response_stream_with_content(
     assert len(logs) == 0


+@pytest.mark.vcr()
+def test_invoke_model_with_response_stream_handles_stream_error(
+    span_exporter,
+    log_exporter,
+    bedrock_runtime_client,
+    instrument_with_content,
+):
+    # pylint:disable=too-many-locals
+    llm_model_value = "amazon.titan-text-lite-v1"
+    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
+    body = get_invoke_model_body(
+        llm_model_value, max_tokens, temperature, top_p, stop_sequences
+    )
+    response = bedrock_runtime_client.invoke_model_with_response_stream(
+        body=body,
+        modelId=llm_model_value,
+    )
+
+    # consume the stream in order to have it traced
+    finish_reason = None
+    input_tokens, output_tokens = None, None
+    with mock.patch.object(
+        EventStream,
+        "_parse_event",
+        side_effect=EventStreamError(
+            {"modelStreamErrorException": {}}, "InvokeModelWithRespnseStream"
+        ),
+    ):
+        with pytest.raises(EventStreamError):
+            for _event in response["body"]:
+                pass
+
+    (span,) = span_exporter.get_finished_spans()
+    assert_stream_completion_attributes(
+        span,
+        llm_model_value,
+        input_tokens,
+        output_tokens,
+        finish_reason,
+        "text_completion",
+        top_p,
+        temperature,
+        max_tokens,
+        stop_sequences,
+    )
+
+    logs = log_exporter.get_finished_logs()
+    assert len(logs) == 0
+
+
 @pytest.mark.vcr()
 def test_invoke_model_with_response_stream_invalid_model(
     span_exporter,
