Skip to content

Commit 86f45af

Browse files
committed
feat: Stream responses for openai node
1 parent da8dfef commit 86f45af

File tree

8 files changed

+802
-65
lines changed

8 files changed

+802
-65
lines changed

packages/core/src/utils/gen-ai-attributes.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,20 @@ export const OPENAI_RESPONSE_MODEL_ATTRIBUTE = 'openai.response.model';
127127
export const OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE = 'openai.response.timestamp';
128128

129129
/**
130-
* The number of completion tokens used (OpenAI specific)
130+
* The number of completion tokens used
131131
*/
132132
export const OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE = 'openai.usage.completion_tokens';
133133

134134
/**
135-
* The number of prompt tokens used (OpenAI specific)
135+
* The number of prompt tokens used
136136
*/
137137
export const OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE = 'openai.usage.prompt_tokens';
138138

139+
/**
140+
* Whether the response is a stream response
141+
*/
142+
export const OPENAI_RESPONSE_STREAM_ATTRIBUTE = 'openai.response.stream';
143+
139144
// =============================================================================
140145
// OPENAI OPERATIONS
141146
// =============================================================================

packages/core/src/utils/openai/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export const OPENAI_INTEGRATION_NAME = 'OpenAI';
33
// https://platform.openai.com/docs/quickstart?api-mode=responses
44
// https://platform.openai.com/docs/quickstart?api-mode=chat
55
export const INSTRUMENTED_METHODS = ['responses.create', 'chat.completions.create'] as const;
6+
// Streaming event types from the Responses API that we aggregate into span attributes.
export const RESPONSE_EVENT_TYPES = [
  'response.created',
  'response.in_progress',
  'response.failed',
  'response.completed',
  'response.incomplete',
  'response.queued',
  'response.output_text.delta',
] as const;

packages/core/src/utils/openai/index.ts

Lines changed: 23 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,37 @@ import {
88
GEN_AI_REQUEST_MESSAGES_ATTRIBUTE,
99
GEN_AI_REQUEST_MODEL_ATTRIBUTE,
1010
GEN_AI_REQUEST_PRESENCE_PENALTY_ATTRIBUTE,
11+
GEN_AI_REQUEST_STREAM_ATTRIBUTE,
1112
GEN_AI_REQUEST_TEMPERATURE_ATTRIBUTE,
1213
GEN_AI_REQUEST_TOP_P_ATTRIBUTE,
1314
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
14-
GEN_AI_RESPONSE_ID_ATTRIBUTE,
15-
GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
1615
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
1716
GEN_AI_SYSTEM_ATTRIBUTE,
18-
GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE,
19-
GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
20-
GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
21-
OPENAI_RESPONSE_ID_ATTRIBUTE,
22-
OPENAI_RESPONSE_MODEL_ATTRIBUTE,
23-
OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE,
24-
OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE,
25-
OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE,
17+
OPENAI_RESPONSE_STREAM_ATTRIBUTE,
2618
} from '../gen-ai-attributes';
2719
import { OPENAI_INTEGRATION_NAME } from './constants';
20+
import { instrumentStream } from './streaming';
2821
import type {
22+
ChatCompletionChunk,
2923
InstrumentedMethod,
3024
OpenAiChatCompletionObject,
3125
OpenAiClient,
3226
OpenAiIntegration,
3327
OpenAiOptions,
3428
OpenAiResponse,
3529
OpenAIResponseObject,
30+
OpenAIStream,
31+
ResponseStreamingEvent,
3632
} from './types';
3733
import {
3834
buildMethodPath,
3935
getOperationName,
4036
getSpanOperation,
4137
isChatCompletionResponse,
4238
isResponsesApiResponse,
39+
isStream,
40+
setCommonResponseAttributes,
41+
setTokenUsageAttributes,
4342
shouldInstrument,
4443
} from './utils';
4544

@@ -61,64 +60,14 @@ function extractRequestAttributes(args: unknown[], methodPath: string): Record<s
6160
if ('frequency_penalty' in params)
6261
attributes[GEN_AI_REQUEST_FREQUENCY_PENALTY_ATTRIBUTE] = params.frequency_penalty;
6362
if ('presence_penalty' in params) attributes[GEN_AI_REQUEST_PRESENCE_PENALTY_ATTRIBUTE] = params.presence_penalty;
63+
if ('stream' in params) attributes[GEN_AI_REQUEST_STREAM_ATTRIBUTE] = params.stream;
6464
} else {
6565
attributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] = 'unknown';
6666
}
6767

6868
return attributes;
6969
}
7070

71-
/**
72-
* Helper function to set token usage attributes
73-
*/
74-
function setTokenUsageAttributes(
75-
span: Span,
76-
promptTokens?: number,
77-
completionTokens?: number,
78-
totalTokens?: number,
79-
): void {
80-
if (promptTokens !== undefined) {
81-
span.setAttributes({
82-
[OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE]: promptTokens,
83-
[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]: promptTokens,
84-
});
85-
}
86-
if (completionTokens !== undefined) {
87-
span.setAttributes({
88-
[OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE]: completionTokens,
89-
[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]: completionTokens,
90-
});
91-
}
92-
if (totalTokens !== undefined) {
93-
span.setAttributes({
94-
[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: totalTokens,
95-
});
96-
}
97-
}
98-
99-
/**
100-
* Helper function to set common response attributes (ID, model, timestamp)
101-
*/
102-
function setCommonResponseAttributes(span: Span, id?: string, model?: string, timestamp?: number): void {
103-
if (id) {
104-
span.setAttributes({
105-
[OPENAI_RESPONSE_ID_ATTRIBUTE]: id,
106-
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: id,
107-
});
108-
}
109-
if (model) {
110-
span.setAttributes({
111-
[OPENAI_RESPONSE_MODEL_ATTRIBUTE]: model,
112-
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: model,
113-
});
114-
}
115-
if (timestamp) {
116-
span.setAttributes({
117-
[OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE]: new Date(timestamp * 1000).toISOString(),
118-
});
119-
}
120-
}
121-
12271
/**
12372
* Add attributes for Chat Completion responses
12473
*/
@@ -195,6 +144,9 @@ function addRequestAttributes(span: Span, params: Record<string, unknown>): void
195144
if ('input' in params) {
196145
span.setAttributes({ [GEN_AI_REQUEST_MESSAGES_ATTRIBUTE]: JSON.stringify(params.input) });
197146
}
147+
if ('stream' in params) {
148+
span.setAttributes({ [OPENAI_RESPONSE_STREAM_ATTRIBUTE]: Boolean(params.stream) });
149+
}
198150
}
199151

200152
function getOptionsFromIntegration(): OpenAiOptions {
@@ -239,7 +191,16 @@ function instrumentMethod<T extends unknown[], R>(
239191
}
240192

241193
const result = await originalMethod.apply(context, args);
242-
// TODO: Add streaming support
194+
195+
if (isStream(result)) {
196+
return instrumentStream(
197+
result as OpenAIStream<ChatCompletionChunk | ResponseStreamingEvent>,
198+
span,
199+
finalOptions.recordOutputs ?? false,
200+
) as unknown as R;
201+
}
202+
203+
// Handle non-streaming responses
243204
addResponseAttributes(span, result, finalOptions.recordOutputs);
244205
return result;
245206
} catch (error) {
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import { captureException } from '../../exports';
2+
import type { Span } from '../../types-hoist/span';
3+
import { GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE } from '../gen-ai-attributes';
4+
import { RESPONSE_EVENT_TYPES } from './constants';
5+
import type { OpenAIResponseObject } from './types';
6+
import { type ChatCompletionChunk, type ResponseStreamingEvent } from './types';
7+
import {
8+
isChatCompletionChunk,
9+
isResponsesApiStreamEvent,
10+
setCommonResponseAttributes,
11+
setTokenUsageAttributes,
12+
} from './utils';
13+
14+
/**
 * Mutable accumulator threaded through a streamed OpenAI response.
 * Populated chunk-by-chunk while iterating and flushed onto the span
 * once the stream ends.
 */
interface StreamingState {
  /** Types of events seen that were not recognized (kept for diagnostics). */
  eventTypes: string[];
  /** Accumulated output text fragments (only filled when outputs are recorded). */
  responseTexts: string[];
  /** Finish reasons / terminal statuses collected across chunks. */
  finishReasons: string[];
  /** Response id from the latest chunk that carried one. */
  responseId?: string;
  /** Model name from the latest chunk that carried one. */
  responseModel?: string;
  /** Response creation time (epoch seconds) from the latest chunk that carried one. */
  responseTimestamp?: number;
  /** Prompt/input token count from the latest usage payload. */
  promptTokens?: number;
  /** Completion/output token count from the latest usage payload. */
  completionTokens?: number;
  /** Total token count from the latest usage payload. */
  totalTokens?: number;
}
25+
26+
function processChatCompletionChunk(chunk: ChatCompletionChunk, state: StreamingState, recordOutputs: boolean): void {
27+
state.responseId = chunk.id ?? state.responseId;
28+
state.responseModel = chunk.model ?? state.responseModel;
29+
state.responseTimestamp = chunk.created ?? state.responseTimestamp;
30+
31+
if (chunk.usage) {
32+
state.promptTokens = chunk.usage.prompt_tokens;
33+
state.completionTokens = chunk.usage.completion_tokens;
34+
state.totalTokens = chunk.usage.total_tokens;
35+
}
36+
37+
for (const choice of chunk.choices ?? []) {
38+
if (recordOutputs && choice.delta?.content) {
39+
state.responseTexts.push(choice.delta.content);
40+
}
41+
if (choice.finish_reason) {
42+
state.finishReasons.push(choice.finish_reason);
43+
}
44+
}
45+
}
46+
47+
function processResponsesApiEvent(
48+
streamEvent: ResponseStreamingEvent | unknown | Error,
49+
state: StreamingState,
50+
recordOutputs: boolean,
51+
): void {
52+
if (!(streamEvent && typeof streamEvent === 'object')) {
53+
state.eventTypes.push('unknown:non-object');
54+
return;
55+
}
56+
if (streamEvent instanceof Error) {
57+
captureException(streamEvent);
58+
return;
59+
}
60+
61+
if (!('type' in streamEvent)) return;
62+
const event = streamEvent as ResponseStreamingEvent;
63+
64+
if (!RESPONSE_EVENT_TYPES.includes(event.type)) {
65+
state.eventTypes.push(event.type);
66+
return;
67+
}
68+
69+
if (recordOutputs && event.type === 'response.output_text.delta' && 'delta' in event && event.delta) {
70+
state.responseTexts.push(event.delta);
71+
return;
72+
}
73+
74+
const { response } = event as { response: OpenAIResponseObject };
75+
state.responseId = response.id ?? state.responseId;
76+
state.responseModel = response.model ?? state.responseModel;
77+
state.responseTimestamp = response.created_at ?? state.responseTimestamp;
78+
79+
if (response.usage) {
80+
state.promptTokens = response.usage.input_tokens;
81+
state.completionTokens = response.usage.output_tokens;
82+
state.totalTokens = response.usage.total_tokens;
83+
}
84+
85+
if (response.status) {
86+
state.finishReasons.push(response.status);
87+
}
88+
89+
if (recordOutputs && response.output_text) {
90+
state.responseTexts.push(response.output_text);
91+
}
92+
}
93+
/**
94+
* Instrument a stream of OpenAI events
95+
* @param stream - The stream of events to instrument
96+
* @param span - The span to add attributes to
97+
* @param recordOutputs - Whether to record outputs
98+
* @returns A generator that yields the events
99+
*/
100+
export async function* instrumentStream<T>(
101+
stream: AsyncIterable<T>,
102+
span: Span,
103+
recordOutputs: boolean,
104+
): AsyncGenerator<T, void, unknown> {
105+
const state: StreamingState = {
106+
eventTypes: [],
107+
responseTexts: [],
108+
finishReasons: [],
109+
};
110+
111+
try {
112+
for await (const event of stream) {
113+
if (isChatCompletionChunk(event)) {
114+
processChatCompletionChunk(event as ChatCompletionChunk, state, recordOutputs);
115+
} else if (isResponsesApiStreamEvent(event)) {
116+
processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs);
117+
}
118+
yield event;
119+
}
120+
} finally {
121+
setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp);
122+
setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens);
123+
124+
if (state.finishReasons.length) {
125+
span.setAttributes({
126+
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
127+
});
128+
}
129+
130+
if (recordOutputs && state.responseTexts.length) {
131+
span.setAttributes({
132+
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: JSON.stringify(state.responseTexts),
133+
});
134+
}
135+
}
136+
}

0 commit comments

Comments (0)