@@ -1,6 +1,11 @@
 import { captureException } from '../../exports';
+import { SPAN_STATUS_ERROR } from '../../tracing';
 import type { Span } from '../../types-hoist/span';
-import { GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE } from '../gen-ai-attributes';
+import {
+  GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
+  GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
+  OPENAI_RESPONSE_STREAM_ATTRIBUTE,
+} from '../gen-ai-attributes';
 import { RESPONSE_EVENT_TYPES } from './constants';
 import type { OpenAIResponseObject } from './types';
 import { type ChatCompletionChunk, type ResponseStreamingEvent } from './types';
@@ -11,24 +16,48 @@ import {
   setTokenUsageAttributes,
 } from './utils';
 
+/**
+ * State object used to accumulate information from a stream of OpenAI events/chunks.
+ */
 interface StreamingState {
+  /** Types of events encountered in the stream. */
   eventTypes: string[];
+  /** Collected response text fragments (for output recording). */
   responseTexts: string[];
+  /** Reasons for finishing the response, as reported by the API. */
   finishReasons: string[];
-  responseId?: string;
-  responseModel?: string;
-  responseTimestamp?: number;
-  promptTokens?: number;
-  completionTokens?: number;
-  totalTokens?: number;
+  /** The response ID. */
+  responseId: string;
+  /** The model name. */
+  responseModel: string;
+  /** The timestamp of the response. */
+  responseTimestamp: number;
+  /** Number of prompt/input tokens used. */
+  promptTokens: number | undefined;
+  /** Number of completion/output tokens used. */
+  completionTokens: number | undefined;
+  /** Total number of tokens used (prompt + completion). */
+  totalTokens: number | undefined;
 }
 
+/**
+ * Processes a single OpenAI ChatCompletionChunk event, updating the streaming state.
+ *
+ * @param chunk - The ChatCompletionChunk event to process.
+ * @param state - The current streaming state to update.
+ * @param recordOutputs - Whether to record output text fragments.
+ */
 function processChatCompletionChunk(chunk: ChatCompletionChunk, state: StreamingState, recordOutputs: boolean): void {
   state.responseId = chunk.id ?? state.responseId;
   state.responseModel = chunk.model ?? state.responseModel;
   state.responseTimestamp = chunk.created ?? state.responseTimestamp;
 
   if (chunk.usage) {
+    // For stream responses, the input tokens remain constant across all events in the stream.
+    // Output tokens, however, are only finalized in the last event.
+    // Since we can't guarantee that the last event will include usage data or even be a typed event,
+    // we update the output token values on every event that includes them.
+    // This ensures that output token usage is always set, even if the final event lacks it.
     state.promptTokens = chunk.usage.prompt_tokens;
     state.completionTokens = chunk.usage.completion_tokens;
     state.totalTokens = chunk.usage.total_tokens;
@@ -44,17 +73,31 @@ function processChatCompletionChunk(chunk: ChatCompletionChunk, state: Streaming
   }
 }
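
For reference, a minimal sketch of the kind of trailing chunk this branch handles (values are hypothetical; it assumes the stream was created with OpenAI's stream_options: { include_usage: true }, in which case only the last chunk carries usage):

// Hypothetical final chunk of a chat-completions stream (sketch, not a fixture);
// the SDK's local ChatCompletionChunk type may differ slightly in shape.
const finalChunk = {
  id: 'chatcmpl-abc123', // hypothetical ID
  model: 'gpt-4o-mini',
  created: 1700000000,
  choices: [], // the usage-only trailing chunk carries no choices
  usage: {
    prompt_tokens: 12,
    completion_tokens: 34,
    total_tokens: 46, // prompt_tokens + completion_tokens
  },
};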
 
+/**
+ * Processes a single OpenAI Responses API streaming event, updating the streaming state and span.
+ *
+ * @param streamEvent - The event to process (may be an error or unknown object).
+ * @param state - The current streaming state to update.
+ * @param recordOutputs - Whether to record output text fragments.
+ * @param span - The span to update with error status if needed.
+ */
 function processResponsesApiEvent(
   streamEvent: ResponseStreamingEvent | unknown | Error,
   state: StreamingState,
   recordOutputs: boolean,
+  span: Span,
 ): void {
   if (!(streamEvent && typeof streamEvent === 'object')) {
     state.eventTypes.push('unknown:non-object');
     return;
   }
   if (streamEvent instanceof Error) {
-    captureException(streamEvent);
+    span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
+    captureException(streamEvent, {
+      mechanism: {
+        handled: false,
+      },
+    });
     return;
   }
 
@@ -71,32 +114,42 @@ function processResponsesApiEvent(
     return;
   }
 
-  const { response } = event as { response: OpenAIResponseObject };
-  state.responseId = response.id ?? state.responseId;
-  state.responseModel = response.model ?? state.responseModel;
-  state.responseTimestamp = response.created_at ?? state.responseTimestamp;
-
-  if (response.usage) {
-    state.promptTokens = response.usage.input_tokens;
-    state.completionTokens = response.usage.output_tokens;
-    state.totalTokens = response.usage.total_tokens;
-  }
+  if ('response' in event) {
+    const { response } = event as { response: OpenAIResponseObject };
+    state.responseId = response.id ?? state.responseId;
+    state.responseModel = response.model ?? state.responseModel;
+    state.responseTimestamp = response.created_at ?? state.responseTimestamp;
+
+    if (response.usage) {
+      // For stream responses, the input tokens remain constant across all events in the stream.
+      // Output tokens, however, are only finalized in the last event.
+      // Since we can't guarantee that the last event will include usage data or even be a typed event,
+      // we update the output token values on every event that includes them.
+      // This ensures that output token usage is always set, even if the final event lacks it.
+      state.promptTokens = response.usage.input_tokens;
+      state.completionTokens = response.usage.output_tokens;
+      state.totalTokens = response.usage.total_tokens;
+    }
 
-  if (response.status) {
-    state.finishReasons.push(response.status);
-  }
+    if (response.status) {
+      state.finishReasons.push(response.status);
+    }
 
-  if (recordOutputs && response.output_text) {
-    state.responseTexts.push(response.output_text);
+    if (recordOutputs && response.output_text) {
+      state.responseTexts.push(response.output_text);
+    }
   }
 }
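
Similarly, for the Responses API path, a sketch of the kind of terminal event whose response payload feeds the branch above (field names follow OpenAI's Responses API streaming events; values are hypothetical):

// Hypothetical 'response.completed' event (sketch); processResponsesApiEvent reads
// id, model, created_at, status, output_text, and usage off the response object.
const completedEvent = {
  type: 'response.completed',
  response: {
    id: 'resp_abc123', // hypothetical ID
    model: 'gpt-4o-mini',
    created_at: 1700000000,
    status: 'completed', // pushed into state.finishReasons
    output_text: 'Hello!', // recorded only when recordOutputs is true
    usage: { input_tokens: 12, output_tokens: 34, total_tokens: 46 },
  },
};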
+
 /**
- * Instrument a stream of OpenAI events
- * @param stream - The stream of events to instrument
- * @param span - The span to add attributes to
- * @param recordOutputs - Whether to record outputs
- * @param finishSpan - Optional function to finish the span manually
- * @returns A generator that yields the events
+ * Instruments a stream of OpenAI events, updating the provided span with relevant attributes and
+ * optionally recording output text. This function yields each event from the input stream as it is processed.
+ *
+ * @template T - The type of events in the stream.
+ * @param stream - The async iterable stream of events to instrument.
+ * @param span - The span to add attributes to and to finish at the end of the stream.
+ * @param recordOutputs - Whether to record output text fragments in the span.
+ * @returns An async generator yielding each event from the input stream.
 */
 export async function* instrumentStream<T>(
   stream: AsyncIterable<T>,
@@ -107,21 +160,31 @@ export async function* instrumentStream<T>(
     eventTypes: [],
     responseTexts: [],
     finishReasons: [],
+    responseId: '',
+    responseModel: '',
+    responseTimestamp: 0,
+    promptTokens: undefined,
+    completionTokens: undefined,
+    totalTokens: undefined,
   };
 
   try {
     for await (const event of stream) {
       if (isChatCompletionChunk(event)) {
         processChatCompletionChunk(event as ChatCompletionChunk, state, recordOutputs);
       } else if (isResponsesApiStreamEvent(event)) {
-        processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs);
+        processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs, span);
       }
       yield event;
     }
   } finally {
     setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp);
     setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens);
 
+    span.setAttributes({
+      [OPENAI_RESPONSE_STREAM_ATTRIBUTE]: true,
+    });
+
     if (state.finishReasons.length) {
       span.setAttributes({
         [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
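
For context, a minimal sketch of how the exported generator might be consumed (a hypothetical caller; the SDK's real OpenAI instrumentation wires this up internally and may pass additional arguments):

// Hypothetical consumer: iterating the wrapper drives the underlying stream,
// and the accumulated attributes are flushed onto the span in the finally block.
declare const span: Span;
declare const openaiStream: AsyncIterable<unknown>;

async function forwardStream(): Promise<void> {
  for await (const event of instrumentStream(openaiStream, span, /* recordOutputs */ true)) {
    // hand each event back to the application unchanged
  }
}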