1616import {
1717 Attributes ,
1818 DiagLogger ,
19+ diag ,
1920 Histogram ,
2021 HrTime ,
2122 Meter ,
@@ -59,6 +60,7 @@ import {
5960export class BedrockRuntimeServiceExtension implements ServiceExtension {
6061 private tokenUsage ! : Histogram ;
6162 private operationDuration ! : Histogram ;
63+ private _diag : DiagLogger = diag ;
6264
6365 updateMetricInstruments ( meter : Meter ) {
6466 // https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
@@ -599,18 +601,20 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
599601 ) : Promise < any > {
600602 const stream = response . data ?. body ;
601603 const modelId = response . request . commandInput ?. modelId ;
602- if ( ! stream ) return ;
603-
604- const wrappedStream =
605- BedrockRuntimeServiceExtension . instrumentAsyncIterable (
606- stream ,
607- async ( chunk : { chunk ?: { bytes ?: Uint8Array } } ) => {
608- const parsedChunk = BedrockRuntimeServiceExtension . parseChunk (
609- chunk ?. chunk ?. bytes
610- ) ;
604+ if ( ! stream || ! modelId ) return ;
611605
612- if ( ! parsedChunk ) return ;
606+ // Replace the original response body with our instrumented stream.
607+ // - Defers span.end() until the entire stream is consumed
608+ // This ensures downstream consumers still receive the full stream correctly,
609+ // while OpenTelemetry can record span attributes from streamed data.
610+ response . data . body = async function * (
611+ this : BedrockRuntimeServiceExtension
612+ ) {
613+ try {
614+ for await ( const chunk of stream ) {
615+ const parsedChunk = this . parseChunk ( chunk ?. chunk ?. bytes ) ;
613616
617+ if ( ! parsedChunk ) return ;
614618 if ( modelId . includes ( 'amazon.titan' ) ) {
615619 BedrockRuntimeServiceExtension . recordTitanAttributes (
616620 parsedChunk ,
@@ -626,46 +630,43 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
626630 parsedChunk ,
627631 span
628632 ) ;
633+ } else if ( modelId . includes ( 'meta.llama' ) ) {
634+ BedrockRuntimeServiceExtension . recordLlamaAttributes (
635+ parsedChunk ,
636+ span
637+ ) ;
638+ } else if ( modelId . includes ( 'cohere.command-r' ) ) {
639+ BedrockRuntimeServiceExtension . recordCohereRAttributes (
640+ parsedChunk ,
641+ span
642+ ) ;
643+ } else if ( modelId . includes ( 'cohere.command' ) ) {
644+ BedrockRuntimeServiceExtension . recordCohereAttributes (
645+ parsedChunk ,
646+ span
647+ ) ;
648+ } else if ( modelId . includes ( 'mistral' ) ) {
649+ BedrockRuntimeServiceExtension . recordMistralAttributes (
650+ parsedChunk ,
651+ span
652+ ) ;
629653 }
630- }
631- ) ;
632- // Replace the original response body with our instrumented stream.
633- // - Defers span.end() until the entire stream is consumed
634- // This ensures downstream consumers still receive the full stream correctly,
635- // while OpenTelemetry can record span attributes from streamed data.
636- response . data . body = ( async function * ( ) {
637- try {
638- for await ( const item of wrappedStream ) {
639- yield item ;
654+ yield chunk ;
640655 }
641656 } finally {
642657 span . end ( ) ;
643658 }
644- } ) ( ) ;
659+ } . bind ( this ) ( ) ;
645660 return response . data ;
646661 }
647- // Tap into the stream at the chunk level without modifying the chunk itself.
648- private static instrumentAsyncIterable < T > (
649- stream : AsyncIterable < T > ,
650- onChunk : ( chunk : T ) => void
651- ) : AsyncIterable < T > {
652- return {
653- [ Symbol . asyncIterator ] : async function * ( ) {
654- for await ( const chunk of stream ) {
655- onChunk ( chunk ) ;
656- yield chunk ;
657- }
658- } ,
659- } ;
660- }
661662
662- private static parseChunk ( bytes ?: Uint8Array ) : any {
663+ private parseChunk ( bytes ?: Uint8Array ) : any {
663664 if ( ! bytes || ! ( bytes instanceof Uint8Array ) ) return null ;
664665 try {
665666 const str = Buffer . from ( bytes ) . toString ( 'utf-8' ) ;
666667 return JSON . parse ( str ) ;
667668 } catch ( err ) {
668- console . warn ( 'Failed to parse streamed chunk' , err ) ;
669+ this . _diag . warn ( 'Failed to parse streamed chunk' , err ) ;
669670 return null ;
670671 }
671672 }
@@ -731,4 +732,74 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
731732 ] ) ;
732733 }
733734 }
735+ private static recordLlamaAttributes ( parsedChunk : any , span : Span ) {
736+ if ( parsedChunk . prompt_token_count !== undefined ) {
737+ span . setAttribute (
738+ ATTR_GEN_AI_USAGE_INPUT_TOKENS ,
739+ parsedChunk . prompt_token_count
740+ ) ;
741+ }
742+ if ( parsedChunk . generation_token_count !== undefined ) {
743+ span . setAttribute (
744+ ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
745+ parsedChunk . generation_token_count
746+ ) ;
747+ }
748+ if ( parsedChunk . stop_reason !== undefined ) {
749+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
750+ parsedChunk . stop_reason ,
751+ ] ) ;
752+ }
753+ }
754+
755+ private static recordMistralAttributes ( parsedChunk : any , span : Span ) {
756+ if ( parsedChunk . outputs ?. [ 0 ] ?. text !== undefined ) {
757+ span . setAttribute (
758+ ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
759+ // NOTE: We approximate the token count since this value is not directly available in the body
760+ // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
761+ // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
762+ Math . ceil ( parsedChunk . outputs [ 0 ] . text . length / 6 )
763+ ) ;
764+ }
765+ if ( parsedChunk . outputs ?. [ 0 ] ?. stop_reason !== undefined ) {
766+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
767+ parsedChunk . outputs [ 0 ] . stop_reason ,
768+ ] ) ;
769+ }
770+ }
771+
772+ private static recordCohereAttributes ( parsedChunk : any , span : Span ) {
773+ if ( parsedChunk . generations ?. [ 0 ] ?. text !== undefined ) {
774+ span . setAttribute (
775+ ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
776+ // NOTE: We approximate the token count since this value is not directly available in the body
777+ // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
778+ // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
779+ Math . ceil ( parsedChunk . generations [ 0 ] . text . length / 6 )
780+ ) ;
781+ }
782+ if ( parsedChunk . generations ?. [ 0 ] ?. finish_reason !== undefined ) {
783+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
784+ parsedChunk . generations [ 0 ] . finish_reason ,
785+ ] ) ;
786+ }
787+ }
788+
789+ private static recordCohereRAttributes ( parsedChunk : any , span : Span ) {
790+ if ( parsedChunk . text !== undefined ) {
791+ // NOTE: We approximate the token count since this value is not directly available in the body
792+ // According to Bedrock docs they use (total_chars / 6) to approximate token count for pricing.
793+ // https://docs.aws.amazon.com/bedrock/latest/userguide/model-customization-prepare.html
794+ span . setAttribute (
795+ ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
796+ Math . ceil ( parsedChunk . text . length / 6 )
797+ ) ;
798+ }
799+ if ( parsedChunk . finish_reason !== undefined ) {
800+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
801+ parsedChunk . finish_reason ,
802+ ] ) ;
803+ }
804+ }
734805}
0 commit comments