Skip to content

Commit df27592

Browse files
SrdjanLLxrmx
andauthored
Ensure spans complete on early stream closure for Bedrock Streaming APIs (open-telemetry#3481)
* Ensure spans end on early stream closure for Bedrock Streaming APIs * Add changelog * End span only if it's still recording in stream done callbacks. Assert that span status is unset. * Extract response output and stream content into helper functions * Update changelog after 1.33.0 release * Keep track of bedrock stream ending and close spans accordingly * Move stream output checks to a single method * Update CHANGELOG.md --------- Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent c1a6895 commit df27592

8 files changed

+571
-30
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2727
([#3524](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3524))
2828
- `opentelemetry-instrumentation-grpc`: support non-list interceptors
2929
([#3520](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3520))
30+
- `opentelemetry-instrumentation-botocore` Ensure spans end on early stream closure for Bedrock Streaming APIs
31+
([#3481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3481))
3032

3133
### Breaking changes
3234

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -499,18 +499,20 @@ def _converse_on_success(
499499
[stop_reason],
500500
)
501501

502-
event_logger = instrumentor_context.event_logger
503-
choice = _Choice.from_converse(result, capture_content)
504-
# this path is used by streaming apis, in that case we are already out of the span
505-
# context so need to add the span context manually
506-
span_ctx = span.get_span_context()
507-
event_logger.emit(
508-
choice.to_choice_event(
509-
trace_id=span_ctx.trace_id,
510-
span_id=span_ctx.span_id,
511-
trace_flags=span_ctx.trace_flags,
502+
# In case of an early stream closure, the result may not contain outputs
503+
if self._stream_has_output_content(result):
504+
event_logger = instrumentor_context.event_logger
505+
choice = _Choice.from_converse(result, capture_content)
506+
# this path is used by streaming apis, in that case we are already out of the span
507+
# context so need to add the span context manually
508+
span_ctx = span.get_span_context()
509+
event_logger.emit(
510+
choice.to_choice_event(
511+
trace_id=span_ctx.trace_id,
512+
span_id=span_ctx.span_id,
513+
trace_flags=span_ctx.trace_flags,
514+
)
512515
)
513-
)
514516

515517
metrics = instrumentor_context.metrics
516518
metrics_attributes = self._extract_metrics_attributes()
@@ -602,11 +604,14 @@ def _on_stream_error_callback(
602604
span: Span,
603605
exception,
604606
instrumentor_context: _BotocoreInstrumentorContext,
607+
span_ended: bool,
605608
):
606609
span.set_status(Status(StatusCode.ERROR, str(exception)))
607610
if span.is_recording():
608611
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
609-
span.end()
612+
613+
if not span_ended:
614+
span.end()
610615

611616
metrics = instrumentor_context.metrics
612617
metrics_attributes = {
@@ -638,15 +643,17 @@ def on_success(
638643
result["stream"], EventStream
639644
):
640645

641-
def stream_done_callback(response):
646+
def stream_done_callback(response, span_ended):
642647
self._converse_on_success(
643648
span, response, instrumentor_context, capture_content
644649
)
645-
span.end()
646650

647-
def stream_error_callback(exception):
651+
if not span_ended:
652+
span.end()
653+
654+
def stream_error_callback(exception, span_ended):
648655
self._on_stream_error_callback(
649-
span, exception, instrumentor_context
656+
span, exception, instrumentor_context, span_ended
650657
)
651658

652659
result["stream"] = ConverseStreamWrapper(
@@ -677,16 +684,17 @@ def stream_error_callback(exception):
677684
elif self._call_context.operation == "InvokeModelWithResponseStream":
678685
if "body" in result and isinstance(result["body"], EventStream):
679686

680-
def invoke_model_stream_done_callback(response):
687+
def invoke_model_stream_done_callback(response, span_ended):
681688
# the callback gets data formatted as the simpler converse API
682689
self._converse_on_success(
683690
span, response, instrumentor_context, capture_content
684691
)
685-
span.end()
692+
if not span_ended:
693+
span.end()
686694

687-
def invoke_model_stream_error_callback(exception):
695+
def invoke_model_stream_error_callback(exception, span_ended):
688696
self._on_stream_error_callback(
689-
span, exception, instrumentor_context
697+
span, exception, instrumentor_context, span_ended
690698
)
691699

692700
result["body"] = InvokeModelWithResponseStreamWrapper(
@@ -781,9 +789,11 @@ def _handle_amazon_nova_response(
781789
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]]
782790
)
783791

784-
event_logger = instrumentor_context.event_logger
785-
choice = _Choice.from_converse(response_body, capture_content)
786-
event_logger.emit(choice.to_choice_event())
792+
# In case of an early stream closure, the result may not contain outputs
793+
if self._stream_has_output_content(response_body):
794+
event_logger = instrumentor_context.event_logger
795+
choice = _Choice.from_converse(response_body, capture_content)
796+
event_logger.emit(choice.to_choice_event())
787797

788798
metrics = instrumentor_context.metrics
789799
metrics_attributes = self._extract_metrics_attributes()
@@ -1004,3 +1014,8 @@ def on_error(
10041014
duration,
10051015
attributes=metrics_attributes,
10061016
)
1017+
1018+
def _stream_has_output_content(self, response_body: dict[str, Any]):
1019+
return (
1020+
"output" in response_body and "message" in response_body["output"]
1021+
)

instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,15 @@ def __init__(
6565
self._message = None
6666
self._content_block = {}
6767
self._record_message = False
68+
self._ended = False
6869

6970
def __iter__(self):
7071
try:
7172
for event in self.__wrapped__:
7273
self._process_event(event)
7374
yield event
7475
except EventStreamError as exc:
75-
self._stream_error_callback(exc)
76+
self._handle_stream_error(exc)
7677
raise
7778

7879
def _process_event(self, event):
@@ -133,11 +134,23 @@ def _process_event(self, event):
133134

134135
if output_tokens := usage.get("outputTokens"):
135136
self._response["usage"]["outputTokens"] = output_tokens
136-
137-
self._stream_done_callback(self._response)
137+
self._complete_stream(self._response)
138138

139139
return
140140

141+
def close(self):
142+
self.__wrapped__.close()
143+
# Treat the stream as done to ensure the span end.
144+
self._complete_stream(self._response)
145+
146+
def _complete_stream(self, response):
147+
self._stream_done_callback(response, self._ended)
148+
self._ended = True
149+
150+
def _handle_stream_error(self, exc):
151+
self._stream_error_callback(exc, self._ended)
152+
self._ended = True
153+
141154

142155
# pylint: disable=abstract-method
143156
class InvokeModelWithResponseStreamWrapper(ObjectProxy):
@@ -163,14 +176,28 @@ def __init__(
163176
self._content_block = {}
164177
self._tool_json_input_buf = ""
165178
self._record_message = False
179+
self._ended = False
180+
181+
def close(self):
182+
self.__wrapped__.close()
183+
# Treat the stream as done to ensure the span end.
184+
self._stream_done_callback(self._response, self._ended)
185+
186+
def _complete_stream(self, response):
187+
self._stream_done_callback(response, self._ended)
188+
self._ended = True
189+
190+
def _handle_stream_error(self, exc):
191+
self._stream_error_callback(exc, self._ended)
192+
self._ended = True
166193

167194
def __iter__(self):
168195
try:
169196
for event in self.__wrapped__:
170197
self._process_event(event)
171198
yield event
172199
except EventStreamError as exc:
173-
self._stream_error_callback(exc)
200+
self._handle_stream_error(exc)
174201
raise
175202

176203
def _process_event(self, event):
@@ -213,7 +240,7 @@ def _process_amazon_titan_chunk(self, chunk):
213240
self._response["output"] = {
214241
"message": {"content": [{"text": chunk["outputText"]}]}
215242
}
216-
self._stream_done_callback(self._response)
243+
self._complete_stream(self._response)
217244

218245
def _process_amazon_nova_chunk(self, chunk):
219246
# pylint: disable=too-many-branches
@@ -283,7 +310,7 @@ def _process_amazon_nova_chunk(self, chunk):
283310
if output_tokens := usage.get("outputTokens"):
284311
self._response["usage"]["outputTokens"] = output_tokens
285312

286-
self._stream_done_callback(self._response)
313+
self._complete_stream(self._response)
287314
return
288315

289316
def _process_anthropic_claude_chunk(self, chunk):
@@ -355,7 +382,7 @@ def _process_anthropic_claude_chunk(self, chunk):
355382
self._record_message = False
356383
self._message = None
357384

358-
self._stream_done_callback(self._response)
385+
self._complete_stream(self._response)
359386
return
360387

361388

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
interactions:
2+
- request:
3+
body: |-
4+
{
5+
"messages": [
6+
{
7+
"role": "user",
8+
"content": [
9+
{
10+
"text": "Say this is a test"
11+
}
12+
]
13+
}
14+
],
15+
"inferenceConfig": {
16+
"maxTokens": 10,
17+
"temperature": 0.8,
18+
"topP": 1,
19+
"stopSequences": [
20+
"|"
21+
]
22+
}
23+
}
24+
headers:
25+
Content-Length:
26+
- '170'
27+
Content-Type:
28+
- application/json
29+
User-Agent:
30+
- Boto3/1.35.56 md/Botocore#1.35.56 ua/2.0 os/macos#24.4.0 md/arch#arm64 lang/python#3.12.0
31+
md/pyimpl#CPython cfg/retry-mode#legacy Botocore/1.35.56
32+
X-Amz-Date:
33+
- 20250509T104610Z
34+
X-Amz-Security-Token:
35+
- test_aws_security_token
36+
X-Amzn-Trace-Id:
37+
- Root=1-98fd1391-6f6d1be3cc209d9ba2fb3034;Parent=6dc095915efed44e;Sampled=1
38+
amz-sdk-invocation-id:
39+
- 3a37ec88-d989-4397-bfbe-d463b66a3a08
40+
amz-sdk-request:
41+
- attempt=1
42+
authorization:
43+
- Bearer test_aws_authorization
44+
method: POST
45+
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
46+
response:
47+
body:
48+
string: !!binary |
49+
AAAApAAAAFJl4NbnCzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
50+
cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
51+
b3BxcnN0dXZ3eHl6QUJDREVGR0hJSktMTSIsInJvbGUiOiJhc3Npc3RhbnQifSKBqDQAAADZAAAA
52+
VxTIBhYLOmV2ZW50LXR5cGUHABFjb250ZW50QmxvY2tEZWx0YQ06Y29udGVudC10eXBlBwAQYXBw
53+
bGljYXRpb24vanNvbg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRlbnRCbG9ja0luZGV4Ijow
54+
LCJkZWx0YSI6eyJ0ZXh0IjoiSGksIGRpZCB5b3UgaGF2ZSBhIHF1ZXN0aW9uIn0sInAiOiJhYmNk
55+
ZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0RFRkdISUpLIn22h6U3AAAAqgAAAFbdvayfCzpldmVu
56+
dC10eXBlBwAQY29udGVudEJsb2NrU3RvcA06Y29udGVudC10eXBlBwAQYXBwbGljYXRpb24vanNv
57+
bg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRlbnRCbG9ja0luZGV4IjowLCJwIjoiYWJjZGVm
58+
Z2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZHSElKS0wifQSjvQkAAAChAAAAUTQJCC0LOmV2ZW50
59+
LXR5cGUHAAttZXNzYWdlU3RvcA06Y29udGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbg06bWVz
60+
c2FnZS10eXBlBwAFZXZlbnR7InAiOiJhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0QiLCJz
61+
dG9wUmVhc29uIjoibWF4X3Rva2VucyJ9jxPapgAAAMsAAABOaoNqNAs6ZXZlbnQtdHlwZQcACG1l
62+
dGFkYXRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
63+
dmVudHsibWV0cmljcyI6eyJsYXRlbmN5TXMiOjYyNX0sInAiOiJhYmNkZWZnaGlqa2wiLCJ1c2Fn
64+
ZSI6eyJpbnB1dFRva2VucyI6OCwib3V0cHV0VG9rZW5zIjoxMCwidG90YWxUb2tlbnMiOjE4fX0B
65+
/Sts
66+
headers:
67+
Connection:
68+
- keep-alive
69+
Content-Type:
70+
- application/vnd.amazon.eventstream
71+
Date:
72+
- Fri, 09 May 2025 10:46:10 GMT
73+
Set-Cookie: test_set_cookie
74+
Transfer-Encoding:
75+
- chunked
76+
x-amzn-RequestId:
77+
- 5aaaa521-d2c6-4980-ba0a-7bba19633a40
78+
status:
79+
code: 200
80+
message: OK
81+
version: 1

0 commit comments

Comments
 (0)