Skip to content

Commit 945a7dc

Browse files
committed
fix(core): Prevent instrumentAnthropicAiClient breaking MessageStream API
1 parent 1e6fde6 commit 945a7dc

File tree

2 files changed

+141
-130
lines changed

2 files changed

+141
-130
lines changed

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 67 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,7 @@ import {
2626
import { buildMethodPath, getFinalOperationName, getSpanOperation, setTokenUsageAttributes } from '../ai/utils';
2727
import { handleCallbackErrors } from '../handleCallbackErrors';
2828
import { instrumentStream } from './streaming';
29-
import type {
30-
AnthropicAiInstrumentedMethod,
31-
AnthropicAiOptions,
32-
AnthropicAiResponse,
33-
AnthropicAiStreamingEvent,
34-
ContentBlock,
35-
} from './types';
29+
import type { AnthropicAiInstrumentedMethod, AnthropicAiOptions, AnthropicAiResponse, ContentBlock } from './types';
3630
import { shouldInstrument } from './utils';
3731

3832
/**
@@ -205,82 +199,82 @@ function instrumentMethod<T extends unknown[], R>(
205199
context: unknown,
206200
options: AnthropicAiOptions,
207201
): (...args: T) => Promise<R> {
208-
return async function instrumentedMethod(...args: T): Promise<R> {
209-
const requestAttributes = extractRequestAttributes(args, methodPath);
210-
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
211-
const operationName = getFinalOperationName(methodPath);
202+
return new Proxy(originalMethod, {
203+
apply(target, thisArg, args: T): Promise<R> {
204+
const requestAttributes = extractRequestAttributes(args, methodPath);
205+
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
206+
const operationName = getFinalOperationName(methodPath);
212207

213-
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
214-
const isStreamRequested = Boolean(params?.stream);
215-
const isStreamingMethod = methodPath === 'messages.stream';
208+
const params = typeof args[0] === 'object' ? (args[0] as Record<string, unknown>) : undefined;
209+
const isStreamRequested = Boolean(params?.stream);
210+
const isStreamingMethod = methodPath === 'messages.stream';
216211

217-
if (isStreamRequested || isStreamingMethod) {
218-
return startSpanManual(
212+
if (isStreamRequested || isStreamingMethod) {
213+
const messageStream = target.apply(context, args);
214+
215+
// Create span for instrumentation using startSpanManual
216+
return startSpanManual(
217+
{
218+
name: `${operationName} ${model} stream-response`,
219+
op: getSpanOperation(methodPath),
220+
attributes: requestAttributes as Record<string, SpanAttributeValue>,
221+
},
222+
span => {
223+
try {
224+
if (options.recordInputs && params) {
225+
addPrivateRequestAttributes(span, params);
226+
}
227+
228+
return instrumentStream(messageStream, span, options.recordOutputs ?? false);
229+
} catch (error) {
230+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
231+
captureException(error, {
232+
mechanism: {
233+
handled: false,
234+
type: 'auto.ai.anthropic',
235+
data: {
236+
function: methodPath,
237+
},
238+
},
239+
});
240+
span.end();
241+
throw error;
242+
}
243+
},
244+
);
245+
}
246+
247+
return startSpan(
219248
{
220-
name: `${operationName} ${model} stream-response`,
249+
name: `${operationName} ${model}`,
221250
op: getSpanOperation(methodPath),
222251
attributes: requestAttributes as Record<string, SpanAttributeValue>,
223252
},
224-
async span => {
225-
try {
226-
if (options.recordInputs && params) {
227-
addPrivateRequestAttributes(span, params);
228-
}
253+
span => {
254+
if (options.recordInputs && params) {
255+
addPrivateRequestAttributes(span, params);
256+
}
229257

230-
const result = await originalMethod.apply(context, args);
231-
return instrumentStream(
232-
result as AsyncIterable<AnthropicAiStreamingEvent>,
233-
span,
234-
options.recordOutputs ?? false,
235-
) as unknown as R;
236-
} catch (error) {
237-
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
238-
captureException(error, {
239-
mechanism: {
240-
handled: false,
241-
type: 'auto.ai.anthropic',
242-
data: {
243-
function: methodPath,
258+
return handleCallbackErrors(
259+
() => target.apply(context, args),
260+
error => {
261+
captureException(error, {
262+
mechanism: {
263+
handled: false,
264+
type: 'auto.ai.anthropic',
265+
data: {
266+
function: methodPath,
267+
},
244268
},
245-
},
246-
});
247-
span.end();
248-
throw error;
249-
}
269+
});
270+
},
271+
() => {},
272+
result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs),
273+
);
250274
},
251275
);
252-
}
253-
254-
return startSpan(
255-
{
256-
name: `${operationName} ${model}`,
257-
op: getSpanOperation(methodPath),
258-
attributes: requestAttributes as Record<string, SpanAttributeValue>,
259-
},
260-
span => {
261-
if (options.recordInputs && params) {
262-
addPrivateRequestAttributes(span, params);
263-
}
264-
265-
return handleCallbackErrors(
266-
() => originalMethod.apply(context, args),
267-
error => {
268-
captureException(error, {
269-
mechanism: {
270-
handled: false,
271-
type: 'auto.ai.anthropic',
272-
data: {
273-
function: methodPath,
274-
},
275-
},
276-
});
277-
},
278-
() => {},
279-
result => addResponseAttributes(span, result as AnthropicAiResponse, options.recordOutputs),
280-
);
281-
},
282-
);
283-
};
276+
},
277+
}) as (...args: T) => Promise<R>;
284278
}
285279

286280
/**

packages/core/src/utils/anthropic-ai/streaming.ts

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ import type { AnthropicAiStreamingEvent } from './types';
1515
/**
1616
* State object used to accumulate information from a stream of Anthropic AI events.
1717
*/
18-
19-
interface StreamingState {
18+
export interface StreamingState {
2019
/** Collected response text fragments (for output recording). */
2120
responseTexts: string[];
2221
/** Reasons for finishing the response, as reported by the API. */
@@ -183,8 +182,7 @@ function handleContentBlockStop(event: AnthropicAiStreamingEvent, state: Streami
183182
* @param recordOutputs - Whether to record outputs
184183
* @param span - The span to update
185184
*/
186-
187-
function processEvent(
185+
export function processEvent(
188186
event: AnthropicAiStreamingEvent,
189187
state: StreamingState,
190188
recordOutputs: boolean,
@@ -210,15 +208,63 @@ function processEvent(
210208
}
211209

212210
/**
213-
* Instruments an async iterable stream of Anthropic events, updates the span with
214-
* streaming attributes and (optionally) the aggregated output text, and yields
215-
* each event from the input stream unchanged.
211+
* Finalizes span attributes when stream processing completes
212+
*/
213+
export function finalizeStreamSpan(state: StreamingState, span: Span, recordOutputs: boolean): void {
214+
// Set common response attributes if available
215+
if (state.responseId) {
216+
span.setAttributes({
217+
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
218+
});
219+
}
220+
if (state.responseModel) {
221+
span.setAttributes({
222+
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
223+
});
224+
}
225+
226+
setTokenUsageAttributes(
227+
span,
228+
state.promptTokens,
229+
state.completionTokens,
230+
state.cacheCreationInputTokens,
231+
state.cacheReadInputTokens,
232+
);
233+
234+
span.setAttributes({
235+
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
236+
});
237+
238+
if (state.finishReasons.length > 0) {
239+
span.setAttributes({
240+
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
241+
});
242+
}
243+
244+
if (recordOutputs && state.responseTexts.length > 0) {
245+
span.setAttributes({
246+
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
247+
});
248+
}
249+
250+
// Set tool calls if any were captured
251+
if (recordOutputs && state.toolCalls.length > 0) {
252+
span.setAttributes({
253+
[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
254+
});
255+
}
256+
257+
span.end();
258+
}
259+
260+
/**
261+
* Instruments a MessageStream by registering event handlers and preserving the original stream API.
216262
*/
217-
export async function* instrumentStream(
218-
stream: AsyncIterable<AnthropicAiStreamingEvent>,
263+
export function instrumentStream<R extends { on: (...args: unknown[]) => void }>(
264+
stream: R,
219265
span: Span,
220266
recordOutputs: boolean,
221-
): AsyncGenerator<AnthropicAiStreamingEvent, void, unknown> {
267+
): R {
222268
const state: StreamingState = {
223269
responseTexts: [],
224270
finishReasons: [],
@@ -232,55 +278,26 @@ export async function* instrumentStream(
232278
activeToolBlocks: {},
233279
};
234280

235-
try {
236-
for await (const event of stream) {
237-
processEvent(event, state, recordOutputs, span);
238-
yield event;
239-
}
240-
} finally {
241-
// Set common response attributes if available
242-
if (state.responseId) {
243-
span.setAttributes({
244-
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
245-
});
246-
}
247-
if (state.responseModel) {
248-
span.setAttributes({
249-
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
250-
});
251-
}
281+
stream.on('streamEvent', (event: unknown) => {
282+
processEvent(event as AnthropicAiStreamingEvent, state, recordOutputs, span);
283+
});
252284

253-
setTokenUsageAttributes(
254-
span,
255-
state.promptTokens,
256-
state.completionTokens,
257-
state.cacheCreationInputTokens,
258-
state.cacheReadInputTokens,
259-
);
285+
// The event fired when a message is done being streamed by the API. Corresponds to the message_stop SSE event.
286+
// @see https://github.com/anthropics/anthropic-sdk-typescript/blob/d3be31f5a4e6ebb4c0a2f65dbb8f381ae73a9166/helpers.md?plain=1#L42-L44
287+
stream.on('message', () => {
288+
finalizeStreamSpan(state, span, recordOutputs);
289+
});
260290

261-
span.setAttributes({
262-
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
291+
stream.on('error', (error: unknown) => {
292+
span.setStatus({ code: SPAN_STATUS_ERROR, message: 'stream_error' });
293+
captureException(error, {
294+
mechanism: {
295+
handled: false,
296+
type: 'auto.ai.anthropic.stream_error',
297+
},
263298
});
264-
265-
if (state.finishReasons.length > 0) {
266-
span.setAttributes({
267-
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
268-
});
269-
}
270-
271-
if (recordOutputs && state.responseTexts.length > 0) {
272-
span.setAttributes({
273-
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
274-
});
275-
}
276-
277-
// Set tool calls if any were captured
278-
if (recordOutputs && state.toolCalls.length > 0) {
279-
span.setAttributes({
280-
[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
281-
});
282-
}
283-
284299
span.end();
285-
}
300+
});
301+
302+
return stream;
286303
}

0 commit comments

Comments (0)