@@ -94,89 +94,82 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi
9494
9595 try
9696 {
97- await ProcessStreamAsync ( reader , writer , parentActivity , cancellationToken ) ;
98- }
99- catch ( OperationCanceledException ex )
100- {
101- // Cancellation is expected and not an error - log as debug
102- Logger . LogDebug ( ex , "Stream processing was cancelled for transformer {TransformerType}" , GetType ( ) . Name ) ;
103- _ = ( activity ? . SetTag ( "gen_ai.response.error" , true ) ) ;
104- _ = ( activity ? . SetTag ( "gen_ai.response.error_type" , "OperationCanceledException" ) ) ;
105-
106- // Add error event to activity
107- _ = ( activity ? . AddEvent ( new ActivityEvent ( "gen_ai.error" ,
108- timestamp : DateTimeOffset . UtcNow ,
109- tags :
110- [
111- new KeyValuePair < string , object ? > ( "gen_ai.error.type" , "OperationCanceledException" ) ,
112- new KeyValuePair < string , object ? > ( "gen_ai.error.message" , "Stream processing was cancelled" ) ,
113- new KeyValuePair < string , object ? > ( "gen_ai.transformer.type" , GetType ( ) . Name )
114- ] ) ) ) ;
115-
11697 try
11798 {
118- await writer . CompleteAsync ( ex ) ;
119- await reader . CompleteAsync ( ex ) ;
99+ await ProcessStreamAsync ( reader , writer , parentActivity , cancellationToken ) ;
120100 }
121- catch ( Exception completeEx )
101+ catch ( OperationCanceledException ex )
122102 {
123- Logger . LogError ( completeEx , "Error completing pipe after cancellation for transformer {TransformerType}" , GetType ( ) . Name ) ;
103+ // Cancellation is expected and not an error - log as debug
104+ Logger . LogDebug ( ex , "Stream processing was cancelled for transformer {TransformerType}" , GetType ( ) . Name ) ;
105+ _ = ( activity ? . SetTag ( "gen_ai.response.error" , true ) ) ;
106+ _ = ( activity ? . SetTag ( "gen_ai.response.error_type" , "OperationCanceledException" ) ) ;
107+
108+ // Add error event to activity
109+ _ = ( activity ? . AddEvent ( new ActivityEvent ( "gen_ai.error" ,
110+ timestamp : DateTimeOffset . UtcNow ,
111+ tags :
112+ [
113+ new KeyValuePair < string , object ? > ( "gen_ai.error.type" , "OperationCanceledException" ) ,
114+ new KeyValuePair < string , object ? > ( "gen_ai.error.message" , "Stream processing was cancelled" ) ,
115+ new KeyValuePair < string , object ? > ( "gen_ai.transformer.type" , GetType ( ) . Name )
116+ ] ) ) ) ;
117+
118+ try
119+ {
120+ await writer . CompleteAsync ( ex ) ;
121+ await reader . CompleteAsync ( ex ) ;
122+ }
123+ catch ( Exception completeEx )
124+ {
125+ Logger . LogError ( completeEx , "Error completing pipe after cancellation for transformer {TransformerType}" , GetType ( ) . Name ) ;
126+ }
127+ return ;
124128 }
129+ catch ( Exception ex )
130+ {
131+ Logger . LogError ( ex , "Error transforming stream for transformer {TransformerType}. Stream processing will be terminated." , GetType ( ) . Name ) ;
132+ _ = ( activity ? . SetTag ( "gen_ai.response.error" , true ) ) ;
133+ _ = ( activity ? . SetTag ( "gen_ai.response.error_type" , ex . GetType ( ) . Name ) ) ;
134+ _ = ( activity ? . SetTag ( "gen_ai.response.error_message" , ex . Message ) ) ;
125135
126- // Dispose activities on error
127- transformActivity ? . Dispose ( ) ;
128- parentActivity ? . Dispose ( ) ;
129- return ;
130- }
131- catch ( Exception ex )
132- {
133- Logger . LogError ( ex , "Error transforming stream for transformer {TransformerType}. Stream processing will be terminated." , GetType ( ) . Name ) ;
134- _ = ( activity ? . SetTag ( "gen_ai.response.error" , true ) ) ;
135- _ = ( activity ? . SetTag ( "gen_ai.response.error_type" , ex . GetType ( ) . Name ) ) ;
136- _ = ( activity ? . SetTag ( "gen_ai.response.error_message" , ex . Message ) ) ;
136+ // Add error event to activity
137+ _ = ( activity ? . AddEvent ( new ActivityEvent ( "gen_ai.error" ,
138+ timestamp : DateTimeOffset . UtcNow ,
139+ tags :
140+ [
141+ new KeyValuePair < string , object ? > ( "gen_ai.error.type" , ex . GetType ( ) . Name ) ,
142+ new KeyValuePair < string , object ? > ( "gen_ai.error.message" , ex . Message ) ,
143+ new KeyValuePair < string , object ? > ( "gen_ai.transformer.type" , GetType ( ) . Name ) ,
144+ new KeyValuePair < string , object ? > ( "gen_ai.error.stack_trace" , ex . StackTrace ?? "" )
145+ ] ) ) ) ;
137146
138- // Add error event to activity
139- _ = ( activity ? . AddEvent ( new ActivityEvent ( "gen_ai.error" ,
140- timestamp : DateTimeOffset . UtcNow ,
141- tags :
142- [
143- new KeyValuePair < string , object ? > ( "gen_ai.error.type" , ex . GetType ( ) . Name ) ,
144- new KeyValuePair < string , object ? > ( "gen_ai.error.message" , ex . Message ) ,
145- new KeyValuePair < string , object ? > ( "gen_ai.transformer.type" , GetType ( ) . Name ) ,
146- new KeyValuePair < string , object ? > ( "gen_ai.error.stack_trace" , ex . StackTrace ?? "" )
147- ] ) ) ) ;
147+ try
148+ {
149+ await writer . CompleteAsync ( ex ) ;
150+ await reader . CompleteAsync ( ex ) ;
151+ }
152+ catch ( Exception completeEx )
153+ {
154+ Logger . LogError ( completeEx , "Error completing pipe after transformation error for transformer {TransformerType}" , GetType ( ) . Name ) ;
155+ }
156+ return ;
157+ }
148158
159+ // Normal completion - ensure cleanup happens
149160 try
150161 {
151- await writer . CompleteAsync ( ex ) ;
152- await reader . CompleteAsync ( ex ) ;
162+ await writer . CompleteAsync ( ) ;
163+ await reader . CompleteAsync ( ) ;
153164 }
154- catch ( Exception completeEx )
165+ catch ( Exception ex )
155166 {
156- Logger . LogError ( completeEx , "Error completing pipe after transformation error for transformer {TransformerType}" , GetType ( ) . Name ) ;
167+ Logger . LogError ( ex , "Error completing pipe after successful transformation" ) ;
157168 }
158-
159- // Dispose activities on error
160- transformActivity ? . Dispose ( ) ;
161- parentActivity ? . Dispose ( ) ;
162- return ;
163169 }
164-
165- // Normal completion - ensure cleanup happens
166- try
170+ finally
167171 {
168- await writer . CompleteAsync ( ) ;
169- await reader . CompleteAsync ( ) ;
170-
171- // Dispose activities on success
172- transformActivity ? . Dispose ( ) ;
173- parentActivity ? . Dispose ( ) ;
174- }
175- catch ( Exception ex )
176- {
177- Logger . LogError ( ex , "Error completing pipe after successful transformation" ) ;
178-
179- // Still dispose activities even if completion fails
172+ // Always dispose activities, regardless of how we exit
180173 transformActivity ? . Dispose ( ) ;
181174 parentActivity ? . Dispose ( ) ;
182175 }
0 commit comments