Skip to content

Commit d0bdb5a

Browse files
author
Jeel Mehta
committed
Fixed Streaming body issue
1 parent f6bf83c commit d0bdb5a

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
import abc
44
import inspect
5+
import io
56
import json
67
import math
78
from typing import Any, Dict, Optional
@@ -320,16 +321,20 @@ def _set_if_not_none(attributes, key, value):
320321

321322
def on_success(self, span: Span, result: Dict[str, Any]):
322323
model_id = self._call_context.params.get(_MODEL_ID)
324+
323325
if not model_id:
324326
return
325327

326328
if "body" in result and isinstance(result["body"], StreamingBody):
329+
original_body = None
327330
try:
328-
# Read the entire content of the StreamingBody
329-
body_content = result["body"].read()
330-
# Decode the bytes to string and parse as JSON
331-
response_body = json.loads(body_content.decode("utf-8"))
331+
original_body = result["body"]
332+
body_content = original_body.read()
332333

334+
# Use one stream for telemetry
335+
stream = io.BytesIO(body_content)
336+
telemetry_content = stream.read()
337+
response_body = json.loads(telemetry_content.decode("utf-8"))
333338
if "amazon.titan" in model_id:
334339
self._handle_amazon_titan_response(span, response_body)
335340
elif "anthropic.claude" in model_id:
@@ -342,14 +347,17 @@ def on_success(self, span: Span, result: Dict[str, Any]):
342347
self._handle_ai21_jamba_response(span, response_body)
343348
elif "mistral" in model_id:
344349
self._handle_mistral_mistral_response(span, response_body)
350+
# Replenish stream for downstream application use
351+
new_stream = io.BytesIO(body_content)
352+
result["body"] = StreamingBody(new_stream, len(body_content))
345353

346354
except json.JSONDecodeError:
347355
print("Error: Unable to parse the response body as JSON")
348356
except Exception as e: # pylint: disable=broad-exception-caught, invalid-name
349357
print(f"Error processing response: {str(e)}")
350358
finally:
351-
# Make sure to close the stream
352-
result["body"].close()
359+
if original_body is not None:
360+
original_body.close()
353361

354362
# pylint: disable=no-self-use
355363
def _handle_amazon_titan_response(self, span: Span, response_body: Dict[str, Any]):

0 commit comments

Comments
 (0)