Skip to content

Commit 6874fbe

Browse files
authored
feat(core): Stream responses Anthropic AI (#17460)
This PR adds support for streamed Anthropic Messages API responses in the Core AI instrumentation. It detects streaming via `messages.create({…, stream: true})` and `messages.stream(...)`, and marks spans with stream-related attributes. It also aggregates token usage from stream events and records streamed text when PII capture is enabled.
1 parent 7303ab1 commit 6874fbe

File tree

6 files changed

+470
-2
lines changed

6 files changed

+470
-2
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { instrumentAnthropicAiClient } from '@sentry/core';
2+
import * as Sentry from '@sentry/node';
3+
4+
function createMockStreamEvents(model = 'claude-3-haiku-20240307') {
5+
async function* generator() {
6+
// Provide message metadata early so the span can capture id/model/usage input tokens
7+
yield {
8+
type: 'content_block_start',
9+
message: {
10+
id: 'msg_stream_1',
11+
type: 'message',
12+
role: 'assistant',
13+
model,
14+
content: [],
15+
stop_reason: 'end_turn',
16+
stop_sequence: null,
17+
usage: {
18+
input_tokens: 10,
19+
},
20+
},
21+
};
22+
23+
// Streamed text chunks
24+
yield { type: 'content_block_delta', delta: { text: 'Hello ' } };
25+
yield { type: 'content_block_delta', delta: { text: 'from ' } };
26+
yield { type: 'content_block_delta', delta: { text: 'stream!' } };
27+
28+
// Final usage totals for output tokens
29+
yield { type: 'message_delta', usage: { output_tokens: 15 } };
30+
}
31+
32+
return generator();
33+
}
34+
35+
class MockAnthropic {
36+
constructor(config) {
37+
this.apiKey = config.apiKey;
38+
39+
this.messages = {
40+
create: this._messagesCreate.bind(this),
41+
stream: this._messagesStream.bind(this),
42+
};
43+
}
44+
45+
async _messagesCreate(params) {
46+
await new Promise(resolve => setTimeout(resolve, 5));
47+
if (params?.stream === true) {
48+
return createMockStreamEvents(params.model);
49+
}
50+
// Fallback non-streaming behavior (not used in this scenario)
51+
return {
52+
id: 'msg_mock123',
53+
type: 'message',
54+
model: params.model,
55+
role: 'assistant',
56+
content: [
57+
{
58+
type: 'text',
59+
text: 'Hello from Anthropic mock!',
60+
},
61+
],
62+
stop_reason: 'end_turn',
63+
stop_sequence: null,
64+
usage: {
65+
input_tokens: 10,
66+
output_tokens: 15,
67+
},
68+
};
69+
}
70+
71+
async _messagesStream(params) {
72+
await new Promise(resolve => setTimeout(resolve, 5));
73+
return createMockStreamEvents(params?.model);
74+
}
75+
}
76+
77+
// Consume an async iterable without using its items; draining the stream is
// what drives the instrumented event handling to completion.
async function drain(stream) {
  for await (const event of stream) {
    void event;
  }
}

async function run() {
  await Sentry.startSpan({ op: 'function', name: 'main' }, async () => {
    const client = instrumentAnthropicAiClient(new MockAnthropic({ apiKey: 'mock-api-key' }));

    // 1) Streaming via stream: true param on messages.create
    await drain(
      await client.messages.create({
        model: 'claude-3-haiku-20240307',
        messages: [{ role: 'user', content: 'Stream this please' }],
        stream: true,
      }),
    );

    // 2) Streaming via messages.stream API
    await drain(
      await client.messages.stream({
        model: 'claude-3-haiku-20240307',
        messages: [{ role: 'user', content: 'Stream this too' }],
      }),
    );
  });
}

run();
104+
105+

dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,4 +218,79 @@ describe('Anthropic integration', () => {
218218
.completed();
219219
});
220220
});
221+
222+
// Expected spans for the streaming scenario when sendDefaultPii is false:
// both entry points produce a "stream-response" span with aggregated token
// usage, but no response text is recorded.
const EXPECTED_STREAM_SPANS_PII_FALSE = {
  transaction: 'main',
  spans: expect.arrayContaining([
    // messages.create with stream: true
    expect.objectContaining({
      description: 'messages claude-3-haiku-20240307 stream-response',
      op: 'gen_ai.messages',
      data: expect.objectContaining({
        'gen_ai.system': 'anthropic',
        'gen_ai.operation.name': 'messages',
        'gen_ai.request.model': 'claude-3-haiku-20240307',
        // Only the create({ stream: true }) call carries the request.stream flag.
        'gen_ai.request.stream': true,
        'gen_ai.response.streaming': true,
        'gen_ai.response.model': 'claude-3-haiku-20240307',
        'gen_ai.response.id': 'msg_stream_1',
        // 10 input + 15 output tokens aggregated from the stream events.
        'gen_ai.usage.input_tokens': 10,
        'gen_ai.usage.output_tokens': 15,
        'gen_ai.usage.total_tokens': 25,
        'gen_ai.response.finish_reasons': '["end_turn"]',
      }),
    }),
    // messages.stream
    expect.objectContaining({
      description: 'messages claude-3-haiku-20240307 stream-response',
      op: 'gen_ai.messages',
      data: expect.objectContaining({
        'gen_ai.system': 'anthropic',
        'gen_ai.operation.name': 'messages',
        'gen_ai.request.model': 'claude-3-haiku-20240307',
        'gen_ai.response.streaming': true,
        'gen_ai.response.model': 'claude-3-haiku-20240307',
        'gen_ai.response.id': 'msg_stream_1',
        'gen_ai.usage.input_tokens': 10,
        'gen_ai.usage.output_tokens': 15,
        'gen_ai.usage.total_tokens': 25,
      }),
    }),
  ]),
};
261+
262+
// Expected spans for the streaming scenario when sendDefaultPii is true:
// in addition to the streaming flag, the concatenated streamed text is
// recorded on both spans.
const EXPECTED_STREAM_SPANS_PII_TRUE = {
  transaction: 'main',
  spans: expect.arrayContaining([
    expect.objectContaining({
      description: 'messages claude-3-haiku-20240307 stream-response',
      op: 'gen_ai.messages',
      data: expect.objectContaining({
        'gen_ai.response.streaming': true,
        // streamed text concatenated
        'gen_ai.response.text': 'Hello from stream!',
      }),
    }),
    expect.objectContaining({
      description: 'messages claude-3-haiku-20240307 stream-response',
      op: 'gen_ai.messages',
      data: expect.objectContaining({
        'gen_ai.response.streaming': true,
        'gen_ai.response.text': 'Hello from stream!',
      }),
    }),
  ]),
};
284+
285+
createEsmAndCjsTests(__dirname, 'scenario-stream.mjs', 'instrument.mjs', (createRunner, test) => {
286+
test('streams produce spans with token usage and metadata (PII false)', async () => {
287+
await createRunner().ignore('event').expect({ transaction: EXPECTED_STREAM_SPANS_PII_FALSE }).start().completed();
288+
});
289+
});
290+
291+
createEsmAndCjsTests(__dirname, 'scenario-stream.mjs', 'instrument-with-pii.mjs', (createRunner, test) => {
292+
test('streams record response text when PII true', async () => {
293+
await createRunner().ignore('event').expect({ transaction: EXPECTED_STREAM_SPANS_PII_TRUE }).start().completed();
294+
});
295+
});
221296
});

packages/core/src/utils/anthropic-ai/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export const ANTHROPIC_AI_INTEGRATION_NAME = 'Anthropic_AI';
44
// https://docs.anthropic.com/en/api/models-list
55
export const ANTHROPIC_AI_INSTRUMENTED_METHODS = [
66
'messages.create',
7+
'messages.stream',
78
'messages.countTokens',
89
'models.get',
910
'completions.create',

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { getCurrentScope } from '../../currentScopes';
22
import { captureException } from '../../exports';
33
import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
4-
import { startSpan } from '../../tracing/trace';
4+
import { SPAN_STATUS_ERROR } from '../../tracing';
5+
import { startSpan, startSpanManual } from '../../tracing/trace';
56
import type { Span, SpanAttributeValue } from '../../types-hoist/span';
67
import {
78
ANTHROPIC_AI_RESPONSE_TIMESTAMP_ATTRIBUTE,
@@ -22,14 +23,17 @@ import {
2223
} from '../ai/gen-ai-attributes';
2324
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
2425
import { ANTHROPIC_AI_INTEGRATION_NAME } from './constants';
26+
import { instrumentStream } from './streaming';
2527
import type {
2628
AnthropicAiClient,
2729
AnthropicAiInstrumentedMethod,
2830
AnthropicAiIntegration,
2931
AnthropicAiOptions,
3032
AnthropicAiResponse,
33+
AnthropicAiStreamingEvent,
3134
} from './types';
3235
import { shouldInstrument } from './utils';
36+
3337
/**
3438
* Extract request attributes from method arguments
3539
*/
@@ -168,7 +172,47 @@ function instrumentMethod<T extends unknown[], R>(
168172
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
169173
const operationName = getFinalOperationName(methodPath);
170174

171-
// TODO: Handle streaming responses
175+
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
176+
const isStreamRequested = Boolean(params?.stream);
177+
const isStreamingMethod = methodPath === 'messages.stream';
178+
179+
if (isStreamRequested || isStreamingMethod) {
180+
return startSpanManual(
181+
{
182+
name: `${operationName} ${model} stream-response`,
183+
op: getSpanOperation(methodPath),
184+
attributes: requestAttributes as Record<string, SpanAttributeValue>,
185+
},
186+
async (span: Span) => {
187+
try {
188+
if (finalOptions.recordInputs && params) {
189+
addPrivateRequestAttributes(span, params);
190+
}
191+
192+
const result = await originalMethod.apply(context, args);
193+
return instrumentStream(
194+
result as AsyncIterable<AnthropicAiStreamingEvent>,
195+
span,
196+
finalOptions.recordOutputs ?? false,
197+
) as unknown as R;
198+
} catch (error) {
199+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
200+
captureException(error, {
201+
mechanism: {
202+
handled: false,
203+
type: 'auto.ai.anthropic',
204+
data: {
205+
function: methodPath,
206+
},
207+
},
208+
});
209+
span.end();
210+
throw error;
211+
}
212+
},
213+
);
214+
}
215+
172216
return startSpan(
173217
{
174218
name: `${operationName} ${model}`,

0 commit comments

Comments
 (0)