Skip to content

Commit 35a1a02

Browse files
committed
Differentiate between handling of stream: true and client.messages.stream
1 parent 8267cfb commit 35a1a02

File tree

4 files changed

+243
-8
lines changed

4 files changed

+243
-8
lines changed

dev-packages/node-integration-tests/suites/tracing/anthropic/scenario.mjs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,55 @@ function startMockAnthropicServer() {
2929
return;
3030
}
3131

32+
// Check if streaming is requested
33+
if (req.body.stream === true) {
34+
res.writeHead(200, {
35+
'Content-Type': 'text/event-stream',
36+
'Cache-Control': 'no-cache',
37+
Connection: 'keep-alive',
38+
});
39+
40+
// Send streaming events
41+
const events = [
42+
{
43+
type: 'message_start',
44+
message: {
45+
id: 'msg_stream123',
46+
type: 'message',
47+
role: 'assistant',
48+
model,
49+
content: [],
50+
usage: { input_tokens: 10 },
51+
},
52+
},
53+
{ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } },
54+
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello ' } },
55+
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'from ' } },
56+
{ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'stream!' } },
57+
{ type: 'content_block_stop', index: 0 },
58+
{
59+
type: 'message_delta',
60+
delta: { stop_reason: 'end_turn', stop_sequence: null },
61+
usage: { output_tokens: 15 },
62+
},
63+
{ type: 'message_stop' },
64+
];
65+
66+
events.forEach((event, index) => {
67+
setTimeout(() => {
68+
res.write(`event: ${event.type}\n`);
69+
res.write(`data: ${JSON.stringify(event)}\n\n`);
70+
71+
if (index === events.length - 1) {
72+
res.end();
73+
}
74+
}, index * 10); // Small delay between events
75+
});
76+
77+
return;
78+
}
79+
80+
// Non-streaming response
3281
res.send({
3382
id: 'msg_mock123',
3483
type: 'message',
@@ -92,8 +141,32 @@ async function run() {
92141

93142
// Fourth test: models.retrieve
94143
await client.models.retrieve('claude-3-haiku-20240307');
144+
145+
// Fifth test: streaming via messages.create
146+
const stream = await client.messages.create({
147+
model: 'claude-3-haiku-20240307',
148+
messages: [{ role: 'user', content: 'What is the capital of France?' }],
149+
stream: true,
150+
});
151+
152+
for await (const _ of stream) {
153+
void _;
154+
}
155+
156+
// Sixth test: streaming via messages.stream
157+
await client.messages
158+
.stream({
159+
model: 'claude-3-haiku-20240307',
160+
messages: [{ role: 'user', content: 'What is the capital of France?' }],
161+
})
162+
.on('streamEvent', () => {
163+
Sentry.captureMessage('stream event from user-added event listener captured');
164+
});
95165
});
96166

167+
// Wait for the stream event handler to finish
168+
await Sentry.flush(2000);
169+
97170
server.close();
98171
}
99172

dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,30 @@ describe('Anthropic integration', () => {
152152
origin: 'auto.ai.anthropic',
153153
status: 'ok',
154154
}),
155+
// Fifth span - messages.create with stream: true
156+
expect.objectContaining({
157+
data: expect.objectContaining({
158+
'gen_ai.operation.name': 'messages',
159+
'gen_ai.request.model': 'claude-3-haiku-20240307',
160+
'gen_ai.request.stream': true,
161+
}),
162+
description: 'messages claude-3-haiku-20240307 stream-response',
163+
op: 'gen_ai.messages',
164+
origin: 'auto.ai.anthropic',
165+
status: 'ok',
166+
}),
167+
// Sixth span - messages.stream
168+
expect.objectContaining({
169+
data: expect.objectContaining({
170+
'gen_ai.operation.name': 'messages',
171+
'gen_ai.request.model': 'claude-3-haiku-20240307',
172+
'gen_ai.request.stream': true,
173+
}),
174+
description: 'messages claude-3-haiku-20240307 stream-response',
175+
op: 'gen_ai.messages',
176+
origin: 'auto.ai.anthropic',
177+
status: 'ok',
178+
}),
155179
]),
156180
};
157181

@@ -189,6 +213,21 @@ describe('Anthropic integration', () => {
189213
]),
190214
};
191215

216+
const EXPECTED_MODEL_ERROR = {
217+
exception: {
218+
values: [
219+
{
220+
type: 'Error',
221+
value: '404 Model not found',
222+
},
223+
],
224+
},
225+
};
226+
227+
const EXPECTED_STREAM_EVENT_HANDLER_MESSAGE = {
228+
message: 'stream event from user-added event listener captured',
229+
};
230+
192231
createEsmAndCjsTests(__dirname, 'scenario-manual-client.mjs', 'instrument.mjs', (createRunner, test) => {
193232
test('creates anthropic related spans when manually insturmenting client', async () => {
194233
await createRunner()
@@ -202,8 +241,9 @@ describe('Anthropic integration', () => {
202241
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => {
203242
test('creates anthropic related spans with sendDefaultPii: false', async () => {
204243
await createRunner()
205-
.ignore('event')
244+
.expect({ event: EXPECTED_MODEL_ERROR })
206245
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE })
246+
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
207247
.start()
208248
.completed();
209249
});
@@ -212,8 +252,9 @@ describe('Anthropic integration', () => {
212252
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => {
213253
test('creates anthropic related spans with sendDefaultPii: true', async () => {
214254
await createRunner()
215-
.ignore('event')
255+
.expect({ event: EXPECTED_MODEL_ERROR })
216256
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE })
257+
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
217258
.start()
218259
.completed();
219260
});
@@ -222,8 +263,9 @@ describe('Anthropic integration', () => {
222263
createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-options.mjs', (createRunner, test) => {
223264
test('creates anthropic related spans with custom options', async () => {
224265
await createRunner()
225-
.ignore('event')
266+
.expect({ event: EXPECTED_MODEL_ERROR })
226267
.expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS })
268+
.expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE })
227269
.start()
228270
.completed();
229271
});

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,14 @@ import {
2525
} from '../ai/gen-ai-attributes';
2626
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
2727
import { handleCallbackErrors } from '../handleCallbackErrors';
28-
import { instrumentStream } from './streaming';
29-
import type { AnthropicAiInstrumentedMethod, AnthropicAiOptions, AnthropicAiResponse, ContentBlock } from './types';
28+
import { instrumentAsyncIterableStream, instrumentMessageStream } from './streaming';
29+
import type {
30+
AnthropicAiInstrumentedMethod,
31+
AnthropicAiOptions,
32+
AnthropicAiResponse,
33+
AnthropicAiStreamingEvent,
34+
ContentBlock,
35+
} from './types';
3036
import { shouldInstrument } from './utils';
3137

3238
/**
@@ -209,7 +215,45 @@ function instrumentMethod<T extends unknown[], R>(
209215
const isStreamRequested = Boolean(params?.stream);
210216
const isStreamingMethod = methodPath === 'messages.stream';
211217

212-
if (isStreamRequested || isStreamingMethod) {
218+
if (isStreamRequested) {
219+
return startSpanManual(
220+
{
221+
name: `${operationName} ${model} stream-response`,
222+
op: getSpanOperation(methodPath),
223+
attributes: requestAttributes as Record<string, SpanAttributeValue>,
224+
},
225+
async span => {
226+
try {
227+
if (options.recordInputs && params) {
228+
addPrivateRequestAttributes(span, params);
229+
}
230+
231+
const result = await originalMethod.apply(context, args);
232+
return instrumentAsyncIterableStream(
233+
result as AsyncIterable<AnthropicAiStreamingEvent>,
234+
span,
235+
options.recordOutputs ?? false,
236+
) as unknown as R;
237+
} catch (error) {
238+
captureException(error, {
239+
mechanism: {
240+
handled: false,
241+
type: 'auto.ai.anthropic',
242+
data: {
243+
function: methodPath,
244+
},
245+
},
246+
});
247+
248+
if (span.isRecording()) {
249+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
250+
span.end();
251+
}
252+
throw error;
253+
}
254+
},
255+
);
256+
} else if (isStreamingMethod) {
213257
// Create span for instrumentation using startSpanManual
214258
return startSpanManual(
215259
{
@@ -224,7 +268,7 @@ function instrumentMethod<T extends unknown[], R>(
224268
}
225269

226270
const messageStream = target.apply(context, args);
227-
return instrumentStream(messageStream, span, options.recordOutputs ?? false);
271+
return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false);
228272
} catch (error) {
229273
captureException(error, {
230274
mechanism: {

packages/core/src/utils/anthropic-ai/streaming.ts

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,86 @@ function finalizeStreamSpan(state: StreamingState, span: Span, recordOutputs: bo
259259
}
260260
}
261261

262+
/**
263+
* Instruments an async iterable stream of Anthropic events, updates the span with
264+
* streaming attributes and (optionally) the aggregated output text, and yields
265+
* each event from the input stream unchanged.
266+
*/
267+
export async function* instrumentAsyncIterableStream(
268+
stream: AsyncIterable<AnthropicAiStreamingEvent>,
269+
span: Span,
270+
recordOutputs: boolean,
271+
): AsyncGenerator<AnthropicAiStreamingEvent, void, unknown> {
272+
const state: StreamingState = {
273+
responseTexts: [],
274+
finishReasons: [],
275+
responseId: '',
276+
responseModel: '',
277+
promptTokens: undefined,
278+
completionTokens: undefined,
279+
cacheCreationInputTokens: undefined,
280+
cacheReadInputTokens: undefined,
281+
toolCalls: [],
282+
activeToolBlocks: {},
283+
};
284+
285+
try {
286+
for await (const event of stream) {
287+
processEvent(event, state, recordOutputs, span);
288+
yield event;
289+
}
290+
} finally {
291+
// Set common response attributes if available
292+
if (state.responseId) {
293+
span.setAttributes({
294+
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
295+
});
296+
}
297+
if (state.responseModel) {
298+
span.setAttributes({
299+
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
300+
});
301+
}
302+
303+
setTokenUsageAttributes(
304+
span,
305+
state.promptTokens,
306+
state.completionTokens,
307+
state.cacheCreationInputTokens,
308+
state.cacheReadInputTokens,
309+
);
310+
311+
span.setAttributes({
312+
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
313+
});
314+
315+
if (state.finishReasons.length > 0) {
316+
span.setAttributes({
317+
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
318+
});
319+
}
320+
321+
if (recordOutputs && state.responseTexts.length > 0) {
322+
span.setAttributes({
323+
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
324+
});
325+
}
326+
327+
// Set tool calls if any were captured
328+
if (recordOutputs && state.toolCalls.length > 0) {
329+
span.setAttributes({
330+
[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
331+
});
332+
}
333+
334+
span.end();
335+
}
336+
}
337+
262338
/**
263339
* Instruments a MessageStream by registering event handlers and preserving the original stream API.
264340
*/
265-
export function instrumentStream<R extends { on: (...args: unknown[]) => void }>(
341+
export function instrumentMessageStream<R extends { on: (...args: unknown[]) => void }>(
266342
stream: R,
267343
span: Span,
268344
recordOutputs: boolean,

0 commit comments

Comments
 (0)