@@ -599,24 +599,36 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
599599 ) : Promise < any > {
600600 const stream = response . data ?. body ;
601601 const modelId = response . request . commandInput ?. modelId ;
602- if ( ! stream || ! span . isRecording ( ) ) return ;
603-
604- const wrappedStream = instrumentAsyncIterable (
605- stream ,
606- async ( chunk : { chunk ?: { bytes ?: Uint8Array } } ) => {
607- const parsedChunk = parseChunk ( chunk ?. chunk ?. bytes ) ;
602+ if ( ! stream ) return ;
603+
604+ const wrappedStream =
605+ BedrockRuntimeServiceExtension . instrumentAsyncIterable (
606+ stream ,
607+ async ( chunk : { chunk ?: { bytes ?: Uint8Array } } ) => {
608+ const parsedChunk = BedrockRuntimeServiceExtension . parseChunk (
609+ chunk ?. chunk ?. bytes
610+ ) ;
608611
609- if ( ! parsedChunk ) return ;
612+ if ( ! parsedChunk ) return ;
610613
611- if ( modelId . includes ( 'amazon.titan' ) ) {
612- recordTitanAttributes ( parsedChunk ) ;
613- } else if ( modelId . includes ( 'anthropic.claude' ) ) {
614- recordClaudeAttributes ( parsedChunk ) ;
615- } else if ( modelId . includes ( 'amazon.nova' ) ) {
616- recordNovaAttributes ( parsedChunk ) ;
614+ if ( modelId . includes ( 'amazon.titan' ) ) {
615+ BedrockRuntimeServiceExtension . recordTitanAttributes (
616+ parsedChunk ,
617+ span
618+ ) ;
619+ } else if ( modelId . includes ( 'anthropic.claude' ) ) {
620+ BedrockRuntimeServiceExtension . recordClaudeAttributes (
621+ parsedChunk ,
622+ span
623+ ) ;
624+ } else if ( modelId . includes ( 'amazon.nova' ) ) {
625+ BedrockRuntimeServiceExtension . recordNovaAttributes (
626+ parsedChunk ,
627+ span
628+ ) ;
629+ }
617630 }
618- }
619- ) ;
631+ ) ;
620632 // Replace the original response body with our instrumented stream.
621633 // - Defers span.end() until the entire stream is consumed
622634 // This ensures downstream consumers still receive the full stream correctly,
@@ -631,93 +643,92 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
631643 }
632644 } ) ( ) ;
633645 return response . data ;
634-
635- // Tap into the stream at the chunk level without modifying the chunk itself.
636- function instrumentAsyncIterable < T > (
637- stream : AsyncIterable < T > ,
638- onChunk : ( chunk : T ) => void
639- ) : AsyncIterable < T > {
640- return {
641- [ Symbol . asyncIterator ] : async function * ( ) {
642- for await ( const chunk of stream ) {
643- onChunk ( chunk ) ;
644- yield chunk ;
645- }
646- } ,
647- } ;
648- }
649-
650- function parseChunk ( bytes ?: Uint8Array ) : any {
651- if ( ! bytes || ! ( bytes instanceof Uint8Array ) ) return null ;
652- try {
653- const str = Buffer . from ( bytes ) . toString ( 'utf-8' ) ;
654- return JSON . parse ( str ) ;
655- } catch ( err ) {
656- console . warn ( 'Failed to parse streamed chunk' , err ) ;
657- return null ;
658- }
659- }
660-
661- function recordNovaAttributes ( parsedChunk : any ) {
662- if ( parsedChunk . metadata ?. usage !== undefined ) {
663- if ( parsedChunk . metadata ?. usage . inputTokens !== undefined ) {
664- span . setAttribute (
665- ATTR_GEN_AI_USAGE_INPUT_TOKENS ,
666- parsedChunk . metadata . usage . inputTokens
667- ) ;
668- }
669- if ( parsedChunk . metadata ?. usage . outputTokens !== undefined ) {
670- span . setAttribute (
671- ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
672- parsedChunk . metadata . usage . outputTokens
673- ) ;
646+ }
647+ // Tap into the stream at the chunk level without modifying the chunk itself.
648+ private static instrumentAsyncIterable < T > (
649+ stream : AsyncIterable < T > ,
650+ onChunk : ( chunk : T ) => void
651+ ) : AsyncIterable < T > {
652+ return {
653+ [ Symbol . asyncIterator ] : async function * ( ) {
654+ for await ( const chunk of stream ) {
655+ onChunk ( chunk ) ;
656+ yield chunk ;
674657 }
675- }
676- if ( parsedChunk . messageStop ?. stopReason !== undefined ) {
677- span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
678- parsedChunk . messageStop . stopReason ,
679- ] ) ;
680- }
658+ } ,
659+ } ;
660+ }
661+
662+ private static parseChunk ( bytes ?: Uint8Array ) : any {
663+ if ( ! bytes || ! ( bytes instanceof Uint8Array ) ) return null ;
664+ try {
665+ const str = Buffer . from ( bytes ) . toString ( 'utf-8' ) ;
666+ return JSON . parse ( str ) ;
667+ } catch ( err ) {
668+ console . warn ( 'Failed to parse streamed chunk' , err ) ;
669+ return null ;
681670 }
671+ }
682672
683- function recordClaudeAttributes ( parsedChunk : any ) {
684- if ( parsedChunk . message ?. usage ?. input_tokens !== undefined ) {
673+ private static recordNovaAttributes ( parsedChunk : any , span : Span ) {
674+ if ( parsedChunk . metadata ?. usage !== undefined ) {
675+ if ( parsedChunk . metadata ?. usage . inputTokens !== undefined ) {
685676 span . setAttribute (
686677 ATTR_GEN_AI_USAGE_INPUT_TOKENS ,
687- parsedChunk . message . usage . input_tokens
678+ parsedChunk . metadata . usage . inputTokens
688679 ) ;
689680 }
690- if ( parsedChunk . message ?. usage ?. output_tokens !== undefined ) {
681+ if ( parsedChunk . metadata ?. usage . outputTokens !== undefined ) {
691682 span . setAttribute (
692683 ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
693- parsedChunk . message . usage . output_tokens
684+ parsedChunk . metadata . usage . outputTokens
694685 ) ;
695686 }
696- if ( parsedChunk . delta ?. stop_reason !== undefined ) {
697- span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
698- parsedChunk . delta . stop_reason ,
699- ] ) ;
700- }
701687 }
688+ if ( parsedChunk . messageStop ?. stopReason !== undefined ) {
689+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
690+ parsedChunk . messageStop . stopReason ,
691+ ] ) ;
692+ }
693+ }
702694
703- function recordTitanAttributes ( parsedChunk : any ) {
704- if ( parsedChunk . inputTextTokenCount !== undefined ) {
705- span . setAttribute (
706- ATTR_GEN_AI_USAGE_INPUT_TOKENS ,
707- parsedChunk . inputTextTokenCount
708- ) ;
709- }
710- if ( parsedChunk . totalOutputTextTokenCount !== undefined ) {
711- span . setAttribute (
712- ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
713- parsedChunk . totalOutputTextTokenCount
714- ) ;
715- }
716- if ( parsedChunk . completionReason !== undefined ) {
717- span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
718- parsedChunk . completionReason ,
719- ] ) ;
720- }
695+ private static recordClaudeAttributes ( parsedChunk : any , span : Span ) {
696+ if ( parsedChunk . message ?. usage ?. input_tokens !== undefined ) {
697+ span . setAttribute (
698+ ATTR_GEN_AI_USAGE_INPUT_TOKENS ,
699+ parsedChunk . message . usage . input_tokens
700+ ) ;
701+ }
702+ if ( parsedChunk . message ?. usage ?. output_tokens !== undefined ) {
703+ span . setAttribute (
704+ ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
705+ parsedChunk . message . usage . output_tokens
706+ ) ;
707+ }
708+ if ( parsedChunk . delta ?. stop_reason !== undefined ) {
709+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
710+ parsedChunk . delta . stop_reason ,
711+ ] ) ;
712+ }
713+ }
714+
715+ private static recordTitanAttributes ( parsedChunk : any , span : Span ) {
716+ if ( parsedChunk . inputTextTokenCount !== undefined ) {
717+ span . setAttribute (
718+ ATTR_GEN_AI_USAGE_INPUT_TOKENS ,
719+ parsedChunk . inputTextTokenCount
720+ ) ;
721+ }
722+ if ( parsedChunk . totalOutputTextTokenCount !== undefined ) {
723+ span . setAttribute (
724+ ATTR_GEN_AI_USAGE_OUTPUT_TOKENS ,
725+ parsedChunk . totalOutputTextTokenCount
726+ ) ;
727+ }
728+ if ( parsedChunk . completionReason !== undefined ) {
729+ span . setAttribute ( ATTR_GEN_AI_RESPONSE_FINISH_REASONS , [
730+ parsedChunk . completionReason ,
731+ ] ) ;
721732 }
722733 }
723734}
0 commit comments