Skip to content

Commit 648ea4a

Browse files
committed
Fix SSE integration test race conditions and timeouts
Resolved test failures in CI environment by: - Added ClientCount property to ISseStream/SseStream for subscription tracking - Wait for client connection before publishing events to eliminate race conditions - Increased timeouts from 2/5 seconds to 15/30 seconds for slower CI environments - Added connection stabilization delay after client count verification - Improved event reading logic to handle stream buffering correctly All 76 tests now pass reliably in both local and CI environments.
1 parent 0180528 commit 648ea4a

File tree

4 files changed

+77
-26
lines changed

4 files changed

+77
-26
lines changed

SWEN3.Paperless.RabbitMq.Tests/Integration/GenAIEventStreamIntegrationTests.cs

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,26 @@ public async Task MapGenAIEventStream_ShouldEmitCorrectEventType(string status,
1313

1414
var readTask = Task.Run(async () =>
1515
{
16-
var response = await client.GetAsync("/api/v1/events/genai", HttpCompletionOption.ResponseHeadersRead);
16+
using var response = await client.GetAsync("/api/v1/events/genai", HttpCompletionOption.ResponseHeadersRead);
1717
await using var stream = await response.Content.ReadAsStreamAsync();
1818
using var reader = new StreamReader(stream);
1919

20-
var line = await reader.ReadLineAsync();
21-
response.Dispose();
22-
return line;
20+
while (true)
21+
{
22+
var line = await reader.ReadLineAsync();
23+
if (line == null) return null;
24+
if (line.StartsWith("event:")) return line;
25+
}
2326
});
2427

25-
await Task.Delay(100, TestContext.Current.CancellationToken);
28+
while (sseStream.ClientCount == 0) await Task.Delay(50, TestContext.Current.CancellationToken);
2629

2730
var genAiEvent = status == "Completed"
2831
? new GenAIEvent(Guid.NewGuid(), "Test summary", DateTimeOffset.UtcNow)
2932
: new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Service error");
3033
sseStream.Publish(genAiEvent);
3134

32-
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
35+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
3336
var eventLine = await readTask.WaitAsync(cts.Token);
3437

3538
eventLine.Should().Be($"event: {expectedEventType}");
@@ -42,16 +45,31 @@ public async Task MapGenAIEventStream_WithMultipleEvents_ShouldStreamInOrder()
4245
using var server = SseTestHelpers.CreateSseTestServer(sseStream, endpoints => endpoints.MapGenAIEventStream());
4346

4447
var client = server.CreateClient();
45-
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
48+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
4649
var token = cts.Token;
4750
var readTask = Task.Run(() => ReadEventsAsync(client, 3, token), token);
4851

49-
await Task.Delay(300, TestContext.Current.CancellationToken);
52+
// Wait for client to connect with timeout
53+
var waitStart = DateTime.UtcNow;
54+
while (sseStream.ClientCount == 0)
55+
{
56+
if (DateTime.UtcNow - waitStart > TimeSpan.FromSeconds(10))
57+
throw new TimeoutException("Client did not connect within timeout");
58+
await Task.Delay(50, TestContext.Current.CancellationToken);
59+
}
60+
61+
// Give the HTTP connection a moment to stabilize
62+
await Task.Delay(100, TestContext.Current.CancellationToken);
63+
5064
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 1", DateTimeOffset.UtcNow));
65+
await Task.Delay(50, TestContext.Current.CancellationToken);
66+
5167
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 2", DateTimeOffset.UtcNow));
68+
await Task.Delay(50, TestContext.Current.CancellationToken);
69+
5270
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Error occurred"));
5371

54-
var events = await readTask.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken);
72+
var events = await readTask.WaitAsync(TimeSpan.FromSeconds(30), TestContext.Current.CancellationToken);
5573

5674
events[0].Event.Should().Be("event: genai-completed");
5775
events[0].Data.Should().Contain("Summary 1");
@@ -73,16 +91,29 @@ public async Task MapGenAIEventStream_WithMultipleEvents_ShouldStreamInOrder()
7391
using var reader = new StreamReader(stream);
7492

7593
var events = new List<(string Event, string Data)>();
76-
for (var i = 0; i < count; i++)
77-
{
78-
var eventLine = await reader.ReadLineAsync(cancellationToken);
79-
var dataLine = await reader.ReadLineAsync(cancellationToken);
80-
81-
if (eventLine == null || dataLine == null)
82-
throw new InvalidOperationException($"Unexpected end of stream while reading event {i + 1} of {count}");
94+
string? currentEvent = null;
95+
string? currentData = null;
8396

84-
events.Add((eventLine, dataLine));
85-
await reader.ReadLineAsync(cancellationToken);
97+
while (events.Count < count)
98+
{
99+
var line = await reader.ReadLineAsync(cancellationToken);
100+
if (line == null) break;
101+
102+
if (string.IsNullOrWhiteSpace(line))
103+
{
104+
if (currentEvent != null && currentData != null)
105+
{
106+
events.Add((currentEvent, currentData));
107+
currentEvent = null;
108+
currentData = null;
109+
}
110+
continue;
111+
}
112+
113+
if (line.StartsWith("event:"))
114+
currentEvent = line;
115+
else if (line.StartsWith("data:"))
116+
currentData = line;
86117
}
87118

88119
return events;

SWEN3.Paperless.RabbitMq.Tests/Integration/OcrEventStreamIntegrationTests.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,38 @@ public async Task MapOcrEventStream_ShouldEmitCorrectEventType(string status, st
1111
var sseStream = new SseStream<OcrEvent>();
1212
var server = SseTestHelpers.CreateSseTestServer(sseStream, endpoints => endpoints.MapOcrEventStream());
1313

14+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
1415
var readTask = Task.Run(async () =>
1516
{
1617
using var client = server.CreateClient();
17-
var response = await client.GetAsync("/api/v1/ocr-results", HttpCompletionOption.ResponseHeadersRead);
18-
await using var stream = await response.Content.ReadAsStreamAsync();
18+
using var response = await client.GetAsync("/api/v1/ocr-results", HttpCompletionOption.ResponseHeadersRead, cts.Token);
19+
await using var stream = await response.Content.ReadAsStreamAsync(cts.Token);
1920
using var reader = new StreamReader(stream);
2021

21-
var line = await reader.ReadLineAsync();
22-
response.Dispose();
23-
return line;
24-
});
22+
while (true)
23+
{
24+
var line = await reader.ReadLineAsync(cts.Token);
25+
if (line == null) return null;
26+
if (line.StartsWith("event:")) return line;
27+
}
28+
}, cts.Token);
2529

26-
await Task.Delay(300, TestContext.Current.CancellationToken);
30+
// Wait for client to connect with timeout
31+
var waitStart = DateTime.UtcNow;
32+
while (sseStream.ClientCount == 0)
33+
{
34+
if (DateTime.UtcNow - waitStart > TimeSpan.FromSeconds(10))
35+
throw new TimeoutException("Client did not connect within timeout");
36+
await Task.Delay(50, TestContext.Current.CancellationToken);
37+
}
38+
39+
// Give the HTTP connection a moment to stabilize
40+
await Task.Delay(100, TestContext.Current.CancellationToken);
2741

2842
var ocrEvent = new OcrEvent(Guid.NewGuid(), status, status is "Completed" ? "Text" : null,
2943
DateTimeOffset.UtcNow);
3044
sseStream.Publish(ocrEvent);
3145

32-
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
3346
var eventLine = await readTask.WaitAsync(cts.Token);
3447

3548
eventLine.Should().Be($"event: {expectedEventType}");

SWEN3.Paperless.RabbitMq/Sse/ISseStream.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,9 @@ public interface ISseStream<T>
2929
/// </summary>
3030
/// <param name="item">The event to publish.</param>
3131
void Publish(T item);
32+
33+
/// <summary>
34+
/// Gets the number of currently subscribed clients.
35+
/// </summary>
36+
int ClientCount { get; }
3237
}

SWEN3.Paperless.RabbitMq/Sse/SseStream.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public void Publish(T item)
2323
{
2424
foreach (var channel in _channels.Values) channel.Writer.TryWrite(item);
2525
}
26+
27+
public int ClientCount => _channels.Count;
2628
}

0 commit comments

Comments
 (0)