Skip to content

Commit 0cc1d15

Browse files
authored
Merge pull request #1318 from newrelic/feature-async-bedrock
Add support for Async Bedrock
2 parents bde0098 + e19e74d commit 0cc1d15

File tree

8 files changed

+1864
-72
lines changed

8 files changed

+1864
-72
lines changed

newrelic/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3983,6 +3983,8 @@ def _process_module_builtin_defaults():
39833983
)
39843984
_process_module_definition("gearman.worker", "newrelic.hooks.application_gearman", "instrument_gearman_worker")
39853985

3986+
_process_module_definition("aiobotocore.client", "newrelic.hooks.external_aiobotocore", "instrument_aiobotocore_client")
3987+
39863988
_process_module_definition(
39873989
"aiobotocore.endpoint", "newrelic.hooks.external_aiobotocore", "instrument_aiobotocore_endpoint"
39883990
)

newrelic/hooks/external_aiobotocore.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
# Copyright 2010 New Relic, Inc.
23
#
34
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -11,9 +12,37 @@
1112
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1213
# See the License for the specific language governing permissions and
1314
# limitations under the License.
15+
import logging
16+
import traceback
17+
import sys
18+
from aiobotocore.response import StreamingBody
19+
from io import BytesIO
1420

1521
from newrelic.api.external_trace import ExternalTrace
1622
from newrelic.common.object_wrapper import wrap_function_wrapper
23+
from newrelic.hooks.external_botocore import (
24+
AsyncEventStreamWrapper,
25+
handle_bedrock_exception,
26+
run_bedrock_response_extractor,
27+
run_bedrock_request_extractor,
28+
EMBEDDING_STREAMING_UNSUPPORTED_LOG_MESSAGE,
29+
RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE,
30+
)
31+
32+
_logger = logging.getLogger(__name__)
33+
34+
35+
# Class from https://github.com/aio-libs/aiobotocore/blob/master/tests/test_response.py
36+
# aiobotocore Apache 2 license: https://github.com/aio-libs/aiobotocore/blob/master/LICENSE
37+
class AsyncBytesIO(BytesIO):
38+
def __init__(self, *args, **kwargs):
39+
super().__init__(*args, **kwargs)
40+
self.content = self
41+
42+
async def read(self, amt=-1):
43+
if amt == -1: # aiohttp to regular response
44+
amt = None
45+
return super().read(amt)
1746

1847

1948
def _bind_make_request_params(operation_model, request_dict, *args, **kwargs):
@@ -44,5 +73,102 @@ async def wrap_endpoint_make_request(wrapped, instance, args, kwargs):
4473
return result
4574

4675

76+
async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
77+
# This instrumentation only applies to bedrock runtimes so exit if this method was hit through a different path
78+
if not hasattr(instance, "_nr_is_bedrock"):
79+
return await wrapped(*args, **kwargs)
80+
81+
transaction = getattr(instance, "_nr_txn", None)
82+
if not transaction:
83+
return await wrapped(*args, **kwargs)
84+
85+
settings = getattr(instance, "_nr_settings", None)
86+
87+
# Early exit if we can't access the shared settings object from invoke_model instrumentation
88+
# This settings object helps us determine if AIM was enabled as well as streaming
89+
if not (settings and settings.ai_monitoring.enabled):
90+
return await wrapped(*args, **kwargs)
91+
92+
# Grab all context data from botocore invoke_model instrumentation off the shared instance
93+
trace_id = getattr(instance, "_nr_trace_id", "")
94+
span_id = getattr(instance, "_nr_span_id", "")
95+
96+
request_extractor = getattr(instance, "_nr_request_extractor", None)
97+
response_extractor = getattr(instance, "_nr_response_extractor", None)
98+
stream_extractor = getattr(instance, "_nr_stream_extractor", None)
99+
response_streaming = getattr(instance, "_nr_response_streaming", False)
100+
101+
ft = getattr(instance, "_nr_ft", None)
102+
103+
if len(args) >= 2:
104+
model = args[1].get("modelId")
105+
request_body = args[1].get("body")
106+
is_embedding = "embed" in model
107+
else:
108+
model = ""
109+
request_body = None
110+
is_embedding = False
111+
112+
try:
113+
response = await wrapped(*args, **kwargs)
114+
except Exception as exc:
115+
handle_bedrock_exception(
116+
exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction
117+
)
118+
119+
if not response or response_streaming and not settings.ai_monitoring.streaming.enabled:
120+
if ft:
121+
ft.__exit__(None, None, None)
122+
return response
123+
124+
if response_streaming and is_embedding:
125+
# This combination is not supported at time of writing, but may become
126+
# a supported feature in the future. Instrumentation will need to be written
127+
# if this becomes available.
128+
_logger.warning(EMBEDDING_STREAMING_UNSUPPORTED_LOG_MESSAGE)
129+
if ft:
130+
ft.__exit__(None, None, None)
131+
return response
132+
133+
response_headers = response.get("ResponseMetadata", {}).get("HTTPHeaders") or {}
134+
bedrock_attrs = {
135+
"request_id": response_headers.get("x-amzn-requestid"),
136+
"model": model,
137+
"span_id": span_id,
138+
"trace_id": trace_id,
139+
}
140+
141+
run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)
142+
143+
try:
144+
if response_streaming:
145+
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
146+
# This class is used in numerous other services in botocore, and would cause conflicts.
147+
response["body"] = body = AsyncEventStreamWrapper(response["body"])
148+
body._nr_ft = ft or None
149+
body._nr_bedrock_attrs = bedrock_attrs or {}
150+
body._nr_model_extractor = stream_extractor or None
151+
return response
152+
153+
# Read and replace response streaming bodies
154+
response_body = await response["body"].read()
155+
156+
if ft:
157+
ft.__exit__(None, None, None)
158+
bedrock_attrs["duration"] = ft.duration * 1000
159+
response["body"] = StreamingBody(AsyncBytesIO(response_body), len(response_body))
160+
run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction)
161+
162+
except Exception:
163+
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE % traceback.format_exception(*sys.exc_info()))
164+
165+
return response
166+
167+
47168
def instrument_aiobotocore_endpoint(module):
48169
wrap_function_wrapper(module, "AioEndpoint.make_request", wrap_endpoint_make_request)
170+
171+
172+
def instrument_aiobotocore_client(module):
173+
if hasattr(module, "AioBaseClient"):
174+
wrap_function_wrapper(module, "AioBaseClient._make_api_call", wrap_client__make_api_call)

0 commit comments

Comments
 (0)