diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario.mjs index 577c63dc3d08..7e8e2ecd52f8 100644 --- a/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario.mjs @@ -29,6 +29,55 @@ function startMockAnthropicServer() { return; } + // Check if streaming is requested + if (req.body.stream === true) { + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + + // Send streaming events + const events = [ + { + type: 'message_start', + message: { + id: 'msg_stream123', + type: 'message', + role: 'assistant', + model, + content: [], + usage: { input_tokens: 10 }, + }, + }, + { type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }, + { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello ' } }, + { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'from ' } }, + { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'stream!' } }, + { type: 'content_block_stop', index: 0 }, + { + type: 'message_delta', + delta: { stop_reason: 'end_turn', stop_sequence: null }, + usage: { output_tokens: 15 }, + }, + { type: 'message_stop' }, + ]; + + events.forEach((event, index) => { + setTimeout(() => { + res.write(`event: ${event.type}\n`); + res.write(`data: ${JSON.stringify(event)}\n\n`); + + if (index === events.length - 1) { + res.end(); + } + }, index * 10); // Small delay between events + }); + + return; + } + + // Non-streaming response res.send({ id: 'msg_mock123', type: 'message', @@ -92,8 +141,32 @@ async function run() { // Fourth test: models.retrieve await client.models.retrieve('claude-3-haiku-20240307'); + + // Fifth test: streaming via messages.create + const stream = await client.messages.create({ + model: 'claude-3-haiku-20240307', + messages: [{ role: 'user', content: 'What is the capital of France?' }], + stream: true, + }); + + for await (const _ of stream) { + void _; + } + + // Sixth test: streaming via messages.stream + await client.messages + .stream({ + model: 'claude-3-haiku-20240307', + messages: [{ role: 'user', content: 'What is the capital of France?' }], + }) + .on('streamEvent', () => { + Sentry.captureMessage('stream event from user-added event listener captured'); + }); }); + // Wait for the stream event handler to finish + await Sentry.flush(2000); + server.close(); } diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts index 9c14f698bc18..c05db16fc251 100644 --- a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts @@ -152,6 +152,30 @@ describe('Anthropic integration', () => { origin: 'auto.ai.anthropic', status: 'ok', }), + // Fifth span - messages.create with stream: true + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.operation.name': 'messages', + 'gen_ai.request.model': 'claude-3-haiku-20240307', + 'gen_ai.request.stream': true, + }), + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + origin: 'auto.ai.anthropic', + status: 'ok', + }), + // Sixth span - messages.stream + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.operation.name': 'messages', + 'gen_ai.request.model': 'claude-3-haiku-20240307', + 'gen_ai.request.stream': true, + }), + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + origin: 'auto.ai.anthropic', + status: 'ok', + }), ]), }; @@ -189,6 +213,21 @@ describe('Anthropic integration', () => { ]), }; + const EXPECTED_MODEL_ERROR = { + exception: { + values: [ + { + type: 'Error', + value: '404 Model not found', + }, + ], + }, + }; + + const EXPECTED_STREAM_EVENT_HANDLER_MESSAGE = { + message: 'stream event from user-added event listener captured', + }; + createEsmAndCjsTests(__dirname, 'scenario-manual-client.mjs', 'instrument.mjs', (createRunner, test) => { test('creates anthropic related spans when manually insturmenting client', async () => { await createRunner() @@ -202,8 +241,9 @@ describe('Anthropic integration', () => { createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => { test('creates anthropic related spans with sendDefaultPii: false', async () => { await createRunner() - .ignore('event') + .expect({ event: EXPECTED_MODEL_ERROR }) .expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE }) + .expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE }) .start() .completed(); }); @@ -212,8 +252,9 @@ describe('Anthropic integration', () => { createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { test('creates anthropic related spans with sendDefaultPii: true', async () => { await createRunner() - .ignore('event') + .expect({ event: EXPECTED_MODEL_ERROR }) .expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }) + .expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE }) .start() .completed(); }); @@ -222,8 +263,9 @@ describe('Anthropic integration', () => { createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-options.mjs', (createRunner, test) => { test('creates anthropic related spans with custom options', async () => { await createRunner() - .ignore('event') + .expect({ event: EXPECTED_MODEL_ERROR }) .expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS }) + .expect({ event: EXPECTED_STREAM_EVENT_HANDLER_MESSAGE }) .start() .completed(); }); diff --git a/packages/core/src/utils/anthropic-ai/index.ts b/packages/core/src/utils/anthropic-ai/index.ts index 8cc44c188671..8e77dd76b34e 100644 --- a/packages/core/src/utils/anthropic-ai/index.ts +++ b/packages/core/src/utils/anthropic-ai/index.ts @@ -25,7 +25,7 @@ import { } from '../ai/gen-ai-attributes'; import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils'; import { handleCallbackErrors } from '../handleCallbackErrors'; -import { instrumentStream } from './streaming'; +import { instrumentAsyncIterableStream, instrumentMessageStream } from './streaming'; import type { AnthropicAiInstrumentedMethod, AnthropicAiOptions, @@ -194,6 +194,74 @@ function addResponseAttributes(span: Span, response: AnthropicAiResponse, record addMetadataAttributes(span, response); } +/** + * Handle common error catching and reporting for streaming requests + */ +function handleStreamingError(error: unknown, span: Span, methodPath: string): never { + captureException(error, { + mechanism: { handled: false, type: 'auto.ai.anthropic', data: { function: methodPath } }, + }); + + if (span.isRecording()) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + span.end(); + } + throw error; +} + +/** + * Handle streaming cases with common logic + */ +function handleStreamingRequest( + originalMethod: (...args: T) => Promise, + target: (...args: T) => Promise, + context: unknown, + args: T, + requestAttributes: Record, + operationName: string, + methodPath: string, + params: Record | undefined, + options: AnthropicAiOptions, + isStreamRequested: boolean, +): Promise { + const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown'; + const spanConfig = { + name: `${operationName} ${model} stream-response`, + op: getSpanOperation(methodPath), + attributes: requestAttributes as Record, + }; + + if (isStreamRequested) { + return startSpanManual(spanConfig, async span => { + try { + if (options.recordInputs && params) { + addPrivateRequestAttributes(span, params); + } + const result = await originalMethod.apply(context, args); + return instrumentAsyncIterableStream( + result as AsyncIterable, + span, + options.recordOutputs ?? false, + ) as unknown as R; + } catch (error) { + return handleStreamingError(error, span, methodPath); + } + }); + } else { + return startSpanManual(spanConfig, span => { + try { + if (options.recordInputs && params) { + addPrivateRequestAttributes(span, params); + } + const messageStream = target.apply(context, args); + return instrumentMessageStream(messageStream, span, options.recordOutputs ?? false); + } catch (error) { + return handleStreamingError(error, span, methodPath); + } + }); + } +} + /** * Instrument a method with Sentry spans * Following Sentry AI Agents Manual Instrumentation conventions @@ -205,82 +273,62 @@ function instrumentMethod( context: unknown, options: AnthropicAiOptions, ): (...args: T) => Promise { - return async function instrumentedMethod(...args: T): Promise { - const requestAttributes = extractRequestAttributes(args, methodPath); - const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown'; - const operationName = getFinalOperationName(methodPath); + return new Proxy(originalMethod, { + apply(target, thisArg, args: T): Promise { + const requestAttributes = extractRequestAttributes(args, methodPath); + const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown'; + const operationName = getFinalOperationName(methodPath); - const params = typeof args[0] === 'object' ? (args[0] as Record) : undefined; - const isStreamRequested = Boolean(params?.stream); - const isStreamingMethod = methodPath === 'messages.stream'; + const params = typeof args[0] === 'object' ? (args[0] as Record) : undefined; + const isStreamRequested = Boolean(params?.stream); + const isStreamingMethod = methodPath === 'messages.stream'; - if (isStreamRequested || isStreamingMethod) { - return startSpanManual( + if (isStreamRequested || isStreamingMethod) { + return handleStreamingRequest( + originalMethod, + target, + context, + args, + requestAttributes, + operationName, + methodPath, + params, + options, + isStreamRequested, + ); + } + + return startSpan( { - name: `${operationName} ${model} stream-response`, + name: `${operationName} ${model}`, op: getSpanOperation(methodPath), attributes: requestAttributes as Record, }, - async span => { - try { - if (options.recordInputs && params) { - addPrivateRequestAttributes(span, params); - } - - const result = await originalMethod.apply(context, args); - return instrumentStream( - result as AsyncIterable, - span, - options.recordOutputs ?? false, - ) as unknown as R; - } catch (error) { - span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); - captureException(error, { - mechanism: { - handled: false, - type: 'auto.ai.anthropic', - data: { - function: methodPath, - }, - }, - }); - span.end(); - throw error; + span => { + if (options.recordInputs && params) { + addPrivateRequestAttributes(span, params); } - }, - ); - } - return startSpan( - { - name: `${operationName} ${model}`, - op: getSpanOperation(methodPath), - attributes: requestAttributes as Record, - }, - span => { - if (options.recordInputs && params) { - addPrivateRequestAttributes(span, params); - } - - return handleCallbackErrors( - () => originalMethod.apply(context, args), - error => { - captureException(error, { - mechanism: { - handled: false, - type: 'auto.ai.anthropic', - data: { - function: methodPath, + return handleCallbackErrors( + () => target.apply(context, args), + error => { + captureException(error, { + mechanism: { + handled: false, + type: 'auto.ai.anthropic', + data: { + function: methodPath, + }, }, - }, - }); - }, - () => {}, - result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs), - ); - }, - ); - }; + }); + }, + () => {}, + result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs), + ); + }, + ); + }, + }) as (...args: T) => Promise; } /** diff --git a/packages/core/src/utils/anthropic-ai/streaming.ts b/packages/core/src/utils/anthropic-ai/streaming.ts index cd30d99ad09e..b542cbfda75a 100644 --- a/packages/core/src/utils/anthropic-ai/streaming.ts +++ b/packages/core/src/utils/anthropic-ai/streaming.ts @@ -15,7 +15,6 @@ import type { AnthropicAiStreamingEvent } from './types'; /** * State object used to accumulate information from a stream of Anthropic AI events. */ - interface StreamingState { /** Collected response text fragments (for output recording). */ responseTexts: string[]; @@ -183,7 +182,6 @@ function handleContentBlockStop(event: AnthropicAiStreamingEvent, state: Streami * @param recordOutputs - Whether to record outputs * @param span - The span to update */ - function processEvent( event: AnthropicAiStreamingEvent, state: StreamingState, @@ -209,12 +207,66 @@ function processEvent( handleContentBlockStop(event, state); } +/** + * Finalizes span attributes when stream processing completes + */ +function finalizeStreamSpan(state: StreamingState, span: Span, recordOutputs: boolean): void { + if (!span.isRecording()) { + return; + } + + // Set common response attributes if available + if (state.responseId) { + span.setAttributes({ + [GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId, + }); + } + if (state.responseModel) { + span.setAttributes({ + [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel, + }); + } + + setTokenUsageAttributes( + span, + state.promptTokens, + state.completionTokens, + state.cacheCreationInputTokens, + state.cacheReadInputTokens, + ); + + span.setAttributes({ + [GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true, + }); + + if (state.finishReasons.length > 0) { + span.setAttributes({ + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), + }); + } + + if (recordOutputs && state.responseTexts.length > 0) { + span.setAttributes({ + [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), + }); + } + + // Set tool calls if any were captured + if (recordOutputs && state.toolCalls.length > 0) { + span.setAttributes({ + [GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls), + }); + } + + span.end(); +} + /** * Instruments an async iterable stream of Anthropic events, updates the span with * streaming attributes and (optionally) the aggregated output text, and yields * each event from the input stream unchanged. */ -export async function* instrumentStream( +export async function* instrumentAsyncIterableStream( stream: AsyncIterable, span: Span, recordOutputs: boolean, @@ -284,3 +336,51 @@ export async function* instrumentStream( span.end(); } } + +/** + * Instruments a MessageStream by registering event handlers and preserving the original stream API. + */ +export function instrumentMessageStream void }>( + stream: R, + span: Span, + recordOutputs: boolean, +): R { + const state: StreamingState = { + responseTexts: [], + finishReasons: [], + responseId: '', + responseModel: '', + promptTokens: undefined, + completionTokens: undefined, + cacheCreationInputTokens: undefined, + cacheReadInputTokens: undefined, + toolCalls: [], + activeToolBlocks: {}, + }; + + stream.on('streamEvent', (event: unknown) => { + processEvent(event as AnthropicAiStreamingEvent, state, recordOutputs, span); + }); + + // The event fired when a message is done being streamed by the API. Corresponds to the message_stop SSE event. + // @see https://github.com/anthropics/anthropic-sdk-typescript/blob/d3be31f5a4e6ebb4c0a2f65dbb8f381ae73a9166/helpers.md?plain=1#L42-L44 + stream.on('message', () => { + finalizeStreamSpan(state, span, recordOutputs); + }); + + stream.on('error', (error: unknown) => { + captureException(error, { + mechanism: { + handled: false, + type: 'auto.ai.anthropic.stream_error', + }, + }); + + if (span.isRecording()) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'stream_error' }); + span.end(); + } + }); + + return stream; +}