Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
e9ae8b8
feat: add Bedrock InvokeModelWithResponseStream instrumentation
yuliia-fryshko May 13, 2025
da2cbfa
refactor: unify InvokeModel pre-span hooks into a single function wit…
yuliia-fryshko Jun 2, 2025
0a80410
Moved static functions out of the method
yuliia-fryshko Jun 27, 2025
ff59f72
added instrumentation for Bedrock Response Stream of llama, cohere an…
yuliia-fryshko Aug 7, 2025
f7bbec1
added tests for cohere, cohere-r and mistral
yuliia-fryshko Aug 8, 2025
44a3dec
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 14, 2025
4e8b39e
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 19, 2025
945df2c
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 20, 2025
451c95d
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 27, 2025
0fa5a4a
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Sep 8, 2025
efda112
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Sep 8, 2025
6eb6eca
Merge branch 'main' into bedrock-invoke-model-stream
pichlermarc Sep 24, 2025
80d93e3
close span if a chunk cannot be parsed
yuliia-fryshko Oct 7, 2025
74736da
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Oct 7, 2025
ee6f51e
if cannot 'parseChunk' for telemetry processing, then skip adding tel…
trentm Oct 7, 2025
5ac99f3
Merge branch 'main' into bedrock-invoke-model-stream
trentm Oct 8, 2025
35143a4
this file was accidentally added in this feature branch
trentm Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 226 additions & 2 deletions packages/instrumentation-aws-sdk/src/services/bedrock-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import {
Attributes,
DiagLogger,
diag,
Histogram,
HrTime,
Meter,
Expand Down Expand Up @@ -61,6 +62,7 @@ import {
export class BedrockRuntimeServiceExtension implements ServiceExtension {
private tokenUsage!: Histogram;
private operationDuration!: Histogram;
private _diag: DiagLogger = diag;

updateMetricInstruments(meter: Meter) {
// https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
Expand Down Expand Up @@ -103,7 +105,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
case 'ConverseStream':
return this.requestPreSpanHookConverse(request, config, diag, true);
case 'InvokeModel':
return this.requestPreSpanHookInvokeModel(request, config, diag);
return this.requestPreSpanHookInvokeModel(request, config, diag, false);
case 'InvokeModelWithResponseStream':
return this.requestPreSpanHookInvokeModel(request, config, diag, true);
}

return {
Expand Down Expand Up @@ -159,7 +163,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
private requestPreSpanHookInvokeModel(
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
diag: DiagLogger
diag: DiagLogger,
isStream: boolean
): RequestMetadata {
let spanName: string | undefined;
const spanAttributes: Attributes = {
Expand Down Expand Up @@ -314,6 +319,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
return {
spanName,
isIncoming: false,
isStream,
spanAttributes,
};
}
Expand Down Expand Up @@ -348,6 +354,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
);
case 'InvokeModel':
return this.responseHookInvokeModel(response, span, tracer, config);
case 'InvokeModelWithResponseStream':
return this.responseHookInvokeModelWithResponseStream(
response,
span,
tracer,
config
);
}
}

Expand Down Expand Up @@ -581,4 +594,215 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
}
}
}

/**
 * Response hook for the `InvokeModelWithResponseStream` command.
 *
 * Wraps the SDK's streamed response body in an instrumented async
 * generator that inspects each event for model-specific telemetry
 * (token usage, finish reasons) and yields the event unchanged to the
 * consumer. The span is ended only when the consumer has fully drained
 * (or abandoned via return/throw) the wrapped stream.
 *
 * @param response normalized SDK response; `response.data.body` is the
 *   original async iterable of stream events and is replaced in place.
 * @param span the request span; attributes are set as chunks arrive and
 *   `span.end()` is called from the generator's `finally`.
 * @param tracer unused here, kept for hook-signature consistency.
 * @param config unused here, kept for hook-signature consistency.
 * @returns the mutated `response.data`, or `undefined` when there is no
 *   body/modelId to instrument.
 */
private async responseHookInvokeModelWithResponseStream(
  response: NormalizedResponse,
  span: Span,
  tracer: Tracer,
  config: AwsSdkInstrumentationConfig
): Promise<any> {
  const stream = response.data?.body;
  const modelId = response.request.commandInput?.modelId;
  // NOTE(review): this early return does not call span.end(). For
  // streaming requests the span end is deferred to this hook (see the
  // comment below), so confirm the caller still ends the span when the
  // hook bails out here — otherwise the span leaks.
  if (!stream || !modelId) return;

  // Replace the original response body with our instrumented stream.
  // - Defers span.end() until the entire stream is consumed
  // This ensures downstream consumers still receive the full stream correctly,
  // while OpenTelemetry can record span attributes from streamed data.
  response.data.body = async function* (
    this: BedrockRuntimeServiceExtension
  ) {
    try {
      for await (const chunk of stream) {
        // parseChunk returns null when the payload is missing or not
        // valid JSON; in that case we skip telemetry but still yield.
        const parsedChunk = this.parseChunk(chunk?.chunk?.bytes);

        if (!parsedChunk) {
          // pass through
        } else if (modelId.includes('amazon.titan')) {
          BedrockRuntimeServiceExtension.recordTitanAttributes(
            parsedChunk,
            span
          );
        } else if (modelId.includes('anthropic.claude')) {
          BedrockRuntimeServiceExtension.recordClaudeAttributes(
            parsedChunk,
            span
          );
        } else if (modelId.includes('amazon.nova')) {
          BedrockRuntimeServiceExtension.recordNovaAttributes(
            parsedChunk,
            span
          );
        } else if (modelId.includes('meta.llama')) {
          BedrockRuntimeServiceExtension.recordLlamaAttributes(
            parsedChunk,
            span
          );
        } else if (modelId.includes('cohere.command-r')) {
          // 'cohere.command-r' must be tested before 'cohere.command'
          // because the latter is a substring of the former.
          BedrockRuntimeServiceExtension.recordCohereRAttributes(
            parsedChunk,
            span
          );
        } else if (modelId.includes('cohere.command')) {
          BedrockRuntimeServiceExtension.recordCohereAttributes(
            parsedChunk,
            span
          );
        } else if (modelId.includes('mistral')) {
          BedrockRuntimeServiceExtension.recordMistralAttributes(
            parsedChunk,
            span
          );
        }
        yield chunk;
      }
    } finally {
      // Runs on normal completion, consumer break/return, or error:
      // the span is always ended once the wrapped stream is done.
      span.end();
    }
  }.bind(this)();
  return response.data;
}

/**
 * Decode a streamed event payload into a JSON value for telemetry
 * processing.
 *
 * @param bytes raw event payload from the response stream.
 * @returns the parsed JSON value, or `null` (after logging a warning
 *   for parse failures) when the payload is absent, not a Uint8Array,
 *   or not valid JSON — callers treat `null` as "skip telemetry".
 */
private parseChunk(bytes?: Uint8Array): any {
  // A single instanceof check also rejects undefined/null.
  if (!(bytes instanceof Uint8Array)) {
    return null;
  }
  try {
    return JSON.parse(Buffer.from(bytes).toString('utf-8'));
  } catch (err) {
    this._diag.warn('Failed to parse streamed chunk', err);
    return null;
  }
}

/**
 * Record gen-ai attributes from an Amazon Nova streamed chunk.
 *
 * Reads token counts from `metadata.usage` and the finish reason from
 * `messageStop.stopReason`, setting the corresponding span attributes
 * only when the values are present.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordNovaAttributes(parsedChunk: any, span: Span) {
  // Optional-chain the full path: the previous nested check
  // (`metadata?.usage !== undefined` followed by `.usage.inputTokens`)
  // threw a TypeError when `usage` was present but null.
  const usage = parsedChunk.metadata?.usage;
  if (usage?.inputTokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.inputTokens);
  }
  if (usage?.outputTokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.outputTokens);
  }
  if (parsedChunk.messageStop?.stopReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
      parsedChunk.messageStop.stopReason,
    ]);
  }
}

/**
 * Record gen-ai attributes from an Anthropic Claude streamed chunk.
 *
 * Reads token counts from `message.usage` and the finish reason from
 * `delta.stop_reason`, setting span attributes only when present.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordClaudeAttributes(parsedChunk: any, span: Span) {
  const usage = parsedChunk.message?.usage;
  if (usage?.input_tokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.input_tokens);
  }
  if (usage?.output_tokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.output_tokens);
  }
  const stopReason = parsedChunk.delta?.stop_reason;
  if (stopReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
  }
}

/**
 * Record gen-ai attributes from an Amazon Titan streamed chunk.
 *
 * Reads `inputTextTokenCount` / `totalOutputTextTokenCount` for usage
 * and `completionReason` for the finish reason, setting span attributes
 * only when present.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordTitanAttributes(parsedChunk: any, span: Span) {
  const { inputTextTokenCount, totalOutputTextTokenCount, completionReason } =
    parsedChunk;
  if (inputTextTokenCount !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, inputTextTokenCount);
  }
  if (totalOutputTextTokenCount !== undefined) {
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      totalOutputTextTokenCount
    );
  }
  if (completionReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [completionReason]);
  }
}
/**
 * Record gen-ai attributes from a Meta Llama streamed chunk.
 *
 * Reads `prompt_token_count` / `generation_token_count` for usage and
 * `stop_reason` for the finish reason, setting span attributes only
 * when present.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordLlamaAttributes(parsedChunk: any, span: Span) {
  const { prompt_token_count, generation_token_count, stop_reason } =
    parsedChunk;
  if (prompt_token_count !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, prompt_token_count);
  }
  if (generation_token_count !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, generation_token_count);
  }
  if (stop_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stop_reason]);
  }
}

/**
 * Record gen-ai attributes from a Mistral streamed chunk.
 *
 * Reads `outputs[0].text` (approximated token count) and
 * `outputs[0].stop_reason`, setting span attributes only when present.
 *
 * NOTE(review): setAttribute overwrites on every chunk, so the recorded
 * estimate reflects only the last chunk that carried text — confirm
 * whether streamed Mistral chunks are cumulative or incremental.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordMistralAttributes(parsedChunk: any, span: Span) {
  const output = parsedChunk.outputs?.[0];
  if (output?.text !== undefined) {
    // NOTE: We approximate the token count since this value is not directly available in the body
    // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
    // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      Math.ceil(output.text.length / 6)
    );
  }
  if (output?.stop_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
      output.stop_reason,
    ]);
  }
}

/**
 * Record gen-ai attributes from a Cohere Command streamed chunk.
 *
 * Reads `generations[0].text` (approximated token count) and
 * `generations[0].finish_reason`, setting span attributes only when
 * present.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordCohereAttributes(parsedChunk: any, span: Span) {
  const generation = parsedChunk.generations?.[0];
  if (generation?.text !== undefined) {
    // NOTE: We approximate the token count since this value is not directly available in the body
    // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
    // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      Math.ceil(generation.text.length / 6)
    );
  }
  if (generation?.finish_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
      generation.finish_reason,
    ]);
  }
}

/**
 * Record gen-ai attributes from a Cohere Command-R streamed chunk.
 *
 * Reads top-level `text` (approximated token count) and `finish_reason`,
 * setting span attributes only when present.
 *
 * @param parsedChunk one JSON-decoded stream event.
 * @param span span to receive the attributes.
 */
private static recordCohereRAttributes(parsedChunk: any, span: Span) {
  const { text, finish_reason } = parsedChunk;
  if (text !== undefined) {
    // NOTE: We approximate the token count since this value is not directly available in the body
    // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
    // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, Math.ceil(text.length / 6));
  }
  if (finish_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason]);
  }
}
}
Loading
Loading