diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/aws/services/bedrock.ts b/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/aws/services/bedrock.ts index 68fa3fe9..4b8c38c0 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/aws/services/bedrock.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/src/patches/aws/services/bedrock.ts @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { Attributes, DiagLogger, Span, SpanKind, Tracer } from '@opentelemetry/api'; +import { Attributes, DiagLogger, Span, SpanKind, Tracer, diag } from '@opentelemetry/api'; import { AwsSdkInstrumentationConfig, NormalizedRequest, @@ -331,7 +331,31 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension { responseHook(response: NormalizedResponse, span: Span, tracer: Tracer, config: AwsSdkInstrumentationConfig): void { const currentModelId = response.request.commandInput?.modelId; if (response.data?.body) { - const decodedResponseBody = new TextDecoder().decode(response.data.body); + // Check if this is a streaming response (SmithyMessageDecoderStream) + // Intend to not using instanceOf to avoid import smithy as new dep for this file + // https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/bedrock-runtime/command/InvokeModelWithResponseStreamCommand + if (response.data.body.constructor?.name === 'SmithyMessageDecoderStream') { + // TODO: support InvokeModel Streaming API and Converse APIs later + diag.debug('Streaming API for invoking model is not supported', response.request.commandName); + return; + } + + let decodedResponseBody: string; + // For InvokeModel API which should always have reponse body with Uint8Array type + // https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/bedrock-runtime/command/InvokeModelCommand/ + if (response.data.body instanceof Uint8Array) { + // Raw Uint8Array from AWS + decodedResponseBody = new TextDecoder().decode(response.data.body); + } else { + // Handle unexpected types - log and skip processing + diag.debug( + `Unexpected body type in Bedrock response: ${typeof response.data.body} for commandName ${ + response.request.commandName + }` + ); + return; + } + const responseBody = JSON.parse(decodedResponseBody); if (currentModelId.includes('amazon.titan')) { if (responseBody.inputTextTokenCount !== undefined) { diff --git a/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/aws/services/bedrock.test.ts b/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/aws/services/bedrock.test.ts index ead43dde..62b0d8ef 100644 --- a/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/aws/services/bedrock.test.ts +++ b/aws-distro-opentelemetry-node-autoinstrumentation/test/patches/aws/services/bedrock.test.ts @@ -736,5 +736,121 @@ describe('BedrockRuntime', () => { expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_RESPONSE_FINISH_REASONS]).toEqual(['stop']); expect(invokeModelSpan.kind).toBe(SpanKind.CLIENT); }); + + describe('Response Body Type Handling', () => { + it('handles normal Anthropic Claude response correctly', async () => { + const modelId: string = 'anthropic.claude-3-5-sonnet-20240620-v1:0'; + const mockRequestBody: string = JSON.stringify({ + anthropic_version: 'bedrock-2023-05-31', + max_tokens: 1000, + messages: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], + }); + + // Use standard object format - AWS SDK and instrumentation will handle the conversion + const mockResponseBodyObj = { + stop_reason: 'end_turn', + usage: { input_tokens: 20, output_tokens: 15 }, + }; + + nock(`https://bedrock-runtime.${region}.amazonaws.com`) + .post(`/model/${encodeURIComponent(modelId)}/invoke`) + .reply(200, mockResponseBodyObj); + + await bedrock + .invokeModel({ + modelId: modelId, + body: mockRequestBody, + }) + .catch((err: any) => {}); + + const testSpans: ReadableSpan[] = getTestSpans(); + const invokeModelSpans: ReadableSpan[] = testSpans.filter((s: ReadableSpan) => { + return s.name === 'BedrockRuntime.InvokeModel'; + }); + expect(invokeModelSpans.length).toBe(1); + const invokeModelSpan = invokeModelSpans[0]; + + // Verify attributes are set correctly - this tests our type handling logic works + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_USAGE_INPUT_TOKENS]).toBe(20); + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_USAGE_OUTPUT_TOKENS]).toBe(15); + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_RESPONSE_FINISH_REASONS]).toEqual(['end_turn']); + }); + + it('handles unexpected body type gracefully', async () => { + const modelId: string = 'anthropic.claude-3-5-sonnet-20240620-v1:0'; + const mockRequestBody: string = JSON.stringify({ + anthropic_version: 'bedrock-2023-05-31', + max_tokens: 1000, + messages: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], + }); + + // Mock response body as unexpected type - using reply function to return a number + nock(`https://bedrock-runtime.${region}.amazonaws.com`) + .post(`/model/${encodeURIComponent(modelId)}/invoke`) + .reply(200, () => 12345 as any); + + await bedrock.invokeModel({ + modelId: modelId, + body: mockRequestBody, + }); + + const testSpans: ReadableSpan[] = getTestSpans(); + const invokeModelSpans: ReadableSpan[] = testSpans.filter((s: ReadableSpan) => { + return s.name === 'BedrockRuntime.InvokeModel'; + }); + expect(invokeModelSpans.length).toBe(1); + const invokeModelSpan = invokeModelSpans[0]; + + // Verify that no AI attributes are set when body type is unexpected + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_USAGE_INPUT_TOKENS]).toBeUndefined(); + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_USAGE_OUTPUT_TOKENS]).toBeUndefined(); + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_RESPONSE_FINISH_REASONS]).toBeUndefined(); + + // Note: We can't easily test diag.debug() output in unit tests, but the important part + // is that the function returns early and doesn't crash when encountering unexpected types + // Debug message will be: "Unexpected body type in Bedrock response: number for commandName InvokeModelCommand" + }); + + it('handles streaming response (SmithyMessageDecoderStream) gracefully', async () => { + const modelId: string = 'anthropic.claude-3-5-sonnet-20240620-v1:0'; + const mockRequestBody: string = JSON.stringify({ + anthropic_version: 'bedrock-2023-05-31', + max_tokens: 1000, + messages: [{ role: 'user', content: [{ type: 'text', text: 'test' }] }], + }); + + // Mock response body as streaming object (constructor name matching) + const mockStreamingBody = { + constructor: { name: 'SmithyMessageDecoderStream' }, + [Symbol.asyncIterator]: function* () { + yield { chunk: { bytes: new TextEncoder().encode('{"type":"chunk"}') } }; + }, + }; + + nock(`https://bedrock-runtime.${region}.amazonaws.com`) + .post(`/model/${encodeURIComponent(modelId)}/invoke-with-response-stream`) + .reply(200, mockStreamingBody); + + await bedrock.invokeModelWithResponseStream({ + modelId: modelId, + body: mockRequestBody, + }); + + const testSpans: ReadableSpan[] = getTestSpans(); + const invokeModelSpans: ReadableSpan[] = testSpans.filter((s: ReadableSpan) => { + return s.name === 'BedrockRuntime.InvokeModelWithResponseStream'; + }); + expect(invokeModelSpans.length).toBe(1); + const invokeModelSpan = invokeModelSpans[0]; + + // Verify that no AI attributes are set when body is streaming (metrics not available in initial response) + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_USAGE_INPUT_TOKENS]).toBeUndefined(); + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_USAGE_OUTPUT_TOKENS]).toBeUndefined(); + expect(invokeModelSpan.attributes[AwsSpanProcessingUtil.GEN_AI_RESPONSE_FINISH_REASONS]).toBeUndefined(); + + // Streaming responses should be skipped gracefully without crashing + // TODO: support InvokeModel Streaming API and Converse APIs later + }); + }); }); });