@@ -102,6 +102,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
         return this.requestPreSpanHookConverse(request, config, diag, true);
       case 'InvokeModel':
         return this.requestPreSpanHookInvokeModel(request, config, diag);
+      case 'InvokeModelWithResponseStream':
+        return this.requestPreSpanHookInvokeModelWithResponseStream(
+          request,
+          config,
+          diag,
+          true
+        );
     }

     return {
@@ -316,6 +323,86 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
     };
   }

+  private requestPreSpanHookInvokeModelWithResponseStream(
+    request: NormalizedRequest,
+    config: AwsSdkInstrumentationConfig,
+    diag: DiagLogger,
+    isStream: boolean
+  ): RequestMetadata {
+    let spanName: string | undefined;
+    const spanAttributes: Attributes = {
+      [ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
+      // add operation name for InvokeModelWithResponseStream API
+    };
+
+    const modelId = request.commandInput?.modelId;
+    if (modelId) {
+      spanAttributes[ATTR_GEN_AI_REQUEST_MODEL] = modelId;
+    }
+
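+    // Inference parameters arrive in a model-specific request body, so each
+    // supported model family is parsed with its own field names below.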
+    if (modelId && request.commandInput?.body) {
+      const requestBody = JSON.parse(request.commandInput.body);
+      if (modelId.includes('amazon.titan')) {
+        if (requestBody.textGenerationConfig?.temperature !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TEMPERATURE] =
+            requestBody.textGenerationConfig.temperature;
+        }
+        if (requestBody.textGenerationConfig?.topP !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TOP_P] =
+            requestBody.textGenerationConfig.topP;
+        }
+        if (requestBody.textGenerationConfig?.maxTokenCount !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_MAX_TOKENS] =
+            requestBody.textGenerationConfig.maxTokenCount;
+        }
+        if (requestBody.textGenerationConfig?.stopSequences !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES] =
+            requestBody.textGenerationConfig.stopSequences;
+        }
+      } else if (modelId.includes('anthropic.claude')) {
+        if (requestBody.max_tokens !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_MAX_TOKENS] =
+            requestBody.max_tokens;
+        }
+        if (requestBody.temperature !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TEMPERATURE] =
+            requestBody.temperature;
+        }
+        if (requestBody.top_p !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TOP_P] = requestBody.top_p;
+        }
+        if (requestBody.stop_sequences !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES] =
+            requestBody.stop_sequences;
+        }
+      } else if (modelId.includes('amazon.nova')) {
+        if (requestBody.inferenceConfig?.temperature !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TEMPERATURE] =
+            requestBody.inferenceConfig.temperature;
+        }
+        if (requestBody.inferenceConfig?.top_p !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_TOP_P] =
+            requestBody.inferenceConfig.top_p;
+        }
+        if (requestBody.inferenceConfig?.max_new_tokens !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_MAX_TOKENS] =
+            requestBody.inferenceConfig.max_new_tokens;
+        }
+        if (requestBody.inferenceConfig?.stopSequences !== undefined) {
+          spanAttributes[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES] =
+            requestBody.inferenceConfig.stopSequences;
+        }
+      }
+    }
+
+    return {
+      spanName,
+      isIncoming: false,
+      spanAttributes,
+      isStream,
+    };
+  }
+
   responseHook(
     response: NormalizedResponse,
     span: Span,
@@ -346,6 +433,13 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
         );
       case 'InvokeModel':
         return this.responseHookInvokeModel(response, span, tracer, config);
+      case 'InvokeModelWithResponseStream':
+        return this.responseHookInvokeModelWithResponseStream(
+          response,
+          span,
+          tracer,
+          config
+        );
     }
   }

@@ -579,4 +673,134 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
       }
     }
   }
+
+  private async responseHookInvokeModelWithResponseStream(
+    response: NormalizedResponse,
+    span: Span,
+    tracer: Tracer,
+    config: AwsSdkInstrumentationConfig
+  ): Promise<any> {
+    const stream = response.data?.body;
+    const modelId = response.request.commandInput?.modelId;
+    if (!stream || !modelId || !span.isRecording()) return;
+
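+    // Wrap the response stream so each chunk is inspected for token usage
+    // and finish-reason data before being passed through unchanged.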
+    const wrappedStream = instrumentAsyncIterable(
+      stream,
+      (chunk: { chunk?: { bytes?: Uint8Array } }) => {
+        const parsedChunk = parseChunk(chunk?.chunk?.bytes);
+        if (!parsedChunk) return;
+
+        if (modelId.includes('amazon.titan')) {
+          recordTitanAttributes(parsedChunk);
+        } else if (modelId.includes('anthropic.claude')) {
+          recordClaudeAttributes(parsedChunk);
+        } else if (modelId.includes('amazon.nova')) {
+          recordNovaAttributes(parsedChunk);
+        }
+      }
+    );
+    // Replace the original response body with the instrumented stream and
+    // defer span.end() until the entire stream has been consumed. Downstream
+    // consumers still receive the full stream unchanged, while OpenTelemetry
+    // records span attributes from the streamed data.
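+    // Hypothetical consumer-side sketch: callers keep iterating as before,
+    //   for await (const event of response.body) { /* handle event */ }
+    // and the span ends only once this loop has drained the stream.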
+    response.data.body = (async function* () {
+      try {
+        for await (const item of wrappedStream) {
+          yield item;
+        }
+      } finally {
+        span.end();
+      }
+    })();
+    return response.data;
+
+    // Tap into the stream at the chunk level without modifying the chunk itself.
+    function instrumentAsyncIterable<T>(
+      stream: AsyncIterable<T>,
+      onChunk: (chunk: T) => void
+    ): AsyncIterable<T> {
+      return {
+        [Symbol.asyncIterator]: async function* () {
+          for await (const chunk of stream) {
+            onChunk(chunk);
+            yield chunk;
+          }
+        },
+      };
+    }
+
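+    // Each stream event carries a UTF-8 encoded JSON payload in chunk.bytes;
+    // malformed or missing payloads are skipped rather than thrown.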
+    function parseChunk(bytes?: Uint8Array): any {
+      if (!bytes || !(bytes instanceof Uint8Array)) return null;
+      try {
+        const str = Buffer.from(bytes).toString('utf-8');
+        return JSON.parse(str);
+      } catch (err) {
+        console.warn('Failed to parse streamed chunk', err);
+        return null;
+      }
+    }
+
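+    // Nova streams expose usage on the trailing metadata event and the stop
+    // reason on the messageStop event.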
+    function recordNovaAttributes(parsedChunk: any) {
+      if (parsedChunk.metadata?.usage !== undefined) {
+        if (parsedChunk.metadata.usage.inputTokens !== undefined) {
+          span.setAttribute(
+            ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+            parsedChunk.metadata.usage.inputTokens
+          );
+        }
+        if (parsedChunk.metadata.usage.outputTokens !== undefined) {
+          span.setAttribute(
+            ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+            parsedChunk.metadata.usage.outputTokens
+          );
+        }
+      }
+      if (parsedChunk.messageStop?.stopReason !== undefined) {
+        span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+          parsedChunk.messageStop.stopReason,
+        ]);
+      }
+    }
+
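+    // Claude chunks expose token usage under message.usage and the stop
+    // reason under delta.stop_reason.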
+    function recordClaudeAttributes(parsedChunk: any) {
+      if (parsedChunk.message?.usage?.input_tokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+          parsedChunk.message.usage.input_tokens
+        );
+      }
+      if (parsedChunk.message?.usage?.output_tokens !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+          parsedChunk.message.usage.output_tokens
+        );
+      }
+      if (parsedChunk.delta?.stop_reason !== undefined) {
+        span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+          parsedChunk.delta.stop_reason,
+        ]);
+      }
+    }
+
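+    // Titan chunks report token counts and the completion reason as
+    // top-level fields of the parsed payload.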
+    function recordTitanAttributes(parsedChunk: any) {
+      if (parsedChunk.inputTextTokenCount !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_INPUT_TOKENS,
+          parsedChunk.inputTextTokenCount
+        );
+      }
+      if (parsedChunk.totalOutputTextTokenCount !== undefined) {
+        span.setAttribute(
+          ATTR_GEN_AI_USAGE_OUTPUT_TOKENS,
+          parsedChunk.totalOutputTextTokenCount
+        );
+      }
+      if (parsedChunk.completionReason !== undefined) {
+        span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [
+          parsedChunk.completionReason,
+        ]);
+      }
+    }
+  }
 }