Skip to content

Add Converse API support. DO NOT MERGE YET #1428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 55 additions & 34 deletions newrelic/hooks/external_aiobotocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys
import traceback
from io import BytesIO

from aiobotocore.response import StreamingBody
Expand All @@ -23,7 +22,9 @@
from newrelic.hooks.external_botocore import (
EMBEDDING_STREAMING_UNSUPPORTED_LOG_MESSAGE,
RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE,
REQUEST_EXTRACTOR_FAILURE_LOG_MESSAGE,
AsyncEventStreamWrapper,
extract_bedrock_converse_attrs,
handle_bedrock_exception,
run_bedrock_request_extractor,
run_bedrock_response_extractor,
Expand Down Expand Up @@ -97,23 +98,36 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
response_extractor = getattr(instance, "_nr_response_extractor", None)
stream_extractor = getattr(instance, "_nr_stream_extractor", None)
response_streaming = getattr(instance, "_nr_response_streaming", False)

is_converse = getattr(instance, "_nr_is_converse", False)
ft = getattr(instance, "_nr_ft", None)

if len(args) >= 2:
model = args[1].get("modelId")
request_body = args[1].get("body")
is_embedding = "embed" in model
else:
model = ""
request_body = None
is_embedding = False
try:
bedrock_args = args[1] if len(args) >= 2 else {}
model = bedrock_args.get("modelId") if bedrock_args else ""
is_embedding = "embed" in model if model else False

if is_converse:
request_body = {}
else:
request_body = bedrock_args.get("body") if bedrock_args else ""
except Exception:
_logger.warning(REQUEST_EXTRACTOR_FAILURE_LOG_MESSAGE, exc_info=True)

try:
response = await wrapped(*args, **kwargs)
except Exception as exc:
handle_bedrock_exception(
exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction
exc,
is_embedding,
model,
span_id,
trace_id,
request_extractor,
request_body,
ft,
transaction,
bedrock_args,
is_converse,
)
raise

Expand All @@ -132,36 +146,43 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
return response

response_headers = response.get("ResponseMetadata", {}).get("HTTPHeaders") or {}
bedrock_attrs = {
"request_id": response_headers.get("x-amzn-requestid"),
"model": model,
"span_id": span_id,
"trace_id": trace_id,
}

run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)

try:
if response_streaming:
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
# This class is used in numerous other services in botocore, and would cause conflicts.
response["body"] = body = AsyncEventStreamWrapper(response["body"])
body._nr_ft = ft or None
body._nr_bedrock_attrs = bedrock_attrs or {}
body._nr_model_extractor = stream_extractor or None
return response

# Read and replace response streaming bodies
response_body = await response["body"].read()
if is_converse:
response_body = {}
bedrock_attrs = extract_bedrock_converse_attrs(
args[1], response, response_headers, model, span_id, trace_id
)
else:
bedrock_attrs = {
"request_id": response_headers.get("x-amzn-requestid"),
"model": model,
"span_id": span_id,
"trace_id": trace_id,
}
# The request extractor only needs to run when invoke_model was called, because request formats differ
# across models
run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)

if response_streaming:
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
# This class is used in numerous other services in botocore, and would cause conflicts.
response["body"] = body = AsyncEventStreamWrapper(response["body"])
body._nr_ft = ft or None
body._nr_bedrock_attrs = bedrock_attrs or {}
body._nr_model_extractor = stream_extractor or None
return response

# Read and replace response streaming bodies
response_body = await response["body"].read()
response["body"] = StreamingBody(AsyncBytesIO(response_body), len(response_body))

if ft:
ft.__exit__(None, None, None)
bedrock_attrs["duration"] = ft.duration * 1000
response["body"] = StreamingBody(AsyncBytesIO(response_body), len(response_body))
run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction)

except Exception:
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, traceback.format_exception(*sys.exc_info()))
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)

return response

Expand Down
Loading
Loading