scenario-stream.mjs (new file)
@@ -0,0 +1,105 @@
import { instrumentAnthropicAiClient } from '@sentry/core';
import * as Sentry from '@sentry/node';

function createMockStreamEvents(model = 'claude-3-haiku-20240307') {
async function* generator() {
    // Provide message metadata early so the span can capture the id, model, and input token usage
yield {
type: 'content_block_start',
message: {
id: 'msg_stream_1',
type: 'message',
role: 'assistant',
model,
content: [],
stop_reason: 'end_turn',
stop_sequence: null,
usage: {
input_tokens: 10,
},
},
};

// Streamed text chunks
yield { type: 'content_block_delta', delta: { text: 'Hello ' } };
yield { type: 'content_block_delta', delta: { text: 'from ' } };
yield { type: 'content_block_delta', delta: { text: 'stream!' } };

// Final usage totals for output tokens
yield { type: 'message_delta', usage: { output_tokens: 15 } };
}

return generator();
}

class MockAnthropic {
constructor(config) {
this.apiKey = config.apiKey;

this.messages = {
create: this._messagesCreate.bind(this),
stream: this._messagesStream.bind(this),
};
}

async _messagesCreate(params) {
await new Promise(resolve => setTimeout(resolve, 5));
if (params?.stream === true) {
return createMockStreamEvents(params.model);
}
// Fallback non-streaming behavior (not used in this scenario)
return {
id: 'msg_mock123',
type: 'message',
model: params.model,
role: 'assistant',
content: [
{
type: 'text',
text: 'Hello from Anthropic mock!',
},
],
stop_reason: 'end_turn',
stop_sequence: null,
usage: {
input_tokens: 10,
output_tokens: 15,
},
};
}

async _messagesStream(params) {
await new Promise(resolve => setTimeout(resolve, 5));
return createMockStreamEvents(params?.model);
}
}

async function run() {
await Sentry.startSpan({ op: 'function', name: 'main' }, async () => {
const mockClient = new MockAnthropic({ apiKey: 'mock-api-key' });
const client = instrumentAnthropicAiClient(mockClient);

// 1) Streaming via stream: true param on messages.create
const stream1 = await client.messages.create({
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'Stream this please' }],
stream: true,
});
for await (const _ of stream1) {
void _;
}

// 2) Streaming via messages.stream API
const stream2 = await client.messages.stream({
model: 'claude-3-haiku-20240307',
messages: [{ role: 'user', content: 'Stream this too' }],
});
for await (const _ of stream2) {
void _;
}
});
}

run();
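
For context, the mock above returns a bare async generator from both entry points. The real SDK surface it stands in for looks roughly like the sketch below — this assumes the documented `@anthropic-ai/sdk` Messages API (including its required `max_tokens` parameter) and is not part of this PR:

// A hedged sketch of consuming the real Anthropic SDK, which the
// instrumented client must also support. Not part of this PR's tests.
import Anthropic from '@anthropic-ai/sdk';

const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

const events = await anthropic.messages.create({
  model: 'claude-3-haiku-20240307',
  max_tokens: 64,
  messages: [{ role: 'user', content: 'Hi' }],
  stream: true,
});

for await (const event of events) {
  // Real event types include 'message_start' (id, model, input-token usage),
  // 'content_block_delta' (text chunks), and 'message_delta' (output-token usage).
  if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
    process.stdout.write(event.delta.text);
  }
}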


@@ -218,4 +218,79 @@ describe('Anthropic integration', () => {
.completed();
});
});

const EXPECTED_STREAM_SPANS_PII_FALSE = {
transaction: 'main',
spans: expect.arrayContaining([
// messages.create with stream: true
expect.objectContaining({
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
data: expect.objectContaining({
'gen_ai.system': 'anthropic',
'gen_ai.operation.name': 'messages',
'gen_ai.request.model': 'claude-3-haiku-20240307',
'gen_ai.request.stream': true,
'gen_ai.response.streaming': true,
'gen_ai.response.model': 'claude-3-haiku-20240307',
'gen_ai.response.id': 'msg_stream_1',
'gen_ai.usage.input_tokens': 10,
'gen_ai.usage.output_tokens': 15,
'gen_ai.usage.total_tokens': 25,
'gen_ai.response.finish_reasons': '["end_turn"]',
}),
}),
// messages.stream
expect.objectContaining({
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
data: expect.objectContaining({
'gen_ai.system': 'anthropic',
'gen_ai.operation.name': 'messages',
'gen_ai.request.model': 'claude-3-haiku-20240307',
'gen_ai.response.streaming': true,
'gen_ai.response.model': 'claude-3-haiku-20240307',
'gen_ai.response.id': 'msg_stream_1',
'gen_ai.usage.input_tokens': 10,
'gen_ai.usage.output_tokens': 15,
'gen_ai.usage.total_tokens': 25,
}),
}),
]),
};

const EXPECTED_STREAM_SPANS_PII_TRUE = {
transaction: 'main',
spans: expect.arrayContaining([
expect.objectContaining({
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
data: expect.objectContaining({
'gen_ai.response.streaming': true,
// streamed text concatenated
'gen_ai.response.text': 'Hello from stream!',
}),
}),
expect.objectContaining({
description: 'messages claude-3-haiku-20240307 stream-response',
op: 'gen_ai.messages',
data: expect.objectContaining({
'gen_ai.response.streaming': true,
'gen_ai.response.text': 'Hello from stream!',
}),
}),
]),
};

createEsmAndCjsTests(__dirname, 'scenario-stream.mjs', 'instrument.mjs', (createRunner, test) => {
test('streams produce spans with token usage and metadata (PII false)', async () => {
await createRunner().ignore('event').expect({ transaction: EXPECTED_STREAM_SPANS_PII_FALSE }).start().completed();
});
});

createEsmAndCjsTests(__dirname, 'scenario-stream.mjs', 'instrument-with-pii.mjs', (createRunner, test) => {
test('streams record response text when PII true', async () => {
await createRunner().ignore('event').expect({ transaction: EXPECTED_STREAM_SPANS_PII_TRUE }).start().completed();
});
});
});
packages/core/src/utils/anthropic-ai/constants.ts (1 addition, 0 deletions)
@@ -4,6 +4,7 @@ export const ANTHROPIC_AI_INTEGRATION_NAME = 'Anthropic_AI';
// https://docs.anthropic.com/en/api/models-list
export const ANTHROPIC_AI_INSTRUMENTED_METHODS = [
'messages.create',
'messages.stream',
'messages.countTokens',
'models.get',
'completions.create',
packages/core/src/utils/anthropic-ai/index.ts (46 additions, 2 deletions)
@@ -1,7 +1,8 @@
import { getCurrentScope } from '../../currentScopes';
import { captureException } from '../../exports';
import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
import { SPAN_STATUS_ERROR } from '../../tracing';
import { startSpan, startSpanManual } from '../../tracing/trace';
import type { Span, SpanAttributeValue } from '../../types-hoist/span';
import {
ANTHROPIC_AI_RESPONSE_TIMESTAMP_ATTRIBUTE,
@@ -22,14 +23,17 @@
} from '../ai/gen-ai-attributes';
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
import { ANTHROPIC_AI_INTEGRATION_NAME } from './constants';
import { instrumentStream } from './streaming';
import type {
AnthropicAiClient,
AnthropicAiInstrumentedMethod,
AnthropicAiIntegration,
AnthropicAiOptions,
AnthropicAiResponse,
AnthropicAiStreamingEvent,
} from './types';
import { shouldInstrument } from './utils';

/**
* Extract request attributes from method arguments
*/
@@ -168,7 +172,47 @@ function instrumentMethod<T extends unknown[], R>(
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
const operationName = getFinalOperationName(methodPath);

const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
const isStreamRequested = Boolean(params?.stream);
const isStreamingMethod = methodPath === 'messages.stream';

if (isStreamRequested || isStreamingMethod) {
return startSpanManual(
{
name: `${operationName} ${model} stream-response`,
op: getSpanOperation(methodPath),
attributes: requestAttributes as Record<string, SpanAttributeValue>,
},
async (span: Span) => {
try {
if (finalOptions.recordInputs && params) {
addPrivateRequestAttributes(span, params);
}

const result = await originalMethod.apply(context, args);
return instrumentStream(
result as AsyncIterable<AnthropicAiStreamingEvent>,
span,
finalOptions.recordOutputs ?? false,
) as unknown as R;
} catch (error) {
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
captureException(error, {
mechanism: {
handled: false,
type: 'auto.ai.anthropic',
data: {
function: methodPath,
},
},
});
span.end();
throw error;
}
},
);
}

return startSpan(
{
name: `${operationName} ${model}`,
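
The branch above hands the awaited result to `instrumentStream` from `./streaming`, which is not included in this diff. Below is a minimal sketch of what such a wrapper could look like, assuming the event shapes used by the mock scenario (message metadata on an early event, `content_block_delta` text chunks, a final `message_delta` carrying output-token usage). The function name, event interface, and attribute keys are illustrative, not the PR's actual streaming.ts:

import type { Span } from '@sentry/core';

// Illustrative event shape matching the mock scenario above (an assumption,
// not the PR's actual types).
interface StreamEvent {
  type: string;
  message?: { id: string; model: string; usage?: { input_tokens?: number } };
  delta?: { text?: string };
  usage?: { output_tokens?: number };
}

// Forwards every event to the consumer unchanged while accumulating
// response metadata on the span, then ends the span when the stream closes.
async function* instrumentStreamSketch(
  stream: AsyncIterable<StreamEvent>,
  span: Span,
  recordOutputs: boolean,
): AsyncGenerator<StreamEvent> {
  let text = '';
  try {
    for await (const event of stream) {
      if (event.message) {
        // Early event carries response id, model, and input-token usage.
        span.setAttributes({
          'gen_ai.response.id': event.message.id,
          'gen_ai.response.model': event.message.model,
          'gen_ai.response.streaming': true,
        });
        if (event.message.usage?.input_tokens !== undefined) {
          span.setAttribute('gen_ai.usage.input_tokens', event.message.usage.input_tokens);
        }
      }
      if (event.delta?.text) {
        text += event.delta.text;
      }
      if (event.usage?.output_tokens !== undefined) {
        span.setAttribute('gen_ai.usage.output_tokens', event.usage.output_tokens);
      }
      yield event;
    }
    if (recordOutputs && text) {
      span.setAttribute('gen_ai.response.text', text);
    }
  } finally {
    // Runs on normal completion, early break, or a consumer error.
    span.end();
  }
}

Ending the span in the `finally` block mirrors the `startSpanManual` call site above, which deliberately defers `span.end()` on the success path to the stream wrapper rather than ending the span before the stream has been consumed.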