Skip to content

Commit e70de85

Browse files
committed
use try catch
1 parent 807b162 commit e70de85

File tree

1 file changed

+80
-23
lines changed

1 file changed

+80
-23
lines changed

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -86,34 +86,57 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi
8686
catch (OperationCanceledException ex)
8787
{
8888
// Cancellation is expected and not an error - log as debug
89-
Logger.LogDebug("Stream processing was cancelled.");
89+
Logger.LogDebug(ex, "Stream processing was cancelled for transformer {TransformerType}", GetType().Name);
9090
_ = (activity?.SetTag("gen_ai.response.error", true));
9191
_ = (activity?.SetTag("gen_ai.response.error_type", "OperationCanceledException"));
92+
93+
// Add error event to activity
94+
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.error",
95+
timestamp: DateTimeOffset.UtcNow,
96+
tags:
97+
[
98+
new KeyValuePair<string, object?>("gen_ai.error.type", "OperationCanceledException"),
99+
new KeyValuePair<string, object?>("gen_ai.error.message", "Stream processing was cancelled"),
100+
new KeyValuePair<string, object?>("gen_ai.transformer.type", GetType().Name)
101+
])));
102+
92103
try
93104
{
94105
await writer.CompleteAsync(ex);
95106
await reader.CompleteAsync(ex);
96107
}
97108
catch (Exception completeEx)
98109
{
99-
Logger.LogError(completeEx, "Error completing pipe after cancellation");
110+
Logger.LogError(completeEx, "Error completing pipe after cancellation for transformer {TransformerType}", GetType().Name);
100111
}
101112
return;
102113
}
103114
catch (Exception ex)
104115
{
105-
Logger.LogError(ex, "Error transforming stream. Stream processing will be terminated.");
116+
Logger.LogError(ex, "Error transforming stream for transformer {TransformerType}. Stream processing will be terminated.", GetType().Name);
106117
_ = (activity?.SetTag("gen_ai.response.error", true));
107118
_ = (activity?.SetTag("gen_ai.response.error_type", ex.GetType().Name));
108119
_ = (activity?.SetTag("gen_ai.response.error_message", ex.Message));
120+
121+
// Add error event to activity
122+
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.error",
123+
timestamp: DateTimeOffset.UtcNow,
124+
tags:
125+
[
126+
new KeyValuePair<string, object?>("gen_ai.error.type", ex.GetType().Name),
127+
new KeyValuePair<string, object?>("gen_ai.error.message", ex.Message),
128+
new KeyValuePair<string, object?>("gen_ai.transformer.type", GetType().Name),
129+
new KeyValuePair<string, object?>("gen_ai.error.stack_trace", ex.StackTrace ?? "")
130+
])));
131+
109132
try
110133
{
111134
await writer.CompleteAsync(ex);
112135
await reader.CompleteAsync(ex);
113136
}
114137
catch (Exception completeEx)
115138
{
116-
Logger.LogError(completeEx, "Error completing pipe after transformation error");
139+
Logger.LogError(completeEx, "Error completing pipe after transformation error for transformer {TransformerType}", GetType().Name);
117140
}
118141
return;
119142
}
@@ -160,7 +183,20 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr
160183
catch (JsonException ex)
161184
{
162185
jsonParseErrors++;
163-
Logger.LogError(ex, "Failed to parse JSON from SSE event: {Data}", sseEvent.Data);
186+
Logger.LogError(ex, "Failed to parse JSON from SSE event for transformer {TransformerType}. EventType: {EventType}, Data: {Data}",
187+
GetType().Name, sseEvent.EventType, sseEvent.Data);
188+
189+
// Add error event to activity for JSON parsing failures
190+
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.error",
191+
timestamp: DateTimeOffset.UtcNow,
192+
tags:
193+
[
194+
new KeyValuePair<string, object?>("gen_ai.error.type", "JsonException"),
195+
new KeyValuePair<string, object?>("gen_ai.error.message", ex.Message),
196+
new KeyValuePair<string, object?>("gen_ai.transformer.type", GetType().Name),
197+
new KeyValuePair<string, object?>("gen_ai.sse.event_type", sseEvent.EventType ?? "unknown"),
198+
new KeyValuePair<string, object?>("gen_ai.sse.data", sseEvent.Data)
199+
])));
164200
}
165201

166202
if (transformedEvent != null)
@@ -206,24 +242,45 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr
206242
_ = (activity?.SetTag("gen_ai.provider.name", GetAgentProvider()));
207243
_ = (activity?.SetTag("gen_ai.response.token_type", transformedEvent.GetType().Name));
208244

209-
// Add GenAI completion event for each token/chunk
210-
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.completion",
211-
timestamp: DateTimeOffset.UtcNow,
212-
tags:
213-
[
214-
new KeyValuePair<string, object?>("gen_ai.completion", JsonSerializer.Serialize(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent))
215-
])));
216-
217-
// Serialize as base AskAiEvent type to include the type discriminator
218-
var json = JsonSerializer.Serialize<AskAiEvent>(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent);
219-
var sseData = $"data: {json}\n\n";
220-
var bytes = Encoding.UTF8.GetBytes(sseData);
221-
222-
_ = (activity?.SetTag("gen_ai.response.token_size", bytes.Length));
223-
224-
// Write to pipe and flush immediately for real-time streaming
225-
_ = await writer.WriteAsync(bytes, cancellationToken);
226-
_ = await writer.FlushAsync(cancellationToken);
245+
try
246+
{
247+
// Add GenAI completion event for each token/chunk
248+
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.content.completion",
249+
timestamp: DateTimeOffset.UtcNow,
250+
tags:
251+
[
252+
new KeyValuePair<string, object?>("gen_ai.completion", JsonSerializer.Serialize(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent))
253+
])));
254+
255+
// Serialize as base AskAiEvent type to include the type discriminator
256+
var json = JsonSerializer.Serialize<AskAiEvent>(transformedEvent, AskAiEventJsonContext.Default.AskAiEvent);
257+
var sseData = $"data: {json}\n\n";
258+
var bytes = Encoding.UTF8.GetBytes(sseData);
259+
260+
_ = (activity?.SetTag("gen_ai.response.token_size", bytes.Length));
261+
262+
// Write to pipe and flush immediately for real-time streaming
263+
_ = await writer.WriteAsync(bytes, cancellationToken);
264+
_ = await writer.FlushAsync(cancellationToken);
265+
}
266+
catch (Exception ex)
267+
{
268+
Logger.LogError(ex, "Error writing event to stream for transformer {TransformerType}. EventType: {EventType}",
269+
GetType().Name, transformedEvent.GetType().Name);
270+
271+
// Add error event to activity
272+
_ = (activity?.AddEvent(new ActivityEvent("gen_ai.error",
273+
timestamp: DateTimeOffset.UtcNow,
274+
tags:
275+
[
276+
new KeyValuePair<string, object?>("gen_ai.error.type", ex.GetType().Name),
277+
new KeyValuePair<string, object?>("gen_ai.error.message", ex.Message),
278+
new KeyValuePair<string, object?>("gen_ai.transformer.type", GetType().Name),
279+
new KeyValuePair<string, object?>("gen_ai.event.type", transformedEvent.GetType().Name)
280+
])));
281+
282+
throw; // Re-throw to be handled by caller
283+
}
227284
}
228285

229286
/// <summary>

0 commit comments

Comments
 (0)