Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions newrelic/hooks/external_aiobotocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
bedrock_attrs = extract_bedrock_converse_attrs(
args[1], response, response_headers, model, span_id, trace_id
)

if response_streaming:
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
# This class is used in numerous other services in botocore, and would cause conflicts.
response["stream"] = stream = AsyncEventStreamWrapper(response["stream"])
stream._nr_ft = ft or None
stream._nr_bedrock_attrs = bedrock_attrs or {}
stream._nr_model_extractor = stream_extractor or None
stream._nr_is_converse = True
return response

else:
bedrock_attrs = {
"request_id": response_headers.get("x-amzn-requestid"),
Expand Down
200 changes: 120 additions & 80 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,16 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
bedrock_attrs = extract_bedrock_converse_attrs(kwargs, response, response_headers, model, span_id, trace_id)

try:
if response_streaming:
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
# This class is used in numerous other services in botocore, and would cause conflicts.
response["stream"] = stream = EventStreamWrapper(response["stream"])
stream._nr_ft = ft
stream._nr_bedrock_attrs = bedrock_attrs
stream._nr_model_extractor = stream_extractor
stream._nr_is_converse = True
return response

ft.__exit__(None, None, None)
bedrock_attrs["duration"] = ft.duration * 1000
run_bedrock_response_extractor(response_extractor, {}, bedrock_attrs, False, transaction)
Expand All @@ -833,6 +843,7 @@ def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):

def extract_bedrock_converse_attrs(kwargs, response, response_headers, model, span_id, trace_id):
input_message_list = []
output_message_list = None
# If a system message is supplied, it is under its own key in kwargs rather than with the other input messages
if "system" in kwargs.keys():
input_message_list.extend({"role": "system", "content": result["text"]} for result in kwargs.get("system", []))
Expand All @@ -843,35 +854,129 @@ def extract_bedrock_converse_attrs(kwargs, response, response_headers, model, sp
[{"role": "user", "content": result["text"]} for result in kwargs["messages"][-1].get("content", [])]
)

output_message_list = [
{"role": "assistant", "content": result["text"]}
for result in response.get("output").get("message").get("content", [])
]
if "output" in response:
output_message_list = [
{"role": "assistant", "content": result["text"]}
for result in response.get("output").get("message").get("content", [])
]

bedrock_attrs = {
"request_id": response_headers.get("x-amzn-requestid"),
"model": model,
"span_id": span_id,
"trace_id": trace_id,
"response.choices.finish_reason": response.get("stopReason"),
"output_message_list": output_message_list,
"request.max_tokens": kwargs.get("inferenceConfig", {}).get("maxTokens", None),
"request.temperature": kwargs.get("inferenceConfig", {}).get("temperature", None),
"input_message_list": input_message_list,
}

if output_message_list is not None:
bedrock_attrs["output_message_list"] = output_message_list

return bedrock_attrs


class BedrockRecordEventMixin:
def record_events_on_stop_iteration(self, transaction):
if hasattr(self, "_nr_ft"):
bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
self._nr_ft.__exit__(None, None, None)

# If there are no bedrock attrs exit early as there's no data to record.
if not bedrock_attrs:
return

try:
bedrock_attrs["duration"] = self._nr_ft.duration * 1000
handle_chat_completion_event(transaction, bedrock_attrs)
except Exception:
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)

# Clear cached data as this can be very large.
self._nr_bedrock_attrs.clear()

def record_error(self, transaction, exc):
if hasattr(self, "_nr_ft"):
try:
ft = self._nr_ft
error_attributes = getattr(self, "_nr_bedrock_attrs", {})

# If there are no bedrock attrs exit early as there's no data to record.
if not error_attributes:
return

error_attributes = bedrock_error_attributes(exc, error_attributes)
notice_error_attributes = {
"http.statusCode": error_attributes.get("http.statusCode"),
"error.message": error_attributes.get("error.message"),
"error.code": error_attributes.get("error.code"),
}
notice_error_attributes.update({"completion_id": str(uuid.uuid4())})

ft.notice_error(attributes=notice_error_attributes)

ft.__exit__(*sys.exc_info())
error_attributes["duration"] = ft.duration * 1000

handle_chat_completion_event(transaction, error_attributes)

# Clear cached data as this can be very large.
error_attributes.clear()
except Exception:
_logger.warning(EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE, exc_info=True)

def record_stream_chunk(self, event, transaction):
if event:
try:
if getattr(self, "_nr_is_converse", False):
return self.converse_record_stream_chunk(event, transaction)
else:
return self.invoke_record_stream_chunk(event, transaction)
except Exception:
_logger.warning(RESPONSE_EXTRACTOR_FAILURE_LOG_MESSAGE, exc_info=True)

def invoke_record_stream_chunk(self, event, transaction):
bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
chunk = json.loads(event["chunk"]["bytes"].decode("utf-8"))
self._nr_model_extractor(chunk, bedrock_attrs)
# In Langchain, the bedrock iterator exits early if type is "content_block_stop".
# So we need to call the record events here since stop iteration will not be raised.
_type = chunk.get("type")
if _type == "content_block_stop":
self.record_events_on_stop_iteration(transaction)

def converse_record_stream_chunk(self, event, transaction):
bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
if "contentBlockDelta" in event:
if not bedrock_attrs:
return

content = ((event.get("contentBlockDelta") or {}).get("delta") or {}).get("text", "")
if "output_message_list" not in bedrock_attrs:
bedrock_attrs["output_message_list"] = [{"role": "assistant", "content": ""}]
bedrock_attrs["output_message_list"][0]["content"] += content

if "messageStop" in event:
bedrock_attrs["response.choices.finish_reason"] = (event.get("messageStop") or {}).get("stopReason", "")

# TODO: Is this also subject to the content_block_stop behavior from Langchain?
# If so, that would preclude us from ever capturing the messageStop event with the stopReason.
# if "contentBlockStop" in event:
# self.record_events_on_stop_iteration(transaction)


class EventStreamWrapper(ObjectProxy):
def __iter__(self):
g = GeneratorProxy(self.__wrapped__.__iter__())
g._nr_ft = getattr(self, "_nr_ft", None)
g._nr_bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
g._nr_model_extractor = getattr(self, "_nr_model_extractor", NULL_EXTRACTOR)
g._nr_is_converse = getattr(self, "_nr_is_converse", False)
return g


class GeneratorProxy(ObjectProxy):
class GeneratorProxy(BedrockRecordEventMixin, ObjectProxy):
def __init__(self, wrapped):
super().__init__(wrapped)

Expand All @@ -886,12 +991,12 @@ def __next__(self):
return_val = None
try:
return_val = self.__wrapped__.__next__()
record_stream_chunk(self, return_val, transaction)
self.record_stream_chunk(return_val, transaction)
except StopIteration:
record_events_on_stop_iteration(self, transaction)
self.record_events_on_stop_iteration(transaction)
raise
except Exception as exc:
record_error(self, transaction, exc)
self.record_error(transaction, exc)
raise
return return_val

Expand All @@ -905,13 +1010,11 @@ def __aiter__(self):
g._nr_ft = getattr(self, "_nr_ft", None)
g._nr_bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
g._nr_model_extractor = getattr(self, "_nr_model_extractor", NULL_EXTRACTOR)
g._nr_is_converse = getattr(self, "_nr_is_converse", False)
return g


class AsyncGeneratorProxy(ObjectProxy):
def __init__(self, wrapped):
super().__init__(wrapped)

class AsyncGeneratorProxy(BedrockRecordEventMixin, ObjectProxy):
def __aiter__(self):
return self

Expand All @@ -922,83 +1025,19 @@ async def __anext__(self):
return_val = None
try:
return_val = await self.__wrapped__.__anext__()
record_stream_chunk(self, return_val, transaction)
self.record_stream_chunk(return_val, transaction)
except StopAsyncIteration:
record_events_on_stop_iteration(self, transaction)
self.record_events_on_stop_iteration(transaction)
raise
except Exception as exc:
record_error(self, transaction, exc)
self.record_error(transaction, exc)
raise
return return_val

async def aclose(self):
return await super().aclose()


def record_stream_chunk(self, return_val, transaction):
if return_val:
try:
chunk = json.loads(return_val["chunk"]["bytes"].decode("utf-8"))
self._nr_model_extractor(chunk, self._nr_bedrock_attrs)
# In Langchain, the bedrock iterator exits early if type is "content_block_stop".
# So we need to call the record events here since stop iteration will not be raised.
_type = chunk.get("type")
if _type == "content_block_stop":
record_events_on_stop_iteration(self, transaction)
except Exception:
_logger.warning(RESPONSE_EXTRACTOR_FAILURE_LOG_MESSAGE, exc_info=True)


def record_events_on_stop_iteration(self, transaction):
if hasattr(self, "_nr_ft"):
bedrock_attrs = getattr(self, "_nr_bedrock_attrs", {})
self._nr_ft.__exit__(None, None, None)

# If there are no bedrock attrs exit early as there's no data to record.
if not bedrock_attrs:
return

try:
bedrock_attrs["duration"] = self._nr_ft.duration * 1000
handle_chat_completion_event(transaction, bedrock_attrs)
except Exception:
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)

# Clear cached data as this can be very large.
self._nr_bedrock_attrs.clear()


def record_error(self, transaction, exc):
if hasattr(self, "_nr_ft"):
try:
ft = self._nr_ft
error_attributes = getattr(self, "_nr_bedrock_attrs", {})

# If there are no bedrock attrs exit early as there's no data to record.
if not error_attributes:
return

error_attributes = bedrock_error_attributes(exc, error_attributes)
notice_error_attributes = {
"http.statusCode": error_attributes.get("http.statusCode"),
"error.message": error_attributes.get("error.message"),
"error.code": error_attributes.get("error.code"),
}
notice_error_attributes.update({"completion_id": str(uuid.uuid4())})

ft.notice_error(attributes=notice_error_attributes)

ft.__exit__(*sys.exc_info())
error_attributes["duration"] = ft.duration * 1000

handle_chat_completion_event(transaction, error_attributes)

# Clear cached data as this can be very large.
error_attributes.clear()
except Exception:
_logger.warning(EXCEPTION_HANDLING_FAILURE_LOG_MESSAGE, exc_info=True)


def handle_embedding_event(transaction, bedrock_attrs):
embedding_id = str(uuid.uuid4())

Expand Down Expand Up @@ -1529,6 +1568,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
response_streaming=True
),
("bedrock-runtime", "converse"): wrap_bedrock_runtime_converse(response_streaming=False),
("bedrock-runtime", "converse_stream"): wrap_bedrock_runtime_converse(response_streaming=True),
}


Expand Down
Loading
Loading