Skip to content

Commit c2f8fe4

Browse files
umaannamalaihmstepanekmergify[bot]TimPansino
authored
Add Converse API non-streaming support. (#1428)
* Add Converse API support. * Update aiobotocore instrumentation. * Add support for converse calls made with aioboto3 clients. * Linting fixes. * Review comments. * Remove comment from get call * Trigger tests * Lint --------- Co-authored-by: Hannah Stepanek <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Tim Pansino <[email protected]>
1 parent 8683e2c commit c2f8fe4

10 files changed

+1514
-61
lines changed

newrelic/hooks/external_aiobotocore.py

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
1415
import logging
15-
import sys
16-
import traceback
1716
from io import BytesIO
1817

1918
from aiobotocore.response import StreamingBody
@@ -22,8 +21,10 @@
2221
from newrelic.common.object_wrapper import wrap_function_wrapper
2322
from newrelic.hooks.external_botocore import (
2423
EMBEDDING_STREAMING_UNSUPPORTED_LOG_MESSAGE,
24+
REQUEST_EXTRACTOR_FAILURE_LOG_MESSAGE,
2525
RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE,
2626
AsyncEventStreamWrapper,
27+
extract_bedrock_converse_attrs,
2728
handle_bedrock_exception,
2829
run_bedrock_request_extractor,
2930
run_bedrock_response_extractor,
@@ -97,23 +98,33 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
9798
response_extractor = getattr(instance, "_nr_response_extractor", None)
9899
stream_extractor = getattr(instance, "_nr_stream_extractor", None)
99100
response_streaming = getattr(instance, "_nr_response_streaming", False)
100-
101+
is_converse = getattr(instance, "_nr_is_converse", False)
101102
ft = getattr(instance, "_nr_ft", None)
102103

103-
if len(args) >= 2:
104-
model = args[1].get("modelId")
105-
request_body = args[1].get("body")
106-
is_embedding = "embed" in model
107-
else:
108-
model = ""
109-
request_body = None
110-
is_embedding = False
104+
try:
105+
bedrock_args = args[1] if len(args) >= 2 else {}
106+
model = bedrock_args.get("modelId")
107+
is_embedding = "embed" in model if model else False
108+
109+
request_body = {} if is_converse else bedrock_args.get("body")
110+
except Exception:
111+
_logger.warning(REQUEST_EXTRACTOR_FAILURE_LOG_MESSAGE, exc_info=True)
111112

112113
try:
113114
response = await wrapped(*args, **kwargs)
114115
except Exception as exc:
115116
handle_bedrock_exception(
116-
exc, is_embedding, model, span_id, trace_id, request_extractor, request_body, ft, transaction
117+
exc,
118+
is_embedding,
119+
model,
120+
span_id,
121+
trace_id,
122+
request_extractor,
123+
request_body,
124+
ft,
125+
transaction,
126+
bedrock_args,
127+
is_converse,
117128
)
118129
raise
119130

@@ -132,36 +143,43 @@ async def wrap_client__make_api_call(wrapped, instance, args, kwargs):
132143
return response
133144

134145
response_headers = response.get("ResponseMetadata", {}).get("HTTPHeaders") or {}
135-
bedrock_attrs = {
136-
"request_id": response_headers.get("x-amzn-requestid"),
137-
"model": model,
138-
"span_id": span_id,
139-
"trace_id": trace_id,
140-
}
141-
142-
run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)
143-
144146
try:
145-
if response_streaming:
146-
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
147-
# This class is used in numerous other services in botocore, and would cause conflicts.
148-
response["body"] = body = AsyncEventStreamWrapper(response["body"])
149-
body._nr_ft = ft or None
150-
body._nr_bedrock_attrs = bedrock_attrs or {}
151-
body._nr_model_extractor = stream_extractor or None
152-
return response
153-
154-
# Read and replace response streaming bodies
155-
response_body = await response["body"].read()
147+
if is_converse:
148+
response_body = {}
149+
bedrock_attrs = extract_bedrock_converse_attrs(
150+
args[1], response, response_headers, model, span_id, trace_id
151+
)
152+
else:
153+
bedrock_attrs = {
154+
"request_id": response_headers.get("x-amzn-requestid"),
155+
"model": model,
156+
"span_id": span_id,
157+
"trace_id": trace_id,
158+
}
159+
# We only need to run the request extractor if invoke_model was called since the request formats are different
160+
# across models
161+
run_bedrock_request_extractor(request_extractor, request_body, bedrock_attrs)
162+
163+
if response_streaming:
164+
# Wrap EventStream object here to intercept __iter__ method instead of instrumenting class.
165+
# This class is used in numerous other services in botocore, and would cause conflicts.
166+
response["body"] = body = AsyncEventStreamWrapper(response["body"])
167+
body._nr_ft = ft or None
168+
body._nr_bedrock_attrs = bedrock_attrs or {}
169+
body._nr_model_extractor = stream_extractor or None
170+
return response
171+
172+
# Read and replace response streaming bodies
173+
response_body = await response["body"].read()
174+
response["body"] = StreamingBody(AsyncBytesIO(response_body), len(response_body))
156175

157176
if ft:
158177
ft.__exit__(None, None, None)
159178
bedrock_attrs["duration"] = ft.duration * 1000
160-
response["body"] = StreamingBody(AsyncBytesIO(response_body), len(response_body))
161179
run_bedrock_response_extractor(response_extractor, response_body, bedrock_attrs, is_embedding, transaction)
162180

163181
except Exception:
164-
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, traceback.format_exception(*sys.exc_info()))
182+
_logger.warning(RESPONSE_PROCESSING_FAILURE_LOG_MESSAGE, exc_info=True)
165183

166184
return response
167185

0 commit comments

Comments
 (0)