Skip to content

Commit 19ea3d6

Browse files
committed
Refactor and fix warnings
1 parent ebbbc35 commit 19ea3d6

File tree

5 files changed

+161
-228
lines changed

5 files changed

+161
-228
lines changed

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44

5-
using System.Buffers;
6-
using System.IO.Pipelines;
7-
using System.Text;
85
using System.Text.Json;
96
using Elastic.Documentation.Api.Core.AskAi;
107
using Microsoft.Extensions.Logging;
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System.Buffers;
6+
using System.IO.Pipelines;
7+
using System.Runtime.CompilerServices;
8+
using System.Text;
9+
10+
namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;
11+
12+
/// <summary>
13+
/// Represents a parsed Server-Sent Event (SSE)
14+
/// </summary>
15+
/// <param name="EventType">The event type from the "event:" field, or null if not specified</param>
16+
/// <param name="Data">The accumulated data from all "data:" fields</param>
17+
public record SseEvent(string? EventType, string Data);
18+
19+
/// <summary>
20+
/// Parser for Server-Sent Events (SSE) following the W3C SSE specification.
21+
/// </summary>
22+
public static class SseParser
23+
{
24+
/// <summary>
25+
/// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification.
26+
/// This method handles the standard SSE format with event:, data:, and comment lines.
27+
/// </summary>
28+
public static async IAsyncEnumerable<SseEvent> ParseAsync(
29+
PipeReader reader,
30+
[EnumeratorCancellation] CancellationToken cancellationToken = default
31+
)
32+
{
33+
string? currentEvent = null;
34+
var dataBuilder = new StringBuilder();
35+
36+
while (!cancellationToken.IsCancellationRequested)
37+
{
38+
var result = await reader.ReadAsync(cancellationToken);
39+
var buffer = result.Buffer;
40+
41+
// Process all complete lines in the buffer
42+
while (TryReadLine(ref buffer, out var line))
43+
{
44+
// SSE comment line - skip
45+
if (line.Length > 0 && line[0] == ':')
46+
continue;
47+
48+
// Event type line
49+
if (line.StartsWith("event:", StringComparison.Ordinal))
50+
currentEvent = line[6..].Trim();
51+
// Data line
52+
else if (line.StartsWith("data:", StringComparison.Ordinal))
53+
_ = dataBuilder.Append(line[5..].Trim());
54+
// Empty line - marks end of event
55+
else if (string.IsNullOrEmpty(line))
56+
{
57+
if (dataBuilder.Length <= 0)
58+
continue;
59+
yield return new SseEvent(currentEvent, dataBuilder.ToString());
60+
currentEvent = null;
61+
_ = dataBuilder.Clear();
62+
}
63+
}
64+
65+
// Tell the PipeReader how much of the buffer we consumed
66+
reader.AdvanceTo(buffer.Start, buffer.End);
67+
68+
// Stop reading if there's no more data coming
69+
if (!result.IsCompleted)
70+
continue;
71+
72+
// Yield any remaining event that hasn't been terminated with an empty line
73+
if (dataBuilder.Length > 0)
74+
yield return new SseEvent(currentEvent, dataBuilder.ToString());
75+
break;
76+
}
77+
}
78+
79+
/// <summary>
80+
/// Try to read a single line from the buffer
81+
/// </summary>
82+
private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
83+
{
84+
// Look for a line ending
85+
var position = buffer.PositionOf((byte)'\n');
86+
87+
if (position == null)
88+
{
89+
line = string.Empty;
90+
return false;
91+
}
92+
93+
// Extract the line (excluding the \n)
94+
var lineSlice = buffer.Slice(0, position.Value);
95+
line = Encoding.UTF8.GetString(lineSlice).TrimEnd('\r');
96+
97+
// Skip past the line + \n
98+
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
99+
return true;
100+
}
101+
}

src/api/Elastic.Documentation.Api.Core/AskAi/StreamTransformerBase.cs renamed to src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs

Lines changed: 4 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,15 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44

5-
using System.Buffers;
65
using System.Diagnostics;
76
using System.IO.Pipelines;
8-
using System.Runtime.CompilerServices;
97
using System.Text;
108
using System.Text.Json;
9+
using Elastic.Documentation.Api.Core;
10+
using Elastic.Documentation.Api.Core.AskAi;
1111
using Microsoft.Extensions.Logging;
1212

13-
namespace Elastic.Documentation.Api.Core.AskAi;
14-
15-
/// <summary>
16-
/// Represents a parsed Server-Sent Event (SSE)
17-
/// </summary>
18-
/// <param name="EventType">The event type from the "event:" field, or null if not specified</param>
19-
/// <param name="Data">The accumulated data from all "data:" fields</param>
20-
public record SseEvent(string? EventType, string Data);
13+
namespace Elastic.Documentation.Api.Infrastructure.Adapters.AskAi;
2114

2215
/// <summary>
2316
/// Base class for stream transformers that handles common streaming logic
@@ -137,7 +130,7 @@ protected virtual async Task ProcessStreamAsync(PipeReader reader, PipeWriter wr
137130
_ = activity?.SetParentId(parentActivity.Id);
138131

139132
List<MessagePart> outputMessageParts = [];
140-
await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken))
133+
await foreach (var sseEvent in SseParser.ParseAsync(reader, cancellationToken))
141134
{
142135
AskAiEvent? transformedEvent;
143136
try
@@ -277,82 +270,4 @@ protected async Task WriteEventAsync(AskAiEvent? transformedEvent, PipeWriter wr
277270
throw; // Re-throw to be handled by caller
278271
}
279272
}
280-
281-
/// <summary>
282-
/// Parse Server-Sent Events (SSE) from a PipeReader following the W3C SSE specification.
283-
/// This method handles the standard SSE format with event:, data:, and comment lines.
284-
/// </summary>
285-
private static async IAsyncEnumerable<SseEvent> ParseSseEventsAsync(
286-
PipeReader reader,
287-
[EnumeratorCancellation] Cancel cancellationToken
288-
)
289-
{
290-
string? currentEvent = null;
291-
var dataBuilder = new StringBuilder();
292-
293-
while (!cancellationToken.IsCancellationRequested)
294-
{
295-
var result = await reader.ReadAsync(cancellationToken);
296-
var buffer = result.Buffer;
297-
298-
// Process all complete lines in the buffer
299-
while (TryReadLine(ref buffer, out var line))
300-
{
301-
// SSE comment line - skip
302-
if (line.Length > 0 && line[0] == ':')
303-
continue;
304-
305-
// Event type line
306-
if (line.StartsWith("event:", StringComparison.Ordinal))
307-
currentEvent = line[6..].Trim();
308-
// Data line
309-
else if (line.StartsWith("data:", StringComparison.Ordinal))
310-
_ = dataBuilder.Append(line[5..].Trim());
311-
// Empty line - marks end of event
312-
else if (string.IsNullOrEmpty(line))
313-
{
314-
if (dataBuilder.Length <= 0)
315-
continue;
316-
yield return new SseEvent(currentEvent, dataBuilder.ToString());
317-
currentEvent = null;
318-
_ = dataBuilder.Clear();
319-
}
320-
}
321-
322-
// Tell the PipeReader how much of the buffer we consumed
323-
reader.AdvanceTo(buffer.Start, buffer.End);
324-
325-
// Stop reading if there's no more data coming
326-
if (!result.IsCompleted)
327-
continue;
328-
329-
// Yield any remaining event that hasn't been terminated with an empty line
330-
if (dataBuilder.Length > 0)
331-
yield return new SseEvent(currentEvent, dataBuilder.ToString());
332-
break;
333-
}
334-
}
335-
336-
/// <summary>
337-
/// Try to read a single line from the buffer
338-
/// </summary>
339-
private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
340-
{
341-
// Look for a line ending
342-
var position = buffer.PositionOf((byte)'\n');
343-
344-
if (position == null)
345-
{
346-
line = string.Empty;
347-
return false;
348-
}
349-
350-
// Extract the line (excluding the \n)
351-
var lineSlice = buffer.Slice(0, position.Value);
352-
line = Encoding.UTF8.GetString(lineSlice).TrimEnd('\r');
353-
354-
// Skip past the line + \n
355-
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
356-
return true;
357-
}
358273
}

0 commit comments

Comments
 (0)