Add Converse API support. DO NOT MERGE YET #1428

Open

wants to merge 16 commits into base: main

Changes from 3 commits
12 changes: 11 additions & 1 deletion newrelic/hooks/external_aiobotocore.py
@@ -113,7 +113,17 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
        response = await wrapped(*args, **kwargs)
    except Exception as exc:
        handle_bedrock_exception(
            exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction
            exc,
            is_embedding,
            model,
            span_id,
            trace_id,
            request_extractor,
            request_body,
            ft,
            transaction,
            kwargs,
            is_converse=False,
        )
        raise

142 changes: 137 additions & 5 deletions newrelic/hooks/external_botocore.py
@@ -163,6 +163,8 @@ def extractor_string(*args, **kwargs):


def bedrock_error_attributes(exception, bedrock_attrs):
    # In some cases, such as a botocore.exceptions.ParamValidationError, the exception may not have a response attr
    # We still want to record the error, so we add `error: True` to bedrock_attrs immediately
    response = getattr(exception, "response", None)
    if not response:
        return bedrock_attrs
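
# Illustration only (not part of this diff): ParamValidationError is raised
# client-side before any HTTP request is made, so unlike ClientError it has no
# `response` attribute, which is why getattr() with a default is used above.
#
#     import botocore.exceptions
#     exc = botocore.exceptions.ParamValidationError(report="Invalid type for parameter messages")
#     getattr(exc, "response", None)  # -> None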
@@ -534,10 +536,23 @@ def extract_bedrock_cohere_model_streaming_response(response_body, bedrock_attrs


def handle_bedrock_exception(
    exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction
    exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction, kwargs, is_converse
):
    try:
        bedrock_attrs = {"model": model, "span_id": span_id, "trace_id": trace_id}
        if is_converse:
Contributor commented:
A couple of thoughts on the design of this:

I wonder if there's a way you could move this `if is_converse` block into a request_extractor. Have you explored that at all, or do you think it's worth exploring?

OR

Maybe converse should not use this extractor concept at all, since it doesn't need to.
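
For illustration, a converse-specific request extractor could look roughly like this sketch (hypothetical code for discussion, not part of this PR; the name `extract_bedrock_converse_request` is invented). It would keep the kwargs parsing behind the same extractor interface the invoke_model path already uses:

def extract_bedrock_converse_request(kwargs, bedrock_attrs):
    # kwargs["messages"] carries the whole conversation history; only the last
    # entry is the new request, so only that one is recorded.
    try:
        input_message_list = [
            {"role": "user", "content": result["text"]}
            for result in kwargs["messages"][-1].get("content", [])
        ]
        if "system" in kwargs:
            input_message_list.append({"role": "system", "content": kwargs["system"][0].get("text")})
    except Exception:
        input_message_list = []

    inference_config = kwargs.get("inferenceConfig", {})
    bedrock_attrs["input_message_list"] = input_message_list
    bedrock_attrs["request.max_tokens"] = inference_config.get("maxTokens")
    bedrock_attrs["request.temperature"] = inference_config.get("temperature")
    return bedrock_attrs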

            try:
                input_message_list = [
                    {"role": "user", "content": result["text"]} for result in kwargs["messages"][-1].get("content", [])
                ]
                if "system" in kwargs:
                    input_message_list.append({"role": "system", "content": kwargs.get("system")[0].get("text")})
            except Exception:
                input_message_list = []

            bedrock_attrs["input_message_list"] = input_message_list
            bedrock_attrs["request.max_tokens"] = kwargs.get("inferenceConfig", {}).get("maxTokens", None)
            bedrock_attrs["request.temperature"] = kwargs.get("inferenceConfig", {}).get("temperature", None)
        try:
            request_extractor(request_body, bedrock_attrs)
        except json.decoder.JSONDecodeError:
@@ -546,6 +561,7 @@ def handle_bedrock_exception(
            _logger.warning(REQUEST_EXTACTOR_FAILURE_LOG_MESSAGE, traceback.format_exception(*sys.exc_info()))

        error_attributes = bedrock_error_attributes(exc, bedrock_attrs)

        notice_error_attributes = {
            "http.statusCode": error_attributes.get("http.statusCode"),
            "error.message": error_attributes.get("error.message"),
@@ -669,7 +685,17 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
            response = wrapped(*args, **kwargs)
        except Exception as exc:
            handle_bedrock_exception(
                exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction
                exc,
                is_embedding,
                model,
                span_id,
                trace_id,
                request_extractor,
                request_body,
                ft,
                transaction,
                kwargs,
                is_converse=False,
            )
            raise

@@ -725,6 +751,113 @@ def _wrap_bedrock_runtime_invoke_model(wrapped, instance, args, kwargs):
    return _wrap_bedrock_runtime_invoke_model


def wrap_bedrock_runtime_converse(response_streaming=False):
    @function_wrapper
    def _wrap_bedrock_runtime_converse(wrapped, instance, args, kwargs):
        # Wrapped function only takes keyword arguments, no need for binding
        transaction = current_transaction()
        if not transaction:
            return wrapped(*args, **kwargs)

        settings = transaction.settings or global_settings()
        if not settings.ai_monitoring.enabled:
            return wrapped(*args, **kwargs)

        transaction.add_ml_model_info("Bedrock", BOTOCORE_VERSION)
        transaction._add_agent_attribute("llm", True)

        model = kwargs.get("modelId")
        if not model:
            return wrapped(*args, **kwargs)

        # Extractors are not needed for the Converse API since the request and response formats are consistent across models
        request_extractor = response_extractor = stream_extractor = NULL_EXTRACTOR

        function_name = wrapped.__name__
        # The function trace may not be exited in this function in the case of streaming, so start it manually
        ft = FunctionTrace(name=function_name, group="Llm/completion/Bedrock")
        ft.__enter__()

        # Get trace information
        available_metadata = get_trace_linking_metadata()
        span_id = available_metadata.get("span.id")
        trace_id = available_metadata.get("trace.id")

        # Store data on the instance to pass context to the async instrumentation in aiobotocore
        instance._nr_trace_id = trace_id
        instance._nr_span_id = span_id
        instance._nr_request_extractor = request_extractor
        instance._nr_response_extractor = response_extractor
        instance._nr_stream_extractor = stream_extractor
        instance._nr_txn = transaction
        instance._nr_ft = ft
        instance._nr_response_streaming = response_streaming
        instance._nr_settings = settings

        # Add a bedrock flag to the instance so we can determine when make_api_call instrumentation is hit from non-Bedrock paths and bypass it if so
        instance._nr_is_bedrock = True

        try:
            # For aioboto3 clients, this will call make_api_call instrumentation in external_aiobotocore
            response = wrapped(*args, **kwargs)
        except Exception as exc:
            handle_bedrock_exception(
                exc, False, model, span_id, trace_id, request_extractor, {}, ft, transaction, kwargs, is_converse=True
            )
            raise

        if not response or (response_streaming and not settings.ai_monitoring.streaming.enabled):
            ft.__exit__(None, None, None)
            return response

        # Let the instrumentation of make_api_call in the aioboto3 client handle it if we have an async case
        if inspect.iscoroutine(response):
            return response

        response_headers = response.get("ResponseMetadata", {}).get("HTTPHeaders") or {}
        input_message_list = []
        # If a system message is supplied, it is under its own key in kwargs rather than with the other input messages
        if "system" in kwargs:
            input_message_list.extend(
                {"role": "system", "content": result["text"]} for result in kwargs.get("system", [])
            )

        # kwargs["messages"] can hold multiple requests and responses to maintain conversation history
        # We grab the last message (the newest request) in the list each time, so we don't duplicate recorded data
        input_message_list.extend(
            [{"role": "user", "content": result["text"]} for result in kwargs["messages"][-1].get("content", [])]
        )
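        # For example, on the third request of a conversation, kwargs["messages"]
        # might look like this (illustrative values, not from the PR):
        #   [
        #       {"role": "user", "content": [{"text": "Hi"}]},            # recorded on the first call
        #       {"role": "assistant", "content": [{"text": "Hello!"}]},
        #       {"role": "user", "content": [{"text": "What is 2+2?"}]},  # newest request, recorded now
        #   ]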

        output_message_list = [
            {"role": "assistant", "content": result["text"]}
            for result in response.get("output", {}).get("message", {}).get("content", [])
        ]

        bedrock_attrs = {
            "request_id": response_headers.get("x-amzn-requestid"),
            "model": model,
            "span_id": span_id,
            "trace_id": trace_id,
            "response.choices.finish_reason": response.get("stopReason"),
            "output_message_list": output_message_list,
            "request.max_tokens": kwargs.get("inferenceConfig", {}).get("maxTokens", None),
            "request.temperature": kwargs.get("inferenceConfig", {}).get("temperature", None),
            "input_message_list": input_message_list,
        }

        try:
            ft.__exit__(None, None, None)
            bedrock_attrs["duration"] = ft.duration * 1000
            run_bedrock_response_extractor(response_extractor, {}, bedrock_attrs, False, transaction)
        except Exception:
            _logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, traceback.format_exception(*sys.exc_info()))

        return response

    return _wrap_bedrock_runtime_converse
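
For reference, a minimal sketch of the kind of call this wrapper instruments, assuming standard boto3 Converse API usage (the model ID and region are illustrative). The messages, system, and inferenceConfig shapes below are what the wrapper reads out of kwargs:

import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

response = client.converse(
    modelId="anthropic.claude-3-sonnet-20240229-v1:0",
    system=[{"text": "You are a helpful assistant."}],  # optional; arrives under its own "system" kwarg
    messages=[{"role": "user", "content": [{"text": "What is 212 degrees Fahrenheit converted to Celsius?"}]}],
    inferenceConfig={"maxTokens": 100, "temperature": 0.7},
)
print(response["output"]["message"]["content"][0]["text"])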


class EventStreamWrapper(ObjectProxy):
    def __iter__(self):
        g = GeneratorProxy(self.__wrapped__.__iter__())
@@ -905,7 +1038,6 @@ def handle_embedding_event(transaction, bedrock_attrs):

def handle_chat_completion_event(transaction, bedrock_attrs):
    chat_completion_id = str(uuid.uuid4())

    # Grab LLM-related custom attributes off of the transaction to store as metadata on LLM events
    custom_attrs_dict = transaction._custom_params
    llm_metadata_dict = {key: value for key, value in custom_attrs_dict.items() if key.startswith("llm.")}
@@ -944,7 +1076,6 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
    }
    chat_completion_summary_dict.update(llm_metadata_dict)
    chat_completion_summary_dict = {k: v for k, v in chat_completion_summary_dict.items() if v is not None}

    transaction.record_custom_event("LlmChatCompletionSummary", chat_completion_summary_dict)

    create_chat_completion_message_event(
@@ -1390,6 +1521,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(
response_streaming=True
),
("bedrock-runtime", "converse"): wrap_bedrock_runtime_converse(response_streaming=False),
}


@@ -1399,8 +1531,8 @@ def bind__create_api_method(py_operation_name, operation_name, service_model, *a

def _nr_clientcreator__create_api_method_(wrapped, instance, args, kwargs):
    (py_operation_name, service_model) = bind__create_api_method(*args, **kwargs)

    service_name = service_model.service_name.lower()

    tracer = CUSTOM_TRACE_POINTS.get((service_name, py_operation_name))

    wrapped = wrapped(*args, **kwargs)
2 changes: 1 addition & 1 deletion tests/external_aiobotocore/conftest.py
@@ -20,7 +20,7 @@
import moto.server
import pytest
import werkzeug.serving
from external_botocore._mock_external_bedrock_server import MockExternalBedrockServer
from external_botocore._mock_external_bedrock_server_invoke_model import MockExternalBedrockServer
from testing_support.fixture.event_loop import event_loop as loop
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

120 changes: 120 additions & 0 deletions tests/external_botocore/_mock_external_bedrock_server_converse.py
@@ -0,0 +1,120 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

from testing_support.mock_external_http_server import MockExternalHTTPServer

RESPONSES = {
    "What is 212 degrees Fahrenheit converted to Celsius?": [
        {"Content-Type": "application/json", "x-amzn-RequestId": "c20d345e-6878-4778-b674-6b187bae8ecf"},
        200,
        {
            "metrics": {"latencyMs": 1866},
            "output": {
                "message": {
                    "content": [
                        {
                            "text": "To convert 212°F to Celsius, we can use the formula:\n\nC = (F - 32) × 5/9\n\nWhere:\nC is the temperature in Celsius\nF is the temperature in Fahrenheit\n\nPlugging in 212°F, we get:\n\nC = (212 - 32) × 5/9\nC = 180 × 5/9\nC = 100\n\nTherefore, 212°"
                        }
                    ],
                    "role": "assistant",
                }
            },
            "stopReason": "max_tokens",
            "usage": {"inputTokens": 26, "outputTokens": 100, "totalTokens": 126},
        },
    ],
    "Invalid Token": [
        {
            "Content-Type": "application/json",
            "x-amzn-RequestId": "e1206e19-2318-4a9d-be98-017c73f06118",
            "x-amzn-ErrorType": "UnrecognizedClientException:http://internal.amazon.com/coral/com.amazon.coral.service/",
        },
        403,
        {"message": "The security token included in the request is invalid."},
    ],
    "Model does not exist.": [
        {
            "Content-Type": "application/json",
            "x-amzn-RequestId": "f4908827-3db9-4742-9103-2bbc34578b03",
            "x-amzn-ErrorType": "ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/",
        },
        400,
        {"message": "The provided model identifier is invalid."},
    ],
}


def simple_get(self):
    content_len = int(self.headers.get("content-length"))
    body = self.rfile.read(content_len).decode("utf-8")
    try:
        content = json.loads(body)
    except Exception:
        content = body

    prompt = extract_shortened_prompt_converse(content)
    if not prompt:
        self.send_response(500)
        self.end_headers()
        self.wfile.write(b"Could not parse prompt.")
        return

    headers, status_code, response = ({}, 0, "")

    for k, v in RESPONSES.items():
        if prompt.startswith(k):
            headers, status_code, response = v
            break

    if not response:
        # If no matches are found
        self.send_response(500)
        self.end_headers()
        self.wfile.write(f"Unknown Prompt:\n{prompt}".encode())
        return

    # Send response code
    self.send_response(status_code)

    # Send headers
    for k, v in headers.items():
        self.send_header(k, v)
    self.end_headers()

    # Send response body
    response_body = json.dumps(response).encode("utf-8")
    self.wfile.write(response_body)
    return


def extract_shortened_prompt_converse(content):
    try:
        prompt = content["messages"][0].get("content")[0].get("text", None)
        # Sometimes there are leading whitespaces in the prompt.
        prompt = prompt.lstrip().split("\n")[0]
    except Exception:
        prompt = ""
    return prompt


class MockExternalBedrockConverseServer(MockExternalHTTPServer):
    # To use this class in a test, one needs to start and stop this server
    # before and after making requests to the test app that makes the external
    # calls.

    def __init__(self, handler=simple_get, port=None, *args, **kwargs):
        super().__init__(handler=handler, port=port, *args, **kwargs)  # noqa: B026
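
A minimal usage sketch (hypothetical test code, not part of this PR), assuming `MockExternalHTTPServer` provides the context-manager behavior and `port` attribute the suite's other mock servers use:

import boto3

with MockExternalBedrockConverseServer() as server:
    # Point the client at the mock server so no real AWS calls are made
    client = boto3.client(
        "bedrock-runtime",
        endpoint_url=f"http://localhost:{server.port}",
        region_name="us-east-1",
        aws_access_key_id="NOT-A-REAL-SECRET",
        aws_secret_access_key="NOT-A-REAL-SECRET",
    )
    response = client.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=[{"role": "user", "content": [{"text": "What is 212 degrees Fahrenheit converted to Celsius?"}]}],
        inferenceConfig={"maxTokens": 100, "temperature": 0.7},
    )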