Skip to content

Commit 7608585

Browse files
committed
feat(instrumentation-aws-sdk): add gen ai conventions for converse stream span
1 parent 9d84216 commit 7608585

File tree

6 files changed

+167
-10
lines changed

6 files changed

+167
-10
lines changed

plugins/node/opentelemetry-instrumentation-aws-sdk/src/aws-sdk.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,12 +400,16 @@ export class AwsInstrumentation extends InstrumentationBase<AwsSdkInstrumentatio
400400
request: normalizedRequest,
401401
requestId: requestId,
402402
};
403-
self.servicesExtensions.responseHook(
403+
const override = self.servicesExtensions.responseHook(
404404
normalizedResponse,
405405
span,
406406
self.tracer,
407407
self.getConfig()
408408
);
409+
if (override) {
410+
response.output = override;
411+
normalizedResponse.data = override;
412+
}
409413
self._callUserResponseHook(span, normalizedResponse);
410414
return response;
411415
})
@@ -439,7 +443,9 @@ export class AwsInstrumentation extends InstrumentationBase<AwsSdkInstrumentatio
439443
throw err;
440444
})
441445
.finally(() => {
442-
span.end();
446+
if (!requestMetadata.isStream) {
447+
span.end();
448+
}
443449
});
444450
promiseWithResponseLogic
445451
.then(res => {

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServiceExtension.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import {
2929
export interface RequestMetadata {
3030
// isIncoming - if true, then the operation callback / promise should be bind with the operation's span
3131
isIncoming: boolean;
32+
// isStream - if true, then the response is a stream so the span should not be ended by the middleware.
33+
// the ServiceExtension must end the span itself, generally by wrapping the stream and ending after it is
34+
// consumed.
35+
isStream?: boolean;
3236
spanAttributes?: SpanAttributes;
3337
spanKind?: SpanKind;
3438
spanName?: string;
@@ -45,10 +49,11 @@ export interface ServiceExtension {
4549
// called before request is sent, and after span is started
4650
requestPostSpanHook?: (request: NormalizedRequest) => void;
4751

52+
// called after response is received. If value is returned, it replaces the response output.
4853
responseHook?: (
4954
response: NormalizedResponse,
5055
span: Span,
5156
tracer: Tracer,
5257
config: AwsSdkInstrumentationConfig
53-
) => void;
58+
) => any | undefined;
5459
}

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,6 @@ export class ServicesExtensions implements ServiceExtension {
6767
config: AwsSdkInstrumentationConfig
6868
) {
6969
const serviceExtension = this.services.get(response.request.serviceName);
70-
serviceExtension?.responseHook?.(response, span, tracer, config);
70+
return serviceExtension?.responseHook?.(response, span, tracer, config);
7171
}
7272
}

plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/bedrock-runtime.ts

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ import {
3434
NormalizedRequest,
3535
NormalizedResponse,
3636
} from '../types';
37+
import {
38+
ConverseStreamOutput,
39+
TokenUsage,
40+
} from '@aws-sdk/client-bedrock-runtime';
3741

3842
export class BedrockRuntimeServiceExtension implements ServiceExtension {
3943
requestPreSpanHook(
@@ -43,7 +47,9 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
4347
): RequestMetadata {
4448
switch (request.commandName) {
4549
case 'Converse':
46-
return this.requestPreSpanHookConverse(request, config, diag);
50+
return this.requestPreSpanHookConverse(request, config, diag, false);
51+
case 'ConverseStream':
52+
return this.requestPreSpanHookConverse(request, config, diag, true);
4753
}
4854

4955
return {
@@ -54,7 +60,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
5460
private requestPreSpanHookConverse(
5561
request: NormalizedRequest,
5662
config: AwsSdkInstrumentationConfig,
57-
diag: DiagLogger
63+
diag: DiagLogger,
64+
isStream: boolean
5865
): RequestMetadata {
5966
let spanName = GEN_AI_OPERATION_NAME_VALUE_CHAT;
6067
const spanAttributes: Attributes = {
@@ -90,6 +97,7 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
9097
return {
9198
spanName,
9299
isIncoming: false,
100+
isStream,
93101
spanAttributes,
94102
};
95103
}
@@ -107,6 +115,8 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
107115
switch (response.request.commandName) {
108116
case 'Converse':
109117
return this.responseHookConverse(response, span, tracer, config);
118+
case 'ConverseStream':
119+
return this.responseHookConverseStream(response, span, tracer, config);
110120
}
111121
}
112122

@@ -117,6 +127,49 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
117127
config: AwsSdkInstrumentationConfig
118128
) {
119129
const { stopReason, usage } = response.data;
130+
131+
BedrockRuntimeServiceExtension.setStopReason(span, stopReason);
132+
BedrockRuntimeServiceExtension.setUsage(span, usage);
133+
}
134+
135+
private responseHookConverseStream(
136+
response: NormalizedResponse,
137+
span: Span,
138+
tracer: Tracer,
139+
config: AwsSdkInstrumentationConfig
140+
) {
141+
return {
142+
...response.data,
143+
stream: this.wrapConverseStreamResponse(response.data.stream, span),
144+
};
145+
}
146+
147+
private async *wrapConverseStreamResponse(
148+
response: AsyncIterable<ConverseStreamOutput>,
149+
span: Span
150+
) {
151+
try {
152+
for await (const item of response) {
153+
BedrockRuntimeServiceExtension.setStopReason(
154+
span,
155+
item.messageStop?.stopReason
156+
);
157+
BedrockRuntimeServiceExtension.setUsage(span, item.metadata?.usage);
158+
yield item;
159+
}
160+
} finally {
161+
span.end();
162+
}
163+
}
164+
165+
private static setStopReason(span: Span, stopReason: string | undefined) {
166+
if (stopReason !== undefined) {
167+
console.log(stopReason);
168+
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
169+
}
170+
}
171+
172+
private static setUsage(span: Span, usage: TokenUsage | undefined) {
120173
if (usage) {
121174
const { inputTokens, outputTokens } = usage;
122175
if (inputTokens !== undefined) {
@@ -126,9 +179,5 @@ export class BedrockRuntimeServiceExtension implements ServiceExtension {
126179
span.setAttribute(ATTR_GEN_AI_USAGE_OUTPUT_TOKENS, outputTokens);
127180
}
128181
}
129-
130-
if (stopReason !== undefined) {
131-
span.setAttribute(ATTR_GEN_AI_RESPONSE_FINISH_REASONS, [stopReason]);
132-
}
133182
}
134183
}

plugins/node/opentelemetry-instrumentation-aws-sdk/test/bedrock-runtime.test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import {
3939
ConverseCommand,
4040
ConversationRole,
4141
InvokeModelCommand,
42+
ConverseStreamCommand,
4243
} from '@aws-sdk/client-bedrock-runtime';
4344
import { AwsCredentialIdentity } from '@aws-sdk/types';
4445
import * as path from 'path';
@@ -154,6 +155,60 @@ describe('Bedrock', () => {
154155
});
155156
});
156157

158+
describe('ConverseStream', () => {
159+
it('adds genai conventions', async () => {
160+
const modelId = 'amazon.titan-text-lite-v1';
161+
const messages = [
162+
{
163+
role: ConversationRole.USER,
164+
content: [{ text: 'Say this is a test' }],
165+
},
166+
];
167+
const inferenceConfig = {
168+
maxTokens: 10,
169+
temperature: 0.8,
170+
topP: 1,
171+
stopSequences: ['|'],
172+
};
173+
174+
const command = new ConverseStreamCommand({
175+
modelId,
176+
messages,
177+
inferenceConfig,
178+
});
179+
180+
const response = await client.send(command);
181+
const chunks: string[] = [];
182+
for await (const item of response.stream!) {
183+
const text = item.contentBlockDelta?.delta?.text;
184+
if (text) {
185+
chunks.push(text);
186+
}
187+
}
188+
expect(chunks.join('')).toBe('Hi! How are you? How');
189+
190+
const testSpans: ReadableSpan[] = getTestSpans();
191+
const converseSpans: ReadableSpan[] = testSpans.filter(
192+
(s: ReadableSpan) => {
193+
return s.name === 'chat amazon.titan-text-lite-v1';
194+
}
195+
);
196+
expect(converseSpans.length).toBe(1);
197+
expect(converseSpans[0].attributes).toMatchObject({
198+
[ATTR_GEN_AI_SYSTEM]: GEN_AI_SYSTEM_VALUE_AWS_BEDROCK,
199+
[ATTR_GEN_AI_OPERATION_NAME]: GEN_AI_OPERATION_NAME_VALUE_CHAT,
200+
[ATTR_GEN_AI_REQUEST_MODEL]: modelId,
201+
[ATTR_GEN_AI_REQUEST_MAX_TOKENS]: 10,
202+
[ATTR_GEN_AI_REQUEST_TEMPERATURE]: 0.8,
203+
[ATTR_GEN_AI_REQUEST_TOP_P]: 1,
204+
[ATTR_GEN_AI_REQUEST_STOP_SEQUENCES]: ['|'],
205+
[ATTR_GEN_AI_USAGE_INPUT_TOKENS]: 8,
206+
[ATTR_GEN_AI_USAGE_OUTPUT_TOKENS]: 10,
207+
[ATTR_GEN_AI_RESPONSE_FINISH_REASONS]: ['max_tokens'],
208+
});
209+
});
210+
});
211+
157212
// TODO: Instrument InvokeModel
158213
describe('InvokeModel', () => {
159214
it('does not currently add genai conventions', async () => {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
[
2+
{
3+
"scope": "https://bedrock-runtime.us-east-1.amazonaws.com:443",
4+
"method": "POST",
5+
"path": "/model/amazon.titan-text-lite-v1/converse-stream",
6+
"body": {
7+
"inferenceConfig": {
8+
"maxTokens": 10,
9+
"stopSequences": [
10+
"|"
11+
],
12+
"temperature": 0.8,
13+
"topP": 1
14+
},
15+
"messages": [
16+
{
17+
"content": [
18+
{
19+
"text": "Say this is a test"
20+
}
21+
],
22+
"role": "user"
23+
}
24+
]
25+
},
26+
"status": 200,
27+
"response": "00000081000000526cc176930b3a6576656e742d7479706507000c6d65737361676553746172740d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b2270223a2261626364222c22726f6c65223a22617373697374616e74227df512a005000000c600000057f67806450b3a6576656e742d74797065070011636f6e74656e74426c6f636b44656c74610d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22636f6e74656e74426c6f636b496e646578223a302c2264656c7461223a7b2274657874223a2248692120486f772061726520796f753f20486f77227d2c2270223a226162636465666768696a6b6c6d6e6f70717273747576777879227da88f22a40000009500000056fecc83c80b3a6576656e742d74797065070010636f6e74656e74426c6f636b53746f700d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b22636f6e74656e74426c6f636b496e646578223a302c2270223a226162636465666768696a6b6c6d6e6f7071227d8dc2956d00000097000000511a68450b0b3a6576656e742d7479706507000b6d65737361676553746f700d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b2270223a226162636465666768696a6b6c6d6e6f7071727374222c2273746f70526561736f6e223a226d61785f746f6b656e73227ddb5bf387000000ce0000004ea263e5440b3a6576656e742d747970650700086d657461646174610d3a636f6e74656e742d747970650700106170706c69636174696f6e2f6a736f6e0d3a6d6573736167652d747970650700056576656e747b226d657472696373223a7b226c6174656e63794d73223a3736357d2c2270223a226162636465666768696a6b6c6d6e6f222c227573616765223a7b22696e707574546f6b656e73223a382c226f7574707574546f6b656e73223a31302c22746f74616c546f6b656e73223a31387d7d98eada7f",
28+
"rawHeaders": [
29+
"Date",
30+
"Fri, 21 Mar 2025 02:04:20 GMT",
31+
"Content-Type",
32+
"application/vnd.amazon.eventstream",
33+
"Transfer-Encoding",
34+
"chunked",
35+
"Connection",
36+
"keep-alive",
37+
"x-amzn-RequestId",
38+
"c01898c3-00ef-43e3-b015-e7458e9afc84"
39+
],
40+
"responseIsBinary": true
41+
}
42+
]

0 commit comments

Comments
 (0)