diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs b/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs new file mode 100644 index 000000000000..da70a2b12467 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs @@ -0,0 +1,105 @@ +import { instrumentAnthropicAiClient } from '@sentry/core'; +import * as Sentry from '@sentry/node'; + +function createMockStreamEvents(model = 'claude-3-haiku-20240307') { + async function* generator() { + // Provide message metadata early so the span can capture id/model/usage input tokens + yield { + type: 'content_block_start', + message: { + id: 'msg_stream_1', + type: 'message', + role: 'assistant', + model, + content: [], + stop_reason: 'end_turn', + stop_sequence: null, + usage: { + input_tokens: 10, + }, + }, + }; + + // Streamed text chunks + yield { type: 'content_block_delta', delta: { text: 'Hello ' } }; + yield { type: 'content_block_delta', delta: { text: 'from ' } }; + yield { type: 'content_block_delta', delta: { text: 'stream!' 
} }; + + // Final usage totals for output tokens + yield { type: 'message_delta', usage: { output_tokens: 15 } }; + } + + return generator(); +} + +class MockAnthropic { + constructor(config) { + this.apiKey = config.apiKey; + + this.messages = { + create: this._messagesCreate.bind(this), + stream: this._messagesStream.bind(this), + }; + } + + async _messagesCreate(params) { + await new Promise(resolve => setTimeout(resolve, 5)); + if (params?.stream === true) { + return createMockStreamEvents(params.model); + } + // Fallback non-streaming behavior (not used in this scenario) + return { + id: 'msg_mock123', + type: 'message', + model: params.model, + role: 'assistant', + content: [ + { + type: 'text', + text: 'Hello from Anthropic mock!', + }, + ], + stop_reason: 'end_turn', + stop_sequence: null, + usage: { + input_tokens: 10, + output_tokens: 15, + }, + }; + } + + async _messagesStream(params) { + await new Promise(resolve => setTimeout(resolve, 5)); + return createMockStreamEvents(params?.model); + } +} + +async function run() { + await Sentry.startSpan({ op: 'function', name: 'main' }, async () => { + const mockClient = new MockAnthropic({ apiKey: 'mock-api-key' }); + const client = instrumentAnthropicAiClient(mockClient); + + // 1) Streaming via stream: true param on messages.create + const stream1 = await client.messages.create({ + model: 'claude-3-haiku-20240307', + messages: [{ role: 'user', content: 'Stream this please' }], + stream: true, + }); + for await (const _ of stream1) { + void _; + } + + // 2) Streaming via messages.stream API + const stream2 = await client.messages.stream({ + model: 'claude-3-haiku-20240307', + messages: [{ role: 'user', content: 'Stream this too' }], + }); + for await (const _ of stream2) { + void _; + } + }); +} + +run(); + + diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts index 4b7d19b7cc58..9b8c7219000d 100644 --- 
a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts @@ -218,4 +218,79 @@ describe('Anthropic integration', () => { .completed(); }); }); + + const EXPECTED_STREAM_SPANS_PII_FALSE = { + transaction: 'main', + spans: expect.arrayContaining([ + // messages.create with stream: true + expect.objectContaining({ + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + data: expect.objectContaining({ + 'gen_ai.system': 'anthropic', + 'gen_ai.operation.name': 'messages', + 'gen_ai.request.model': 'claude-3-haiku-20240307', + 'gen_ai.request.stream': true, + 'gen_ai.response.streaming': true, + 'gen_ai.response.model': 'claude-3-haiku-20240307', + 'gen_ai.response.id': 'msg_stream_1', + 'gen_ai.usage.input_tokens': 10, + 'gen_ai.usage.output_tokens': 15, + 'gen_ai.usage.total_tokens': 25, + 'gen_ai.response.finish_reasons': '["end_turn"]', + }), + }), + // messages.stream + expect.objectContaining({ + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + data: expect.objectContaining({ + 'gen_ai.system': 'anthropic', + 'gen_ai.operation.name': 'messages', + 'gen_ai.request.model': 'claude-3-haiku-20240307', + 'gen_ai.response.streaming': true, + 'gen_ai.response.model': 'claude-3-haiku-20240307', + 'gen_ai.response.id': 'msg_stream_1', + 'gen_ai.usage.input_tokens': 10, + 'gen_ai.usage.output_tokens': 15, + 'gen_ai.usage.total_tokens': 25, + }), + }), + ]), + }; + + const EXPECTED_STREAM_SPANS_PII_TRUE = { + transaction: 'main', + spans: expect.arrayContaining([ + expect.objectContaining({ + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + data: expect.objectContaining({ + 'gen_ai.response.streaming': true, + // streamed text concatenated + 'gen_ai.response.text': 'Hello from stream!', + }), + }), + expect.objectContaining({ + description: 'messages 
claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + data: expect.objectContaining({ + 'gen_ai.response.streaming': true, + 'gen_ai.response.text': 'Hello from stream!', + }), + }), + ]), + }; + + createEsmAndCjsTests(__dirname, 'scenario-stream.mjs', 'instrument.mjs', (createRunner, test) => { + test('streams produce spans with token usage and metadata (PII false)', async () => { + await createRunner().ignore('event').expect({ transaction: EXPECTED_STREAM_SPANS_PII_FALSE }).start().completed(); + }); + }); + + createEsmAndCjsTests(__dirname, 'scenario-stream.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { + test('streams record response text when PII true', async () => { + await createRunner().ignore('event').expect({ transaction: EXPECTED_STREAM_SPANS_PII_TRUE }).start().completed(); + }); + }); }); diff --git a/packages/core/src/utils/anthropic-ai/constants.ts b/packages/core/src/utils/anthropic-ai/constants.ts index 41a227f171e0..1e20745e0f1f 100644 --- a/packages/core/src/utils/anthropic-ai/constants.ts +++ b/packages/core/src/utils/anthropic-ai/constants.ts @@ -4,6 +4,7 @@ export const ANTHROPIC_AI_INTEGRATION_NAME = 'Anthropic_AI'; // https://docs.anthropic.com/en/api/models-list export const ANTHROPIC_AI_INSTRUMENTED_METHODS = [ 'messages.create', + 'messages.stream', 'messages.countTokens', 'models.get', 'completions.create', diff --git a/packages/core/src/utils/anthropic-ai/index.ts b/packages/core/src/utils/anthropic-ai/index.ts index 8d56b2a56c04..2ed95be76843 100644 --- a/packages/core/src/utils/anthropic-ai/index.ts +++ b/packages/core/src/utils/anthropic-ai/index.ts @@ -1,7 +1,8 @@ import { getCurrentScope } from '../../currentScopes'; import { captureException } from '../../exports'; import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes'; -import { startSpan } from '../../tracing/trace'; +import { SPAN_STATUS_ERROR } from '../../tracing'; +import { startSpan, startSpanManual } from 
'../../tracing/trace'; import type { Span, SpanAttributeValue } from '../../types-hoist/span'; import { ANTHROPIC_AI_RESPONSE_TIMESTAMP_ATTRIBUTE, @@ -22,14 +23,17 @@ import { } from '../ai/gen-ai-attributes'; import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils'; import { ANTHROPIC_AI_INTEGRATION_NAME } from './constants'; +import { instrumentStream } from './streaming'; import type { AnthropicAiClient, AnthropicAiInstrumentedMethod, AnthropicAiIntegration, AnthropicAiOptions, AnthropicAiResponse, + AnthropicAiStreamingEvent, } from './types'; import { shouldInstrument } from './utils'; + /** * Extract request attributes from method arguments */ @@ -168,7 +172,47 @@ function instrumentMethod( const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown'; const operationName = getFinalOperationName(methodPath); - // TODO: Handle streaming responses + const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined; + const isStreamRequested = Boolean(params?.stream); + const isStreamingMethod = methodPath === 'messages.stream'; + + if (isStreamRequested || isStreamingMethod) { + return startSpanManual( + { + name: `${operationName} ${model} stream-response`, + op: getSpanOperation(methodPath), + attributes: requestAttributes as Record<string, SpanAttributeValue>, + }, + async (span: Span) => { + try { + if (finalOptions.recordInputs && params) { + addPrivateRequestAttributes(span, params); + } + + const result = await originalMethod.apply(context, args); + return instrumentStream( + result as AsyncIterable<AnthropicAiStreamingEvent>, + span, + finalOptions.recordOutputs ??
false, + ) as unknown as R; + } catch (error) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(error, { + mechanism: { + handled: false, + type: 'auto.ai.anthropic', + data: { + function: methodPath, + }, + }, + }); + span.end(); + throw error; + } + }, + ); + } + return startSpan( { name: `${operationName} ${model}`, diff --git a/packages/core/src/utils/anthropic-ai/streaming.ts b/packages/core/src/utils/anthropic-ai/streaming.ts new file mode 100644 index 000000000000..8ebbfc0b42cd --- /dev/null +++ b/packages/core/src/utils/anthropic-ai/streaming.ts @@ -0,0 +1,202 @@ +import { captureException } from '../../exports'; +import { SPAN_STATUS_ERROR } from '../../tracing'; +import type { Span } from '../../types-hoist/span'; +import { + GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, + GEN_AI_RESPONSE_ID_ATTRIBUTE, + GEN_AI_RESPONSE_MODEL_ATTRIBUTE, + GEN_AI_RESPONSE_STREAMING_ATTRIBUTE, + GEN_AI_RESPONSE_TEXT_ATTRIBUTE, +} from '../ai/gen-ai-attributes'; +import { setTokenUsageAttributes } from '../ai/utils'; +import type { AnthropicAiStreamingEvent } from './types'; + +/** + * State object used to accumulate information from a stream of Anthropic AI events. + */ + +interface StreamingState { + /** Collected response text fragments (for output recording). */ + responseTexts: string[]; + /** Reasons for finishing the response, as reported by the API. */ + finishReasons: string[]; + /** The response ID. */ + responseId: string; + /** The model name. */ + responseModel: string; + /** Number of prompt/input tokens used. */ + promptTokens: number | undefined; + /** Number of completion/output tokens used. */ + completionTokens: number | undefined; + /** Number of cache creation input tokens used. */ + cacheCreationInputTokens: number | undefined; + /** Number of cache read input tokens used. 
*/ + cacheReadInputTokens: number | undefined; +} + +/** + * Checks if an event is an error event + * @param event - The event to process + * @param state - The state of the streaming process + * @param recordOutputs - Whether to record outputs + * @param span - The span to update + * @returns Whether an error occurred + */ + +function isErrorEvent( + event: AnthropicAiStreamingEvent, + state: StreamingState, + recordOutputs: boolean, + span: Span, +): boolean { + if ('type' in event && typeof event.type === 'string') { + // If the event is an error, set the span status and capture the error + // These error events are not rejected by the API by default, but are sent as metadata of the response + if (event.type === 'error') { + const message = event.error?.message ?? 'internal_error'; + span.setStatus({ code: SPAN_STATUS_ERROR, message }); + captureException(new Error(`anthropic_stream_error: ${message}`), { + mechanism: { + handled: false, + type: 'auto.ai.anthropic', + data: { + function: 'anthropic_stream_error', + }, + }, + data: { + function: 'anthropic_stream_error', + }, + }); + return true; + } + + if (recordOutputs && event.type === 'content_block_delta') { + const text = event.delta?.text; + if (text) state.responseTexts.push(text); + } + } + return false; +} + +/** + * Processes the message metadata of an event + * @param event - The event to process + * @param state - The state of the streaming process + */ + +function handleMessageMetadata(event: AnthropicAiStreamingEvent, state: StreamingState): void { + // The token counts shown in the usage field of the message_delta event are cumulative. 
+ // @see https://docs.anthropic.com/en/docs/build-with-claude/streaming#event-types + if (event.type === 'message_delta' && event.usage) { + if ('output_tokens' in event.usage && typeof event.usage.output_tokens === 'number') { + state.completionTokens = event.usage.output_tokens; + } + } + + if (event.message) { + const message = event.message; + + if (message.id) state.responseId = message.id; + if (message.model) state.responseModel = message.model; + if (message.stop_reason) state.finishReasons.push(message.stop_reason); + + if (message.usage) { + if (typeof message.usage.input_tokens === 'number') state.promptTokens = message.usage.input_tokens; + if (typeof message.usage.cache_creation_input_tokens === 'number') + state.cacheCreationInputTokens = message.usage.cache_creation_input_tokens; + if (typeof message.usage.cache_read_input_tokens === 'number') + state.cacheReadInputTokens = message.usage.cache_read_input_tokens; + } + } +} + +/** + * Processes an event + * @param event - The event to process + * @param state - The state of the streaming process + * @param recordOutputs - Whether to record outputs + * @param span - The span to update + */ + +function processEvent( + event: AnthropicAiStreamingEvent, + state: StreamingState, + recordOutputs: boolean, + span: Span, +): void { + if (!(event && typeof event === 'object')) { + return; + } + + const isError = isErrorEvent(event, state, recordOutputs, span); + if (isError) return; + + handleMessageMetadata(event, state); +} + +/** + * Instruments an async iterable stream of Anthropic events, updates the span with + * streaming attributes and (optionally) the aggregated output text, and yields + * each event from the input stream unchanged. 
+ */ +export async function* instrumentStream( + stream: AsyncIterable<AnthropicAiStreamingEvent>, + span: Span, + recordOutputs: boolean, +): AsyncGenerator<AnthropicAiStreamingEvent> { + const state: StreamingState = { + responseTexts: [], + finishReasons: [], + responseId: '', + responseModel: '', + promptTokens: undefined, + completionTokens: undefined, + cacheCreationInputTokens: undefined, + cacheReadInputTokens: undefined, + }; + + try { + for await (const event of stream) { + processEvent(event, state, recordOutputs, span); + yield event; + } + } finally { + // Set common response attributes if available + if (state.responseId) { + span.setAttributes({ + [GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId, + }); + } + if (state.responseModel) { + span.setAttributes({ + [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel, + }); + } + + setTokenUsageAttributes( + span, + state.promptTokens, + state.completionTokens, + state.cacheCreationInputTokens, + state.cacheReadInputTokens, + ); + + span.setAttributes({ + [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, + }); + + if (state.finishReasons.length > 0) { + span.setAttributes({ + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), + }); + } + + if (recordOutputs && state.responseTexts.length > 0) { + span.setAttributes({ + [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), + }); + } + + span.end(); + } +} diff --git a/packages/core/src/utils/anthropic-ai/types.ts b/packages/core/src/utils/anthropic-ai/types.ts index 566e9588d56f..fd533b6795bc 100644 --- a/packages/core/src/utils/anthropic-ai/types.ts +++ b/packages/core/src/utils/anthropic-ai/types.ts @@ -61,3 +61,44 @@ export interface AnthropicAiIntegration { } export type AnthropicAiInstrumentedMethod = (typeof ANTHROPIC_AI_INSTRUMENTED_METHODS)[number]; + +/** + * Message type for Anthropic AI + */ +export type AnthropicAiMessage = { + id: string; + type: 'message'; + role: string; + model: string; + content: unknown[]; + stop_reason: string | null; + stop_sequence:
number | null; + usage?: { + input_tokens: number; + cache_creation_input_tokens?: number; + cache_read_input_tokens?: number; + cache_creation?: unknown; + output_tokens?: number; // Not final; do not treat as total. Use `message_delta.usage.output_tokens` for the final total. + service_tier?: string; + }; +}; + +/** + * Streaming event type for Anthropic AI + */ +export type AnthropicAiStreamingEvent = { + type: 'message_delta' | 'content_block_start' | 'content_block_delta' | 'content_block_stop' | 'error'; + error?: { + type: string; + message: string; + }; + index?: number; + delta?: { + type: unknown; + text?: string; + }; + usage?: { + output_tokens: number; // Final total output tokens; emitted on the last `message_delta` event + }; + message?: AnthropicAiMessage; +};