Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
17 commits
Select a commit. Hold Shift + click to select a range.
e9ae8b8
feat: add Bedrock InvokeModelWithResponseStream instrumentation
yuliia-fryshko May 13, 2025
da2cbfa
refactor: unify InvokeModel pre-span hooks into a single function wit…
yuliia-fryshko Jun 2, 2025
0a80410
Moved static functions out of the method
yuliia-fryshko Jun 27, 2025
ff59f72
added instrumentation for Bedrock Response Stream of llama, cohere an…
yuliia-fryshko Aug 7, 2025
f7bbec1
added tests for cohere, cohere-r and mistral
yuliia-fryshko Aug 8, 2025
44a3dec
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 14, 2025
4e8b39e
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 19, 2025
945df2c
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 20, 2025
451c95d
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Aug 27, 2025
0fa5a4a
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Sep 8, 2025
efda112
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Sep 8, 2025
6eb6eca
Merge branch 'main' into bedrock-invoke-model-stream
pichlermarc Sep 24, 2025
80d93e3
close span if a chunk cannot be parsed
yuliia-fryshko Oct 7, 2025
74736da
Merge branch 'main' into bedrock-invoke-model-stream
yuliia-fryshko Oct 7, 2025
ee6f51e
if cannot 'parseChunk' for telemetry processing, then skip adding tel…
trentm Oct 7, 2025
5ac99f3
Merge branch 'main' into bedrock-invoke-model-stream
trentm Oct 8, 2025
35143a4
this file was accidentally added in this feature branch
trentm Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 225 additions & 2 deletions packages/instrumentation-aws-sdk/src/services/bedrock-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import {
Attributes,
DiagLogger,
diag,
Histogram,
HrTime,
Meter,
Expand Down Expand Up @@ -59,6 +60,7 @@ import {
export class BedrockRuntimeServiceExtension implements ServiceExtension {
private tokenUsage!: Histogram;
private operationDuration!: Histogram;
private _diag: DiagLogger = diag;

updateMetricInstruments(meter: Meter) {
// https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
Expand Down Expand Up @@ -101,7 +103,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
case 'ConverseStream':
return this.requestPreSpanHookConverse(request, config, diag, true);
case 'InvokeModel':
return this.requestPreSpanHookInvokeModel(request, config, diag);
return this.requestPreSpanHookInvokeModel(request, config, diag, false);
case 'InvokeModelWithResponseStream':
return this.requestPreSpanHookInvokeModel(request, config, diag, true);
}

return {
Expand Down Expand Up @@ -157,7 +161,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
private requestPreSpanHookInvokeModel(
request: NormalizedRequest,
config: AwsSdkInstrumentationConfig,
diag: DiagLogger
diag: DiagLogger,
isStream: boolean
): RequestMetadata {
let spanName: string | undefined;
const spanAttributes: Attributes = {
Expand Down Expand Up @@ -312,6 +317,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
return {
spanName,
isIncoming: false,
isStream,
spanAttributes,
};
}
Expand Down Expand Up @@ -346,6 +352,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
);
case 'InvokeModel':
return this.responseHookInvokeModel(response, span, tracer, config);
case 'InvokeModelWithResponseStream':
return this.responseHookInvokeModelWithResponseStream(
response,
span,
tracer,
config
);
}
}

Expand Down Expand Up @@ -579,4 +592,214 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
}
}
}

/**
 * Response hook for `InvokeModelWithResponseStream`.
 *
 * Wraps the streamed response body in an instrumented async generator that
 * records model-specific GenAI span attributes (token usage, finish reasons)
 * as chunks flow through, then ends the span once the stream is fully
 * consumed. Downstream consumers still receive every chunk unchanged.
 *
 * @param response - normalized SDK response; `response.data.body` is replaced.
 * @param span - the request span; ended when the stream is drained/errors.
 * @param tracer - unused here, kept for hook-signature consistency.
 * @param config - instrumentation config, kept for hook-signature consistency.
 * @returns the (mutated) response data, or undefined when there is nothing to
 *   instrument (no stream body or no modelId).
 */
private async responseHookInvokeModelWithResponseStream(
  response: NormalizedResponse,
  span: Span,
  tracer: Tracer,
  config: AwsSdkInstrumentationConfig
): Promise<any> {
  const stream = response.data?.body;
  const modelId = response.request.commandInput?.modelId;
  if (!stream || !modelId) return;

  // Replace the original response body with our instrumented stream.
  // - Defers span.end() until the entire stream is consumed
  // This ensures downstream consumers still receive the full stream correctly,
  // while OpenTelemetry can record span attributes from streamed data.
  response.data.body = async function* (
    this: BedrockRuntimeServiceExtension
  ) {
    try {
      for await (const chunk of stream) {
        const parsedChunk = this.parseChunk(chunk?.chunk?.bytes);

        // If a chunk cannot be parsed for telemetry purposes, skip recording
        // attributes for it but KEEP yielding chunks: returning early here
        // would truncate the stream for the consumer and end the span before
        // the response finished. The span is ended in `finally` below.
        if (parsedChunk) {
          if (modelId.includes('amazon.titan')) {
            BedrockRuntimeServiceExtension.recordTitanAttributes(
              parsedChunk,
              span
            );
          } else if (modelId.includes('anthropic.claude')) {
            BedrockRuntimeServiceExtension.recordClaudeAttributes(
              parsedChunk,
              span
            );
          } else if (modelId.includes('amazon.nova')) {
            BedrockRuntimeServiceExtension.recordNovaAttributes(
              parsedChunk,
              span
            );
          } else if (modelId.includes('meta.llama')) {
            BedrockRuntimeServiceExtension.recordLlamaAttributes(
              parsedChunk,
              span
            );
          } else if (modelId.includes('cohere.command-r')) {
            // NOTE: 'cohere.command-r' must be checked before 'cohere.command'
            // because the latter is a prefix of the former.
            BedrockRuntimeServiceExtension.recordCohereRAttributes(
              parsedChunk,
              span
            );
          } else if (modelId.includes('cohere.command')) {
            BedrockRuntimeServiceExtension.recordCohereAttributes(
              parsedChunk,
              span
            );
          } else if (modelId.includes('mistral')) {
            BedrockRuntimeServiceExtension.recordMistralAttributes(
              parsedChunk,
              span
            );
          }
        }
        yield chunk;
      }
    } finally {
      // End the span whether the stream completed, threw, or was abandoned,
      // so the span is never leaked.
      span.end();
    }
  }.bind(this)();
  return response.data;
}

/**
 * Decode a streamed event payload (UTF-8 bytes) and parse it as JSON.
 *
 * Returns null — rather than throwing — when the payload is missing, is not
 * a Uint8Array, or does not contain valid JSON; a warning is logged in the
 * parse-failure case so telemetry gaps are diagnosable.
 */
private parseChunk(bytes?: Uint8Array): any {
  // instanceof is false for null/undefined, so this single check covers both
  // the presence and the type guard.
  if (!(bytes instanceof Uint8Array)) return null;
  try {
    return JSON.parse(Buffer.from(bytes).toString('utf-8'));
  } catch (err) {
    this._diag.warn('Failed to parse streamed chunk', err);
    return null;
  }
}

/** Record token usage and finish reason from an Amazon Nova stream chunk. */
private static recordNovaAttributes(parsedChunk: any, span: Span) {
  const usage = parsedChunk.metadata?.usage;
  if (usage !== undefined) {
    if (usage.inputTokens !== undefined) {
      span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.inputTokens);
    }
    if (usage.outputTokens !== undefined) {
      span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.outputTokens);
    }
  }
  const stopReason = parsedChunk.messageStop?.stopReason;
  if (stopReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
  }
}

/** Record token usage and finish reason from an Anthropic Claude stream chunk. */
private static recordClaudeAttributes(parsedChunk: any, span: Span) {
  const usage = parsedChunk.message?.usage;
  if (usage?.input_tokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.input_tokens);
  }
  if (usage?.output_tokens !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.output_tokens);
  }
  const stopReason = parsedChunk.delta?.stop_reason;
  if (stopReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
  }
}

/** Record token usage and completion reason from an Amazon Titan stream chunk. */
private static recordTitanAttributes(parsedChunk: any, span: Span) {
  const { inputTextTokenCount, totalOutputTextTokenCount, completionReason } =
    parsedChunk;
  if (inputTextTokenCount !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, inputTextTokenCount);
  }
  if (totalOutputTextTokenCount !== undefined) {
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      totalOutputTextTokenCount
    );
  }
  if (completionReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [completionReason]);
  }
}
/** Record token usage and stop reason from a Meta Llama stream chunk. */
private static recordLlamaAttributes(parsedChunk: any, span: Span) {
  const { prompt_token_count, generation_token_count, stop_reason } =
    parsedChunk;
  if (prompt_token_count !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, prompt_token_count);
  }
  if (generation_token_count !== undefined) {
    span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, generation_token_count);
  }
  if (stop_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stop_reason]);
  }
}

/** Record (approximate) output tokens and stop reason from a Mistral stream chunk. */
private static recordMistralAttributes(parsedChunk: any, span: Span) {
  const firstOutput = parsedChunk.outputs?.[0];
  if (firstOutput?.text !== undefined) {
    // NOTE: We approximate the token count since this value is not directly available in the body
    // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
    // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      Math.ceil(firstOutput.text.length / 6)
    );
  }
  if (firstOutput?.stop_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
      firstOutput.stop_reason,
    ]);
  }
}

/** Record (approximate) output tokens and finish reason from a Cohere Command stream chunk. */
private static recordCohereAttributes(parsedChunk: any, span: Span) {
  const firstGeneration = parsedChunk.generations?.[0];
  if (firstGeneration?.text !== undefined) {
    // NOTE: We approximate the token count since this value is not directly available in the body
    // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
    // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      Math.ceil(firstGeneration.text.length / 6)
    );
  }
  if (firstGeneration?.finish_reason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
      firstGeneration.finish_reason,
    ]);
  }
}

/** Record (approximate) output tokens and finish reason from a Cohere Command R stream chunk. */
private static recordCohereRAttributes(parsedChunk: any, span: Span) {
  const { text, finish_reason: finishReason } = parsedChunk;
  if (text !== undefined) {
    // NOTE: We approximate the token count since this value is not directly available in the body
    // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
    // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
    span.setAttribute(
      ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
      Math.ceil(text.length / 6)
    );
  }
  if (finishReason !== undefined) {
    span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [finishReason]);
  }
}
}
Loading
Loading