11import { captureException } from '../../exports' ;
2+ import { SPAN_STATUS_ERROR } from '../../tracing' ;
23import type { Span } from '../../types-hoist/span' ;
3- import { GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE , GEN_AI_RESPONSE_TEXT_ATTRIBUTE } from '../gen-ai-attributes' ;
4+ import {
5+ GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE ,
6+ GEN_AI_RESPONSE_TEXT_ATTRIBUTE ,
7+ OPENAI_RESPONSE_STREAM_ATTRIBUTE ,
8+ } from '../gen-ai-attributes' ;
49import { RESPONSE_EVENT_TYPES } from './constants' ;
510import type { OpenAIResponseObject } from './types' ;
611import { type ChatCompletionChunk , type ResponseStreamingEvent } from './types' ;
@@ -11,24 +16,48 @@ import {
1116 setTokenUsageAttributes ,
1217} from './utils' ;
1318
/**
 * State object used to accumulate information from a stream of OpenAI events/chunks.
 */
interface StreamingState {
  /** Types of events encountered in the stream. */
  eventTypes: string[];
  /** Collected response text fragments (for output recording). */
  responseTexts: string[];
  /** Reasons for finishing the response, as reported by the API. */
  finishReasons: string[];
  /** The response ID. */
  responseId: string;
  /** The model name. */
  responseModel: string;
  /** The timestamp of the response. */
  responseTimestamp: number;
  /** Number of prompt/input tokens used; undefined until a usage-bearing event is seen. */
  promptTokens: number | undefined;
  /** Number of completion/output tokens used; undefined until a usage-bearing event is seen. */
  completionTokens: number | undefined;
  /** Total number of tokens used (prompt + completion); undefined until a usage-bearing event is seen. */
  totalTokens: number | undefined;
}
2542
43+ /**
44+ * Processes a single OpenAI ChatCompletionChunk event, updating the streaming state.
45+ *
46+ * @param chunk - The ChatCompletionChunk event to process.
47+ * @param state - The current streaming state to update.
48+ * @param recordOutputs - Whether to record output text fragments.
49+ */
2650function processChatCompletionChunk ( chunk : ChatCompletionChunk , state : StreamingState , recordOutputs : boolean ) : void {
2751 state . responseId = chunk . id ?? state . responseId ;
2852 state . responseModel = chunk . model ?? state . responseModel ;
2953 state . responseTimestamp = chunk . created ?? state . responseTimestamp ;
3054
3155 if ( chunk . usage ) {
56+ // For stream responses, the input tokens remain constant across all events in the stream.
57+ // Output tokens, however, are only finalized in the last event.
58+ // Since we can't guarantee that the last event will include usage data or even be a typed event,
59+ // we update the output token values on every event that includes them.
60+ // This ensures that output token usage is always set, even if the final event lacks it.
3261 state . promptTokens = chunk . usage . prompt_tokens ;
3362 state . completionTokens = chunk . usage . completion_tokens ;
3463 state . totalTokens = chunk . usage . total_tokens ;
@@ -44,17 +73,31 @@ function processChatCompletionChunk(chunk: ChatCompletionChunk, state: Streaming
4473 }
4574}
4675
76+ /**
77+ * Processes a single OpenAI Responses API streaming event, updating the streaming state and span.
78+ *
79+ * @param streamEvent - The event to process (may be an error or unknown object).
80+ * @param state - The current streaming state to update.
81+ * @param recordOutputs - Whether to record output text fragments.
82+ * @param span - The span to update with error status if needed.
83+ */
4784function processResponsesApiEvent (
4885 streamEvent : ResponseStreamingEvent | unknown | Error ,
4986 state : StreamingState ,
5087 recordOutputs : boolean ,
88+ span : Span ,
5189) : void {
5290 if ( ! ( streamEvent && typeof streamEvent === 'object' ) ) {
5391 state . eventTypes . push ( 'unknown:non-object' ) ;
5492 return ;
5593 }
5694 if ( streamEvent instanceof Error ) {
57- captureException ( streamEvent ) ;
95+ span . setStatus ( { code : SPAN_STATUS_ERROR , message : 'internal_error' } ) ;
96+ captureException ( streamEvent , {
97+ mechanism : {
98+ handled : false ,
99+ } ,
100+ } ) ;
58101 return ;
59102 }
60103
@@ -71,32 +114,42 @@ function processResponsesApiEvent(
71114 return ;
72115 }
73116
74- const { response } = event as { response : OpenAIResponseObject } ;
75- state . responseId = response . id ?? state . responseId ;
76- state . responseModel = response . model ?? state . responseModel ;
77- state . responseTimestamp = response . created_at ?? state . responseTimestamp ;
78-
79- if ( response . usage ) {
80- state . promptTokens = response . usage . input_tokens ;
81- state . completionTokens = response . usage . output_tokens ;
82- state . totalTokens = response . usage . total_tokens ;
83- }
117+ if ( 'response' in event ) {
118+ const { response } = event as { response : OpenAIResponseObject } ;
119+ state . responseId = response . id ?? state . responseId ;
120+ state . responseModel = response . model ?? state . responseModel ;
121+ state . responseTimestamp = response . created_at ?? state . responseTimestamp ;
122+
123+ if ( response . usage ) {
124+ // For stream responses, the input tokens remain constant across all events in the stream.
125+ // Output tokens, however, are only finalized in the last event.
126+ // Since we can't guarantee that the last event will include usage data or even be a typed event,
127+ // we update the output token values on every event that includes them.
128+ // This ensures that output token usage is always set, even if the final event lacks it.
129+ state . promptTokens = response . usage . input_tokens ;
130+ state . completionTokens = response . usage . output_tokens ;
131+ state . totalTokens = response . usage . total_tokens ;
132+ }
84133
85- if ( response . status ) {
86- state . finishReasons . push ( response . status ) ;
87- }
134+ if ( response . status ) {
135+ state . finishReasons . push ( response . status ) ;
136+ }
88137
89- if ( recordOutputs && response . output_text ) {
90- state . responseTexts . push ( response . output_text ) ;
138+ if ( recordOutputs && response . output_text ) {
139+ state . responseTexts . push ( response . output_text ) ;
140+ }
91141 }
92142}
143+
93144/**
94- * Instrument a stream of OpenAI events
95- * @param stream - The stream of events to instrument
96- * @param span - The span to add attributes to
97- * @param recordOutputs - Whether to record outputs
98- * @param finishSpan - Optional function to finish the span manually
99- * @returns A generator that yields the events
145+ * Instruments a stream of OpenAI events, updating the provided span with relevant attributes and
146+ * optionally recording output text. This function yields each event from the input stream as it is processed.
147+ *
148+ * @template T - The type of events in the stream.
149+ * @param stream - The async iterable stream of events to instrument.
150+ * @param span - The span to add attributes to and to finish at the end of the stream.
151+ * @param recordOutputs - Whether to record output text fragments in the span.
152+ * @returns An async generator yielding each event from the input stream.
100153 */
101154export async function * instrumentStream < T > (
102155 stream : AsyncIterable < T > ,
@@ -107,21 +160,31 @@ export async function* instrumentStream<T>(
107160 eventTypes : [ ] ,
108161 responseTexts : [ ] ,
109162 finishReasons : [ ] ,
163+ responseId : '' ,
164+ responseModel : '' ,
165+ responseTimestamp : 0 ,
166+ promptTokens : undefined ,
167+ completionTokens : undefined ,
168+ totalTokens : undefined ,
110169 } ;
111170
112171 try {
113172 for await ( const event of stream ) {
114173 if ( isChatCompletionChunk ( event ) ) {
115174 processChatCompletionChunk ( event as ChatCompletionChunk , state , recordOutputs ) ;
116175 } else if ( isResponsesApiStreamEvent ( event ) ) {
117- processResponsesApiEvent ( event as ResponseStreamingEvent , state , recordOutputs ) ;
176+ processResponsesApiEvent ( event as ResponseStreamingEvent , state , recordOutputs , span ) ;
118177 }
119178 yield event ;
120179 }
121180 } finally {
122181 setCommonResponseAttributes ( span , state . responseId , state . responseModel , state . responseTimestamp ) ;
123182 setTokenUsageAttributes ( span , state . promptTokens , state . completionTokens , state . totalTokens ) ;
124183
184+ span . setAttributes ( {
185+ [ OPENAI_RESPONSE_STREAM_ATTRIBUTE ] : true ,
186+ } ) ;
187+
125188 if ( state . finishReasons . length ) {
126189 span . setAttributes ( {
127190 [ GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE ] : JSON . stringify ( state . finishReasons ) ,
0 commit comments