Skip to content

Commit 740ac92

Browse files
committed
feat(core): Support stream responses for Anthropic AI
1 parent 123be24 commit 740ac92

File tree

4 files changed

+305
-2
lines changed

4 files changed

+305
-2
lines changed

packages/core/src/utils/anthropic-ai/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export const ANTHROPIC_AI_INTEGRATION_NAME = 'Anthropic_AI';
44
// https://docs.anthropic.com/en/api/models-list
55
export const ANTHROPIC_AI_INSTRUMENTED_METHODS = [
66
'messages.create',
7+
'messages.stream',
78
'messages.countTokens',
89
'models.get',
910
'completions.create',

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { getCurrentScope } from '../../currentScopes';
22
import { captureException } from '../../exports';
33
import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
4-
import { startSpan } from '../../tracing/trace';
4+
import { SPAN_STATUS_ERROR } from '../../tracing';
5+
import { startSpan, startSpanManual } from '../../tracing/trace';
56
import type { Span, SpanAttributeValue } from '../../types-hoist/span';
67
import {
78
ANTHROPIC_AI_RESPONSE_TIMESTAMP_ATTRIBUTE,
@@ -22,14 +23,17 @@ import {
2223
} from '../ai/gen-ai-attributes';
2324
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
2425
import { ANTHROPIC_AI_INTEGRATION_NAME } from './constants';
26+
import { instrumentStream } from './streaming';
2527
import type {
2628
AnthropicAiClient,
2729
AnthropicAiInstrumentedMethod,
2830
AnthropicAiIntegration,
2931
AnthropicAiOptions,
3032
AnthropicAiResponse,
33+
AnthropicAiStreamingEvent,
3134
} from './types';
3235
import { shouldInstrument } from './utils';
36+
3337
/**
3438
* Extract request attributes from method arguments
3539
*/
@@ -168,7 +172,47 @@ function instrumentMethod<T extends unknown[], R>(
168172
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
169173
const operationName = getFinalOperationName(methodPath);
170174

171-
// TODO: Handle streaming responses
175+
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
176+
const isStreamRequested = Boolean(params?.stream);
177+
const isStreamingMethod = methodPath === 'messages.stream';
178+
179+
if (isStreamRequested || isStreamingMethod) {
180+
return startSpanManual(
181+
{
182+
name: `${operationName} ${model} stream-response`,
183+
op: getSpanOperation(methodPath),
184+
attributes: requestAttributes as Record<string, SpanAttributeValue>,
185+
},
186+
async (span: Span) => {
187+
try {
188+
if (finalOptions.recordInputs && params) {
189+
addPrivateRequestAttributes(span, params);
190+
}
191+
192+
const result = await originalMethod.apply(context, args);
193+
return instrumentStream(
194+
result as unknown as AsyncIterable<AnthropicAiStreamingEvent>,
195+
span,
196+
finalOptions.recordOutputs ?? false,
197+
) as unknown as R;
198+
} catch (error) {
199+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
200+
captureException(error, {
201+
mechanism: {
202+
handled: false,
203+
type: 'auto.ai.anthropic',
204+
data: {
205+
function: methodPath,
206+
},
207+
},
208+
});
209+
span.end();
210+
throw error;
211+
}
212+
},
213+
);
214+
}
215+
172216
return startSpan(
173217
{
174218
name: `${operationName} ${model}`,
packages/core/src/utils/anthropic-ai/streaming.ts

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
import { captureException } from '../../exports';
2+
import { SPAN_STATUS_ERROR } from '../../tracing';
3+
import type { Span } from '../../types-hoist/span';
4+
import {
5+
ANTHROPIC_AI_RESPONSE_TIMESTAMP_ATTRIBUTE,
6+
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
7+
GEN_AI_RESPONSE_ID_ATTRIBUTE,
8+
GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
9+
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
10+
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
11+
} from '../ai/gen-ai-attributes';
12+
import { setTokenUsageAttributes } from '../ai/utils';
13+
import type { AnthropicAiStreamingEvent } from './types';
14+
15+
/**
 * State object used to accumulate information from a stream of Anthropic AI events.
 *
 * One instance is created per instrumented stream and mutated in place as
 * events arrive; the collected values are flushed onto the span once the
 * stream terminates.
 */

interface StreamingState {
  /**
   * Types of events encountered in the stream.
   * NOTE(review): only ever written in this file, never read — confirm
   * whether this is intended for future use or debugging.
   */
  eventTypes: string[];
  /** Collected response text fragments (for output recording). */
  responseTexts: string[];
  /** Reasons for finishing the response, as reported by the API. */
  finishReasons: string[];
  /** The response ID. */
  responseId: string;
  /** The model name. */
  responseModel: string;
  /**
   * The timestamp of the response (seconds; converted to ISO-8601 on the span).
   * NOTE(review): no code path in this file ever assigns this, so the
   * timestamp span attribute derived from it is currently dead — confirm.
   */
  responseTimestamp: number;
  /** Number of prompt/input tokens used. */
  promptTokens: number | undefined;
  /** Number of completion/output tokens used. */
  completionTokens: number | undefined;
  /** Number of cache creation input tokens used. */
  cacheCreationInputTokens: number | undefined;
  /** Number of cache read input tokens used. */
  cacheReadInputTokens: number | undefined;
}
41+
42+
/**
43+
* Checks if an event is an error event
44+
* @param event - The event to process
45+
* @param state - The state of the streaming process
46+
* @param recordOutputs - Whether to record outputs
47+
* @param span - The span to update
48+
* @returns Whether an error occurred
49+
*/
50+
51+
function isErrorEvent(
52+
event: AnthropicAiStreamingEvent,
53+
state: StreamingState,
54+
recordOutputs: boolean,
55+
span: Span,
56+
): boolean {
57+
if ('type' in event && typeof event.type === 'string') {
58+
state.eventTypes.push(event.type);
59+
60+
// If the event is an error, set the span status and capture the error
61+
// These error events are not rejected by the API by default, but are sent as metadata of the response
62+
if (event.type === 'error') {
63+
const message = event.error?.message ?? 'internal_error';
64+
span.setStatus({ code: SPAN_STATUS_ERROR, message });
65+
captureException(new Error(`anthropic_stream_error: ${message}`), {
66+
mechanism: {
67+
handled: false,
68+
type: 'auto.ai.anthropic',
69+
data: {
70+
function: 'anthropic_stream_error',
71+
},
72+
},
73+
data: {
74+
function: 'anthropic_stream_error',
75+
},
76+
});
77+
return true;
78+
}
79+
80+
if (recordOutputs && event.type === 'content_block_delta') {
81+
const text = event.delta?.text ?? '';
82+
if (text) state.responseTexts.push(text);
83+
}
84+
}
85+
return false;
86+
}
87+
88+
/**
89+
* Processes the message metadata of an event
90+
* @param event - The event to process
91+
* @param state - The state of the streaming process
92+
*/
93+
94+
function handleMessageMetadata(event: AnthropicAiStreamingEvent, state: StreamingState): void {
95+
// The token counts shown in the usage field of the message_delta event are cumulative.
96+
// @see https://docs.anthropic.com/en/docs/build-with-claude/streaming#event-types
97+
if (event.type === 'message_delta' && event.usage) {
98+
if ('output_tokens' in event.usage && typeof event.usage.output_tokens === 'number') {
99+
state.completionTokens = event.usage.output_tokens;
100+
}
101+
}
102+
103+
if (event.message) {
104+
const message = event.message;
105+
106+
if (message.id) state.responseId = message.id;
107+
if (message.model) state.responseModel = message.model;
108+
if (message.stop_reason) state.finishReasons.push(message.stop_reason);
109+
110+
if (message.usage) {
111+
if (typeof message.usage.input_tokens === 'number') state.promptTokens = message.usage.input_tokens;
112+
if (typeof message.usage.cache_creation_input_tokens === 'number')
113+
state.cacheCreationInputTokens = message.usage.cache_creation_input_tokens;
114+
if (typeof message.usage.cache_read_input_tokens === 'number')
115+
state.cacheReadInputTokens = message.usage.cache_read_input_tokens;
116+
}
117+
}
118+
}
119+
120+
/**
121+
* Processes an event
122+
* @param event - The event to process
123+
* @param state - The state of the streaming process
124+
* @param recordOutputs - Whether to record outputs
125+
* @param span - The span to update
126+
*/
127+
128+
function processEvent(
129+
event: AnthropicAiStreamingEvent,
130+
state: StreamingState,
131+
recordOutputs: boolean,
132+
span: Span,
133+
): void {
134+
if (!(event && typeof event === 'object')) {
135+
state.eventTypes.push('unknown:non-object');
136+
return;
137+
}
138+
139+
const isError = isErrorEvent(event, state, recordOutputs, span);
140+
if (isError) return;
141+
142+
handleMessageMetadata(event, state);
143+
}
144+
145+
/**
146+
* Instruments an async iterable stream of Anthropic events, updates the span with
147+
* streaming attributes and (optionally) the aggregated output text, and yields
148+
* each event from the input stream unchanged.
149+
*/
150+
export async function* instrumentStream(
151+
stream: AsyncIterable<AnthropicAiStreamingEvent>,
152+
span: Span,
153+
recordOutputs: boolean,
154+
): AsyncGenerator<AnthropicAiStreamingEvent, void, unknown> {
155+
const state: StreamingState = {
156+
eventTypes: [],
157+
responseTexts: [],
158+
finishReasons: [],
159+
responseId: '',
160+
responseModel: '',
161+
responseTimestamp: 0,
162+
promptTokens: undefined,
163+
completionTokens: undefined,
164+
cacheCreationInputTokens: undefined,
165+
cacheReadInputTokens: undefined,
166+
};
167+
168+
try {
169+
for await (const event of stream) {
170+
processEvent(event, state, recordOutputs, span);
171+
yield event;
172+
}
173+
} finally {
174+
// Set common response attributes if available
175+
if (state.responseId) {
176+
span.setAttributes({
177+
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
178+
});
179+
}
180+
if (state.responseModel) {
181+
span.setAttributes({
182+
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
183+
});
184+
}
185+
if (state.responseTimestamp) {
186+
span.setAttributes({
187+
[ANTHROPIC_AI_RESPONSE_TIMESTAMP_ATTRIBUTE]: new Date(state.responseTimestamp * 1000).toISOString(),
188+
});
189+
}
190+
191+
setTokenUsageAttributes(
192+
span,
193+
state.promptTokens,
194+
state.completionTokens,
195+
state.cacheCreationInputTokens,
196+
state.cacheReadInputTokens,
197+
);
198+
199+
span.setAttributes({
200+
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
201+
});
202+
203+
if (state.finishReasons.length > 0) {
204+
span.setAttributes({
205+
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
206+
});
207+
}
208+
209+
if (recordOutputs && state.responseTexts.length > 0) {
210+
span.setAttributes({
211+
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
212+
});
213+
}
214+
215+
span.end();
216+
}
217+
}

packages/core/src/utils/anthropic-ai/types.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,44 @@ export interface AnthropicAiIntegration {
6161
}
6262

6363
export type AnthropicAiInstrumentedMethod = (typeof ANTHROPIC_AI_INSTRUMENTED_METHODS)[number];
64+
65+
/**
66+
* Message type for Anthropic AI
67+
*/
68+
export type AnthropicAiMessage = {
69+
id: string;
70+
type: 'message';
71+
role: string;
72+
model: string;
73+
content: unknown[];
74+
stop_reason: string | null;
75+
stop_sequence: number | null;
76+
usage?: {
77+
input_tokens: number;
78+
cache_creation_input_tokens?: number;
79+
cache_read_input_tokens?: number;
80+
cache_creation?: unknown;
81+
output_tokens?: number; // Not final; do not treat as total. Use `message_delta.usage.output_tokens` for the final total.
82+
service_tier?: string;
83+
};
84+
};
85+
86+
/**
87+
* Streaming event type for Anthropic AI
88+
*/
89+
export type AnthropicAiStreamingEvent = {
90+
type: 'message_delta' | 'content_block_start' | 'content_block_delta' | 'content_block_stop' | 'error';
91+
error?: {
92+
type: string;
93+
message: string;
94+
};
95+
index?: number;
96+
delta?: {
97+
type: unknown;
98+
text?: string;
99+
};
100+
usage?: {
101+
output_tokens: number; // Final total output tokens; emitted on the last `message_delta` event
102+
};
103+
message?: AnthropicAiMessage;
104+
};

0 commit comments

Comments
 (0)