Skip to content

Commit 77226a9

Browse files
committed
Refactor and move more logic into the StreamTransformerBase class
1 parent ebe6ef8 commit 77226a9

File tree

4 files changed

+246
-251
lines changed

4 files changed

+246
-251
lines changed

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs

Lines changed: 47 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44

5+
using System.Buffers;
56
using System.IO.Pipelines;
67
using System.Text;
78
using System.Text.Json;
@@ -15,146 +16,70 @@ namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;
1516
/// </summary>
1617
public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer> logger) : StreamTransformerBase(logger)
1718
{
18-
// SSE format constants
19-
private const string EventPrefix = "event:";
20-
private const string DataPrefix = "data:";
21-
private const int EventPrefixLength = 6; // "event:".Length
22-
private const int DataPrefixLength = 5; // "data:".Length
23-
24-
protected override async Task ProcessStreamAsync(StreamReader reader, PipeWriter writer, CancellationToken cancellationToken)
19+
protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json)
2520
{
26-
Logger.LogInformation("Starting Agent Builder stream transformation");
27-
string? currentEvent = null;
28-
var dataBuilder = new StringBuilder();
29-
var eventCount = 0;
21+
var type = eventType ?? "message";
22+
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
23+
var id = Guid.NewGuid().ToString();
3024

31-
while (!cancellationToken.IsCancellationRequested)
25+
// Special handling for error events - they may have a different structure
26+
if (type == "error")
3227
{
33-
var line = await reader.ReadLineAsync(cancellationToken);
34-
if (line == null)
35-
{
36-
Logger.LogInformation("Stream ended after processing {EventCount} events", eventCount);
37-
break;
38-
}
39-
40-
// Parse SSE format
41-
if (line.Length > 0 && line[0] == ':')
42-
{
43-
// Comment/keep-alive - skip
44-
continue;
45-
}
46-
else if (line.StartsWith(EventPrefix, StringComparison.Ordinal))
47-
{
48-
currentEvent = line.Substring(EventPrefixLength).Trim();
49-
}
50-
else if (line.StartsWith(DataPrefix, StringComparison.Ordinal))
51-
{
52-
_ = dataBuilder.Append(line.Substring(DataPrefixLength).Trim());
53-
}
54-
else if (string.IsNullOrEmpty(line))
55-
{
56-
// End of event - transform and write immediately
57-
if (currentEvent != null && dataBuilder.Length > 0)
58-
{
59-
var eventData = dataBuilder.ToString();
60-
var transformedEvent = TransformEvent(currentEvent, eventData);
61-
await WriteEventAsync(transformedEvent, writer, cancellationToken);
62-
if (transformedEvent != null)
63-
eventCount++;
64-
}
65-
66-
// Reset for next event
67-
currentEvent = null;
68-
_ = dataBuilder.Clear();
69-
}
28+
return ParseErrorEventFromRoot(id, timestamp, json);
7029
}
7130

72-
Logger.LogInformation("Completed Agent Builder stream transformation. Total events: {EventCount}", eventCount);
73-
}
74-
75-
private AskAiEvent? TransformEvent(string eventType, string data)
76-
{
77-
try
31+
// Most Agent Builder events have data nested in a "data" property
32+
if (!json.TryGetProperty("data", out var innerData))
7833
{
79-
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
80-
var id = Guid.NewGuid().ToString();
81-
82-
using var doc = JsonDocument.Parse(data);
83-
var root = doc.RootElement;
84-
85-
// Special handling for error events - they may have a different structure
86-
if (eventType == "error")
87-
{
88-
return ParseErrorEventFromRoot(id, timestamp, root);
89-
}
90-
91-
// Most Agent Builder events have data nested in a "data" property
92-
if (!root.TryGetProperty("data", out var innerData))
93-
{
94-
Logger.LogDebug("Agent Builder event without 'data' property (skipping): {EventType}", eventType);
95-
return null;
96-
}
34+
Logger.LogDebug("Agent Builder event without 'data' property (skipping): {EventType}", type);
35+
return null;
36+
}
9737

98-
return eventType switch
99-
{
100-
"conversation_id_set" when innerData.TryGetProperty("conversation_id", out var convId) =>
101-
new AskAiEvent.ConversationStart(id, timestamp, convId.GetString()!),
38+
return type switch
39+
{
40+
"conversation_id_set" when innerData.TryGetProperty("conversation_id", out var convId) =>
41+
new AskAiEvent.ConversationStart(id, timestamp, convId.GetString()!),
10242

103-
"message_chunk" when innerData.TryGetProperty("text_chunk", out var textChunk) =>
104-
new AskAiEvent.Chunk(id, timestamp, textChunk.GetString()!),
43+
"message_chunk" when innerData.TryGetProperty("text_chunk", out var textChunk) =>
44+
new AskAiEvent.Chunk(id, timestamp, textChunk.GetString()!),
10545

106-
"message_complete" when innerData.TryGetProperty("message_content", out var fullContent) =>
107-
new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!),
46+
"message_complete" when innerData.TryGetProperty("message_content", out var fullContent) =>
47+
new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!),
10848

109-
"reasoning" =>
110-
// Parse reasoning message if available
111-
ParseReasoningEvent(id, timestamp, innerData),
49+
"reasoning" =>
50+
// Parse reasoning message if available
51+
ParseReasoningEvent(id, timestamp, innerData),
11252

113-
"tool_call" =>
114-
// Parse tool call
115-
ParseToolCallEvent(id, timestamp, innerData),
53+
"tool_call" =>
54+
// Parse tool call
55+
ParseToolCallEvent(id, timestamp, innerData),
11656

117-
"tool_result" =>
118-
// Parse tool result
119-
ParseToolResultEvent(id, timestamp, innerData),
57+
"tool_result" =>
58+
// Parse tool result
59+
ParseToolResultEvent(id, timestamp, innerData),
12060

121-
"round_complete" =>
122-
new AskAiEvent.ConversationEnd(id, timestamp),
61+
"round_complete" =>
62+
new AskAiEvent.ConversationEnd(id, timestamp),
12363

124-
"conversation_created" =>
125-
null, // Skip, already handled by conversation_id_set
64+
"conversation_created" =>
65+
null, // Skip, already handled by conversation_id_set
12666

127-
_ => LogUnknownEvent(eventType, data)
128-
};
129-
}
130-
catch (JsonException ex)
131-
{
132-
Logger.LogError(ex, "Failed to parse Agent Builder event: {EventType}, data: {Data}", eventType, data);
133-
return null;
134-
}
67+
_ => LogUnknownEvent(type, json)
68+
};
13569
}
13670

137-
private AskAiEvent? LogUnknownEvent(string eventType, string data)
71+
private AskAiEvent? LogUnknownEvent(string eventType, JsonElement _)
13872
{
139-
Logger.LogWarning("Unknown Agent Builder event type: {EventType}, data: {Data}", eventType, data);
73+
Logger.LogWarning("Unknown Agent Builder event type: {EventType}", eventType);
14074
return null;
14175
}
14276

14377
private AskAiEvent.Reasoning ParseReasoningEvent(string id, long timestamp, JsonElement innerData)
14478
{
145-
string? message = null;
146-
147-
// Try common property names
148-
if (innerData.TryGetProperty("message", out var msgProp))
149-
message = msgProp.GetString();
150-
else if (innerData.TryGetProperty("text", out var textProp))
151-
message = textProp.GetString();
152-
else if (innerData.TryGetProperty("content", out var contentProp))
153-
message = contentProp.GetString();
154-
else if (innerData.TryGetProperty("reasoning", out var reasoningProp))
155-
message = reasoningProp.GetString();
156-
else if (innerData.TryGetProperty("status", out var statusProp))
157-
message = statusProp.GetString();
79+
// Agent Builder sends: {"data":{"reasoning":"..."}}
80+
var message = innerData.TryGetProperty("reasoning", out var reasoningProp)
81+
? reasoningProp.GetString()
82+
: null;
15883

15984
return new AskAiEvent.Reasoning(id, timestamp, message ?? "Thinking...");
16085
}
@@ -203,45 +128,11 @@ private AskAiEvent ParseToolCallEvent(string id, long timestamp, JsonElement inn
203128

204129
private AskAiEvent.ErrorEvent ParseErrorEventFromRoot(string id, long timestamp, JsonElement root)
205130
{
206-
// Try to extract error message from different possible structures
207-
string? errorMessage = null;
208-
209-
// Check if there's a data property first
210-
if (root.TryGetProperty("data", out var dataElement))
211-
{
212-
if (dataElement.TryGetProperty("error", out var errorProp))
213-
{
214-
// Agent Builder sends: {"data":{"error":{"code":"...","message":"...","meta":{...}}}}
215-
if (errorProp.ValueKind == JsonValueKind.Object && errorProp.TryGetProperty("message", out var msgProp))
216-
{
217-
errorMessage = msgProp.GetString();
218-
}
219-
else if (errorProp.ValueKind == JsonValueKind.String)
220-
{
221-
errorMessage = errorProp.GetString();
222-
}
223-
}
224-
else if (dataElement.TryGetProperty("message", out var directMsg))
225-
{
226-
errorMessage = directMsg.GetString();
227-
}
228-
}
229-
// Or error might be at root level
230-
else if (root.TryGetProperty("error", out var rootError))
231-
{
232-
if (rootError.ValueKind == JsonValueKind.Object && rootError.TryGetProperty("message", out var msgProp))
233-
{
234-
errorMessage = msgProp.GetString();
235-
}
236-
else if (rootError.ValueKind == JsonValueKind.String)
237-
{
238-
errorMessage = rootError.GetString();
239-
}
240-
}
241-
else if (root.TryGetProperty("message", out var rootMsg))
242-
{
243-
errorMessage = rootMsg.GetString();
244-
}
131+
// Agent Builder sends: {"error":{"code":"...","message":"...","meta":{...}}}
132+
var errorMessage = root.TryGetProperty("error", out var errorProp) &&
133+
errorProp.TryGetProperty("message", out var msgProp)
134+
? msgProp.GetString()
135+
: null;
245136

246137
Logger.LogError("Error event received from Agent Builder: {ErrorMessage}", errorMessage ?? "Unknown error");
247138

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs

Lines changed: 35 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44

5+
using System.Buffers;
56
using System.IO.Pipelines;
67
using System.Text;
78
using System.Text.Json;
@@ -15,80 +16,49 @@ namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;
1516
/// </summary>
1617
public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> logger) : StreamTransformerBase(logger)
1718
{
18-
protected override async Task ProcessStreamAsync(StreamReader reader, PipeWriter writer, CancellationToken cancellationToken)
19+
protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json)
1920
{
20-
while (!cancellationToken.IsCancellationRequested)
21+
// LLM Gateway format: ["custom", {type: "...", ...}]
22+
if (json.ValueKind != JsonValueKind.Array || json.GetArrayLength() < 2)
2123
{
22-
var line = await reader.ReadLineAsync(cancellationToken);
23-
if (line == null)
24-
break;
25-
26-
// LLM Gateway uses "data:" prefix
27-
if (!line.StartsWith("data:", StringComparison.Ordinal))
28-
continue;
29-
30-
var data = line.Substring(5).Trim();
31-
if (string.IsNullOrEmpty(data))
32-
continue;
33-
34-
var transformedEvent = TransformEvent(data);
35-
await WriteEventAsync(transformedEvent, writer, cancellationToken);
24+
Logger.LogWarning("LLM Gateway data is not in expected array format");
25+
return null;
3626
}
37-
}
3827

39-
private AskAiEvent? TransformEvent(string data)
40-
{
41-
try
42-
{
43-
// LLM Gateway format: [null, {message}]
44-
using var doc = JsonDocument.Parse(data);
45-
var array = doc.RootElement;
28+
// Extract the actual message object from index 1 (index 0 is always "custom")
29+
var message = json[1];
30+
var type = message.GetProperty("type").GetString();
31+
var timestamp = message.GetProperty("timestamp").GetInt64();
32+
var id = message.GetProperty("id").GetString()!;
33+
var messageData = message.GetProperty("data");
4634

47-
if (array.ValueKind != JsonValueKind.Array || array.GetArrayLength() < 2)
48-
{
49-
Logger.LogWarning("LLM Gateway data is not in expected array format");
50-
return null;
51-
}
52-
53-
var message = array[1];
54-
var type = message.GetProperty("type").GetString();
55-
var timestamp = message.GetProperty("timestamp").GetInt64();
56-
var id = message.GetProperty("id").GetString()!;
57-
var messageData = message.GetProperty("data");
58-
59-
return type switch
60-
{
61-
"agent_start" =>
62-
// LLM Gateway doesn't provide conversation ID, so generate one
63-
new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()),
35+
return type switch
36+
{
37+
"agent_start" =>
38+
// LLM Gateway doesn't provide conversation ID, so generate one
39+
new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()),
6440

65-
"ai_message_chunk" when messageData.TryGetProperty("content", out var content) =>
66-
new AskAiEvent.Chunk(id, timestamp, content.GetString()!),
41+
"ai_message_chunk" when messageData.TryGetProperty("content", out var content) =>
42+
new AskAiEvent.Chunk(id, timestamp, content.GetString()!),
6743

68-
"ai_message" when messageData.TryGetProperty("content", out var fullContent) =>
69-
new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!),
44+
"ai_message" when messageData.TryGetProperty("content", out var fullContent) =>
45+
new AskAiEvent.ChunkComplete(id, timestamp, fullContent.GetString()!),
7046

71-
"tool_call" when messageData.TryGetProperty("toolCalls", out var toolCalls) =>
72-
TransformToolCall(id, timestamp, toolCalls),
47+
"tool_call" when messageData.TryGetProperty("toolCalls", out var toolCalls) =>
48+
TransformToolCall(id, timestamp, toolCalls),
7349

74-
"tool_message" when messageData.TryGetProperty("toolCallId", out var toolCallId)
75-
&& messageData.TryGetProperty("result", out var result) =>
76-
new AskAiEvent.ToolResult(id, timestamp, toolCallId.GetString()!, result.GetString()!),
50+
"tool_message" when messageData.TryGetProperty("toolCallId", out var toolCallId)
51+
&& messageData.TryGetProperty("result", out var result) =>
52+
new AskAiEvent.ToolResult(id, timestamp, toolCallId.GetString()!, result.GetString()!),
7753

78-
"agent_end" =>
79-
new AskAiEvent.ConversationEnd(id, timestamp),
54+
"agent_end" =>
55+
new AskAiEvent.ConversationEnd(id, timestamp),
8056

81-
"chat_model_start" or "chat_model_end" =>
82-
null, // Skip model lifecycle events
57+
"chat_model_start" or "chat_model_end" =>
58+
null, // Skip model lifecycle events
8359

84-
_ => LogUnknownEvent(type, data)
85-
};
86-
}
87-
catch (JsonException ex)
88-
{
89-
Logger.LogError(ex, "Failed to parse LLM Gateway event: {Data}", data);
90-
return null;
91-
}
60+
_ => LogUnknownEvent(type, json)
61+
};
9262
}
9363

9464
private AskAiEvent? TransformToolCall(string id, long timestamp, JsonElement toolCalls)
@@ -104,9 +74,7 @@ protected override async Task ProcessStreamAsync(StreamReader reader, PipeWriter
10474
var toolName = toolCall.GetProperty("name").GetString()!;
10575
var args = toolCall.GetProperty("args");
10676

107-
// Check if this is a search tool (ragSearch or similar)
108-
if (toolName != null && (toolName.Contains("search", StringComparison.OrdinalIgnoreCase) ||
109-
toolName.Contains("rag", StringComparison.OrdinalIgnoreCase)))
77+
if (toolName is not null and "ragSearch")
11078
{
11179
// LLM Gateway uses "searchQuery" in args
11280
if (args.TryGetProperty("searchQuery", out var searchQueryProp))
@@ -135,9 +103,9 @@ protected override async Task ProcessStreamAsync(StreamReader reader, PipeWriter
135103
}
136104
}
137105

138-
private AskAiEvent? LogUnknownEvent(string? type, string data)
106+
private AskAiEvent? LogUnknownEvent(string? type, JsonElement _)
139107
{
140-
Logger.LogWarning("Unknown LLM Gateway event type: {Type}, data: {Data}", type, data);
108+
Logger.LogWarning("Unknown LLM Gateway event type: {Type}", type);
141109
return null;
142110
}
143111
}

0 commit comments

Comments
 (0)