Skip to content

Commit 09f9777

Browse files
feat: add Bedrock InvokeModelWithResponseStream instrumentation
1 parent ab438a0 commit 09f9777

File tree

5 files changed

+649
-0
lines changed

5 files changed

+649
-0
lines changed

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
102102
return this.requestPreSpanHookConverse(request, config, diag, true);
103103
case 'InvokeModel':
104104
return this.requestPreSpanHookInvokeModel(request, config, diag);
105+
case 'InvokeModelWithResponseStream':
106+
return this.requestPreSpanHookInvokeModelWithResponseStream(request, config, diag, true);
105107
}
106108

107109
return {
@@ -316,6 +318,86 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
316318
};
317319
}
318320

321+
/**
 * Builds the span metadata for a Bedrock `InvokeModelWithResponseStream`
 * request.
 *
 * The model id (when present) is recorded as the request model, and the
 * model-family specific inference parameters (temperature, top-p, max
 * tokens, stop sequences) are read from the JSON request body for the
 * `amazon.titan`, `anthropic.claude` and `amazon.nova` families.
 *
 * This hook never throws on a malformed request: a missing model id, a
 * body that is not valid JSON, or a body that does not parse to an object
 * simply results in fewer attributes being recorded.
 *
 * @param request normalized AWS SDK request; `commandInput.modelId` and
 *   `commandInput.body` are the only fields read here
 * @param config instrumentation config (unused by this hook)
 * @param diag logger used to report a body that could not be parsed
 * @param isStream passed through unchanged into the returned metadata
 * @returns request metadata carrying the collected span attributes
 */
private requestPreSpanHookInvokeModelWithResponseStream(
  request: NormalizedRequest,
  config: AwsSdkInstrumentationConfig,
  diag: DiagLogger,
  isStream: boolean
): RequestMetadata {
  let spanName: string | undefined;
  const spanAttributes: Attributes = {
    [ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
    // add operation name for InvokeModel API
  };

  // Only record values that were actually supplied in the request body.
  const setIfDefined = (key: string, value: Attributes[string]): void => {
    if (value !== undefined) {
      spanAttributes[key] = value;
    }
  };

  const modelId = request.commandInput?.modelId;
  if (modelId) {
    spanAttributes[ATTR_GEN_AI_REQUEST_MODEL] = modelId;
  }

  const rawBody = request.commandInput?.body;
  // The body layout depends on the model family, so without a usable model
  // id there is nothing meaningful to extract from it.
  if (rawBody && typeof modelId === 'string') {
    let requestBody: any;
    try {
      // The body may arrive as encoded bytes rather than a string; decode
      // it before parsing instead of letting JSON.parse coerce the array.
      const bodyText =
        rawBody instanceof Uint8Array
          ? Buffer.from(rawBody).toString('utf-8')
          : rawBody;
      requestBody = JSON.parse(bodyText);
    } catch (err) {
      // Telemetry must not break the user's request: skip body attributes.
      diag.debug(
        'Failed to parse InvokeModelWithResponseStream request body',
        err
      );
    }

    if (requestBody !== null && typeof requestBody === 'object') {
      if (modelId.includes('amazon.titan')) {
        const titanConfig = requestBody.textGenerationConfig;
        setIfDefined(ATTR_GEN_AI_REQUEST_TEMPERATURE, titanConfig?.temperature);
        setIfDefined(ATTR_GEN_AI_REQUEST_TOP_P, titanConfig?.topP);
        setIfDefined(ATTR_GEN_AI_REQUEST_MAX_TOKENS, titanConfig?.maxTokenCount);
        setIfDefined(
          ATTR_GEN_AI_REQUEST_STOP_SEQUENCES,
          titanConfig?.stopSequences
        );
      } else if (modelId.includes('anthropic.claude')) {
        setIfDefined(ATTR_GEN_AI_REQUEST_MAX_TOKENS, requestBody.max_tokens);
        setIfDefined(ATTR_GEN_AI_REQUEST_TEMPERATURE, requestBody.temperature);
        setIfDefined(ATTR_GEN_AI_REQUEST_TOP_P, requestBody.top_p);
        setIfDefined(
          ATTR_GEN_AI_REQUEST_STOP_SEQUENCES,
          requestBody.stop_sequences
        );
      } else if (modelId.includes('amazon.nova')) {
        const novaConfig = requestBody.inferenceConfig;
        setIfDefined(ATTR_GEN_AI_REQUEST_TEMPERATURE, novaConfig?.temperature);
        setIfDefined(ATTR_GEN_AI_REQUEST_TOP_P, novaConfig?.top_p);
        setIfDefined(ATTR_GEN_AI_REQUEST_MAX_TOKENS, novaConfig?.max_new_tokens);
        setIfDefined(
          ATTR_GEN_AI_REQUEST_STOP_SEQUENCES,
          novaConfig?.stopSequences
        );
      }
    }
  }

  return {
    spanName,
    isIncoming: false,
    spanAttributes,
    isStream,
  };
}
400+
319401
responseHook(
320402
response: NormalizedResponse,
321403
span: Span,
@@ -346,6 +428,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
346428
);
347429
case 'InvokeModel':
348430
return this.responseHookInvokeModel(response, span, tracer, config);
431+
case 'InvokeModelWithResponseStream':
432+
return this.responseHookInvokeModelWithResponseStream(
433+
response,
434+
span,
435+
tracer,
436+
config
437+
);
349438
}
350439
}
351440

@@ -579,4 +668,134 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
579668
}
580669
}
581670
}
671+
672+
/**
 * Instruments the event stream returned by `InvokeModelWithResponseStream`.
 *
 * The response body is replaced with an async generator that forwards every
 * event unchanged while inspecting its JSON payload for token usage and
 * finish reasons (Titan, Claude and Nova payload shapes). The span is ended
 * once the consumer finishes, abandons, or fails while iterating the stream.
 *
 * Telemetry extraction is strictly best-effort: a chunk that cannot be
 * parsed or a missing model id never affects what the consumer receives.
 *
 * @param response normalized SDK response; `data.body` is the event stream
 *   and `request.commandInput.modelId` selects the payload layout
 * @param span span covering the streaming call; ended by this hook
 * @param tracer unused by this hook
 * @param config unused by this hook
 * @returns `response.data` with its `body` replaced by the instrumented
 *   stream, or `undefined` when there is nothing to instrument
 */
private async responseHookInvokeModelWithResponseStream(
  response: NormalizedResponse,
  span: Span,
  tracer: Tracer,
  config: AwsSdkInstrumentationConfig
): Promise<any> {
  type StreamEvent = { chunk?: { bytes?: Uint8Array } };
  type ChunkRecorder = (parsedChunk: any) => void;

  // Decode one event payload into JSON; returns null when it is absent or
  // not valid JSON.
  function parseChunk(bytes?: Uint8Array): any {
    if (!bytes || !(bytes instanceof Uint8Array)) return null;
    try {
      const str = Buffer.from(bytes).toString('utf-8');
      return JSON.parse(str);
    } catch (err) {
      console.warn('Failed to parse streamed chunk', err);
      return null;
    }
  }

  // Nova payloads: usage under `metadata.usage`, stop reason under
  // `messageStop.stopReason`.
  function recordNovaAttributes(parsedChunk: any): void {
    const usage = parsedChunk.metadata?.usage;
    if (usage !== undefined) {
      if (usage.inputTokens !== undefined) {
        span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.inputTokens);
      }
      if (usage.outputTokens !== undefined) {
        span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.outputTokens);
      }
    }
    if (parsedChunk.messageStop?.stopReason !== undefined) {
      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
        parsedChunk.messageStop.stopReason,
      ]);
    }
  }

  // Claude payloads: usage under `message.usage`, stop reason under
  // `delta.stop_reason`.
  function recordClaudeAttributes(parsedChunk: any): void {
    const usage = parsedChunk.message?.usage;
    if (usage?.input_tokens !== undefined) {
      span.setAttribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, usage.input_tokens);
    }
    if (usage?.output_tokens !== undefined) {
      span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, usage.output_tokens);
    }
    if (parsedChunk.delta?.stop_reason !== undefined) {
      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
        parsedChunk.delta.stop_reason,
      ]);
    }
  }

  // Titan payloads: token counts and completion reason are top-level fields.
  function recordTitanAttributes(parsedChunk: any): void {
    if (parsedChunk.inputTextTokenCount !== undefined) {
      span.setAttribute(
        ATTR_GEN_AI_USAGE_INPUT_TOKENS,
        parsedChunk.inputTextTokenCount
      );
    }
    if (parsedChunk.totalOutputTextTokenCount !== undefined) {
      span.setAttribute(
        ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
        parsedChunk.totalOutputTextTokenCount
      );
    }
    if (parsedChunk.completionReason !== undefined) {
      span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
        parsedChunk.completionReason,
      ]);
    }
  }

  // Pick the payload layout once per stream; undefined when the model id is
  // missing or belongs to a family this hook does not understand.
  function selectRecorder(id: unknown): ChunkRecorder | undefined {
    if (typeof id !== 'string') return undefined;
    if (id.includes('amazon.titan')) return recordTitanAttributes;
    if (id.includes('anthropic.claude')) return recordClaudeAttributes;
    if (id.includes('amazon.nova')) return recordNovaAttributes;
    return undefined;
  }

  const stream = response.data?.body;
  const modelId = response.request.commandInput?.modelId;

  if (!stream) {
    // Ending the span is deferred to this hook for streaming calls, so with
    // no stream to consume it has to be closed here or it would never end.
    if (span.isRecording()) {
      span.end();
    }
    return;
  }
  if (!span.isRecording()) return;

  const recordAttributes = selectRecorder(modelId);

  // Synchronous and exception-safe on purpose: a failure while extracting
  // telemetry must neither reject a floating promise nor break the stream.
  const onChunk = (event: StreamEvent): void => {
    if (!recordAttributes) return;
    try {
      const parsedChunk = parseChunk(event?.chunk?.bytes);
      if (parsedChunk) {
        recordAttributes(parsedChunk);
      }
    } catch {
      // Best-effort telemetry; ignore and keep streaming.
    }
  };

  // Replace the original response body with our instrumented stream.
  // Every event is yielded untouched, and span.end() runs in `finally` so the
  // span is closed whether the consumer drains the stream, stops early, or
  // the underlying stream throws.
  response.data.body = (async function* () {
    try {
      for await (const item of stream) {
        onChunk(item);
        yield item;
      }
    } finally {
      span.end();
    }
  })();
  return response.data;
}
582801
}

0 commit comments

Comments
 (0)