Skip to content
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3524](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3524))
- `opentelemetry-instrumentation-grpc`: support non-list interceptors
([#3520](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3520))
- `opentelemetry-instrumentation-botocore` Ensure spans end on early stream closure for Bedrock Streaming APIs
([#3481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3481))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,18 +499,20 @@ def _converse_on_success(
[stop_reason],
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_converse(result, capture_content)
# this path is used by streaming apis, in that case we are already out of the span
# context so need to add the span context manually
span_ctx = span.get_span_context()
event_logger.emit(
choice.to_choice_event(
trace_id=span_ctx.trace_id,
span_id=span_ctx.span_id,
trace_flags=span_ctx.trace_flags,
# In case of an early stream closure, the result may not contain outputs
if self._stream_has_output_content(result):
event_logger = instrumentor_context.event_logger
choice = _Choice.from_converse(result, capture_content)
# this path is used by streaming apis, in that case we are already out of the span
# context so need to add the span context manually
span_ctx = span.get_span_context()
event_logger.emit(
choice.to_choice_event(
trace_id=span_ctx.trace_id,
span_id=span_ctx.span_id,
trace_flags=span_ctx.trace_flags,
)
)
)

metrics = instrumentor_context.metrics
metrics_attributes = self._extract_metrics_attributes()
Expand Down Expand Up @@ -602,11 +604,14 @@ def _on_stream_error_callback(
span: Span,
exception,
instrumentor_context: _BotocoreInstrumentorContext,
span_ended: bool,
):
span.set_status(Status(StatusCode.ERROR, str(exception)))
if span.is_recording():
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
span.end()

if not span_ended:
span.end()

metrics = instrumentor_context.metrics
metrics_attributes = {
Expand Down Expand Up @@ -638,15 +643,17 @@ def on_success(
result["stream"], EventStream
):

def stream_done_callback(response):
def stream_done_callback(response, span_ended):
self._converse_on_success(
span, response, instrumentor_context, capture_content
)
span.end()

def stream_error_callback(exception):
if not span_ended:
span.end()

def stream_error_callback(exception, span_ended):
self._on_stream_error_callback(
span, exception, instrumentor_context
span, exception, instrumentor_context, span_ended
)

result["stream"] = ConverseStreamWrapper(
Expand Down Expand Up @@ -677,16 +684,17 @@ def stream_error_callback(exception):
elif self._call_context.operation == "InvokeModelWithResponseStream":
if "body" in result and isinstance(result["body"], EventStream):

def invoke_model_stream_done_callback(response):
def invoke_model_stream_done_callback(response, span_ended):
# the callback gets data formatted as the simpler converse API
self._converse_on_success(
span, response, instrumentor_context, capture_content
)
span.end()
if not span_ended:
span.end()

def invoke_model_stream_error_callback(exception):
def invoke_model_stream_error_callback(exception, span_ended):
self._on_stream_error_callback(
span, exception, instrumentor_context
span, exception, instrumentor_context, span_ended
)

result["body"] = InvokeModelWithResponseStreamWrapper(
Expand Down Expand Up @@ -781,9 +789,11 @@ def _handle_amazon_nova_response(
GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]]
)

event_logger = instrumentor_context.event_logger
choice = _Choice.from_converse(response_body, capture_content)
event_logger.emit(choice.to_choice_event())
# In case of an early stream closure, the result may not contain outputs
if self._stream_has_output_content(response_body):
event_logger = instrumentor_context.event_logger
choice = _Choice.from_converse(response_body, capture_content)
event_logger.emit(choice.to_choice_event())

metrics = instrumentor_context.metrics
metrics_attributes = self._extract_metrics_attributes()
Expand Down Expand Up @@ -1004,3 +1014,8 @@ def on_error(
duration,
attributes=metrics_attributes,
)

def _stream_has_output_content(self, response_body: dict[str, Any]):
return (
"output" in response_body and "message" in response_body["output"]
)
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ def __init__(
self._message = None
self._content_block = {}
self._record_message = False
self._ended = False

def __iter__(self):
try:
for event in self.__wrapped__:
self._process_event(event)
yield event
except EventStreamError as exc:
self._stream_error_callback(exc)
self._handle_stream_error(exc)
raise

def _process_event(self, event):
Expand Down Expand Up @@ -133,11 +134,23 @@ def _process_event(self, event):

if output_tokens := usage.get("outputTokens"):
self._response["usage"]["outputTokens"] = output_tokens

self._stream_done_callback(self._response)
self._complete_stream(self._response)

return

def close(self):
self.__wrapped__.close()
# Treat the stream as done to ensure the span end.
self._complete_stream(self._response)

def _complete_stream(self, response):
self._stream_done_callback(response, self._ended)
self._ended = True

def _handle_stream_error(self, exc):
self._stream_error_callback(exc, self._ended)
self._ended = True


# pylint: disable=abstract-method
class InvokeModelWithResponseStreamWrapper(ObjectProxy):
Expand All @@ -163,14 +176,28 @@ def __init__(
self._content_block = {}
self._tool_json_input_buf = ""
self._record_message = False
self._ended = False

def close(self):
self.__wrapped__.close()
# Treat the stream as done to ensure the span end.
self._stream_done_callback(self._response, self._ended)

def _complete_stream(self, response):
self._stream_done_callback(response, self._ended)
self._ended = True

def _handle_stream_error(self, exc):
self._stream_error_callback(exc, self._ended)
self._ended = True

def __iter__(self):
try:
for event in self.__wrapped__:
self._process_event(event)
yield event
except EventStreamError as exc:
self._stream_error_callback(exc)
self._handle_stream_error(exc)
raise

def _process_event(self, event):
Expand Down Expand Up @@ -213,7 +240,7 @@ def _process_amazon_titan_chunk(self, chunk):
self._response["output"] = {
"message": {"content": [{"text": chunk["outputText"]}]}
}
self._stream_done_callback(self._response)
self._complete_stream(self._response)

def _process_amazon_nova_chunk(self, chunk):
# pylint: disable=too-many-branches
Expand Down Expand Up @@ -283,7 +310,7 @@ def _process_amazon_nova_chunk(self, chunk):
if output_tokens := usage.get("outputTokens"):
self._response["usage"]["outputTokens"] = output_tokens

self._stream_done_callback(self._response)
self._complete_stream(self._response)
return

def _process_anthropic_claude_chunk(self, chunk):
Expand Down Expand Up @@ -355,7 +382,7 @@ def _process_anthropic_claude_chunk(self, chunk):
self._record_message = False
self._message = None

self._stream_done_callback(self._response)
self._complete_stream(self._response)
return


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
interactions:
- request:
body: |-
{
"messages": [
{
"role": "user",
"content": [
{
"text": "Say this is a test"
}
]
}
],
"inferenceConfig": {
"maxTokens": 10,
"temperature": 0.8,
"topP": 1,
"stopSequences": [
"|"
]
}
}
headers:
Content-Length:
- '170'
Content-Type:
- application/json
User-Agent:
- Boto3/1.35.56 md/Botocore#1.35.56 ua/2.0 os/macos#24.4.0 md/arch#arm64 lang/python#3.12.0
md/pyimpl#CPython cfg/retry-mode#legacy Botocore/1.35.56
X-Amz-Date:
- 20250509T104610Z
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- Root=1-98fd1391-6f6d1be3cc209d9ba2fb3034;Parent=6dc095915efed44e;Sampled=1
amz-sdk-invocation-id:
- 3a37ec88-d989-4397-bfbe-d463b66a3a08
amz-sdk-request:
- attempt=1
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
response:
body:
string: !!binary |
AAAApAAAAFJl4NbnCzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
b3BxcnN0dXZ3eHl6QUJDREVGR0hJSktMTSIsInJvbGUiOiJhc3Npc3RhbnQifSKBqDQAAADZAAAA
VxTIBhYLOmV2ZW50LXR5cGUHABFjb250ZW50QmxvY2tEZWx0YQ06Y29udGVudC10eXBlBwAQYXBw
bGljYXRpb24vanNvbg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRlbnRCbG9ja0luZGV4Ijow
LCJkZWx0YSI6eyJ0ZXh0IjoiSGksIGRpZCB5b3UgaGF2ZSBhIHF1ZXN0aW9uIn0sInAiOiJhYmNk
ZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0RFRkdISUpLIn22h6U3AAAAqgAAAFbdvayfCzpldmVu
dC10eXBlBwAQY29udGVudEJsb2NrU3RvcA06Y29udGVudC10eXBlBwAQYXBwbGljYXRpb24vanNv
bg06bWVzc2FnZS10eXBlBwAFZXZlbnR7ImNvbnRlbnRCbG9ja0luZGV4IjowLCJwIjoiYWJjZGVm
Z2hpamtsbW5vcHFyc3R1dnd4eXpBQkNERUZHSElKS0wifQSjvQkAAAChAAAAUTQJCC0LOmV2ZW50
LXR5cGUHAAttZXNzYWdlU3RvcA06Y29udGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbg06bWVz
c2FnZS10eXBlBwAFZXZlbnR7InAiOiJhYmNkZWZnaGlqa2xtbm9wcXJzdHV2d3h5ekFCQ0QiLCJz
dG9wUmVhc29uIjoibWF4X3Rva2VucyJ9jxPapgAAAMsAAABOaoNqNAs6ZXZlbnQtdHlwZQcACG1l
dGFkYXRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
dmVudHsibWV0cmljcyI6eyJsYXRlbmN5TXMiOjYyNX0sInAiOiJhYmNkZWZnaGlqa2wiLCJ1c2Fn
ZSI6eyJpbnB1dFRva2VucyI6OCwib3V0cHV0VG9rZW5zIjoxMCwidG90YWxUb2tlbnMiOjE4fX0B
/Sts
headers:
Connection:
- keep-alive
Content-Type:
- application/vnd.amazon.eventstream
Date:
- Fri, 09 May 2025 10:46:10 GMT
Set-Cookie: test_set_cookie
Transfer-Encoding:
- chunked
x-amzn-RequestId:
- 5aaaa521-d2c6-4980-ba0a-7bba19633a40
status:
code: 200
message: OK
version: 1
Loading