Skip to content

Commit 51926dc

Browse files
ANcpLuaclaude
andcommitted
fix(tests): resolve SSE test hanging issue
- Refactored all SSE tests to use FakeCompletableSseStream with .Complete() - Added .AsTask().WaitAsync() pattern for read operations with timeouts - Fixed tests that were hanging due to never-ending SSE streams: - SseExtensionsTests.cs - SseExtensionsFallbackTests.cs - SseExtensionsNet10Tests.cs - GenAIEventStreamIntegrationTests.cs - OcrEventStreamIntegrationTests.cs Test suite now completes in ~50 seconds instead of 11+ minutes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent cc2a452 commit 51926dc

File tree

11 files changed

+236
-117
lines changed

11 files changed

+236
-117
lines changed

README.md

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,15 @@ services.AddPaperlessRabbitMq(config, includeOcrResultStream: true, includeGenAi
6161
### Publishing
6262

6363
```csharp
64+
// Basic command
6465
var command = new OcrCommand(docId, fileName, storagePath);
6566
await publisher.PublishOcrCommandAsync(command);
6667

68+
// With timestamp (v2.2.0+)
69+
var command = new OcrCommand(docId, fileName, storagePath, createdAt: DateTimeOffset.UtcNow);
70+
await publisher.PublishOcrCommandAsync(command);
71+
72+
// Event
6773
var result = new OcrEvent(jobId, "Completed", text, DateTimeOffset.UtcNow);
6874
await publisher.PublishOcrEventAsync(result);
6975
```
@@ -87,13 +93,28 @@ await foreach (var command in consumer.ConsumeAsync(cancellationToken))
8793
}
8894
```
8995

90-
### SSE Endpoint
96+
### SSE Endpoints
9197

9298
```csharp
93-
// Map endpoint
94-
app.MapOcrEventStream();
99+
// Built-in endpoints
100+
app.MapOcrEventStream(); // /api/v1/ocr-results
101+
app.MapGenAIEventStream(); // /api/v1/events/genai
102+
103+
// Custom endpoint with dynamic event types (v2.3.0+)
104+
app.MapSse<NotificationEvent>("/api/notifications", evt => evt.Type);
105+
106+
// Custom endpoint with fixed event type (v2.4.0+)
107+
app.MapSse<HeartbeatEvent>("/api/heartbeat", "heartbeat");
108+
109+
// Full control with payload projection
110+
app.MapSse<OcrEvent>("/api/ocr",
111+
evt => new { evt.JobId, evt.Status }, // payload
112+
evt => evt.Status == "Completed" ? "done" : "error"); // event type
113+
```
95114

96-
// Client-side
115+
**Client-side:**
116+
117+
```javascript
97118
const eventSource = new EventSource('/api/v1/ocr-results');
98119
eventSource.addEventListener('ocr-completed', (event) => {
99120
const data = JSON.parse(event.data);
@@ -139,18 +160,57 @@ await _publisher.PublishGenAICommandAsync(genAiCommand);
139160
## Message Types
140161

141162
```csharp
142-
public record OcrCommand(Guid JobId, string FileName, string FilePath);
163+
// OCR
164+
public record OcrCommand(Guid JobId, string FileName, string FilePath, DateTimeOffset? CreatedAt = null);
143165
public record OcrEvent(Guid JobId, string Status, string? Text, DateTimeOffset ProcessedAt);
166+
167+
// GenAI
144168
public record GenAICommand(Guid DocumentId, string Text);
145-
public record GenAIEvent(Guid DocumentId, string? Summary, DateTimeOffset ProcessedAt, string? ErrorMessage = null);
169+
public record GenAIEvent(Guid DocumentId, string? Summary, DateTimeOffset GeneratedAt, string? ErrorMessage = null);
146170
```
147171

172+
## SSE MapSse Overloads
173+
174+
| Overload | Use Case |
175+
|----------|----------|
176+
| `MapSse<T>(pattern, payloadSelector, eventTypeSelector)` | Full control over payload and per-event type |
177+
| `MapSse<T>(pattern, eventTypeSelector)` | Serialize whole object, dynamic event type |
178+
| `MapSse<T>(pattern, eventType)` | Serialize whole object, fixed event type |
179+
148180
## Installation
149181

150182
```bash
151183
dotnet add package SWEN3.Paperless.RabbitMq
152184
```
153185

186+
## Changelog
187+
188+
### v2.4.0
189+
190+
- Added `MapSse<T>(pattern, eventType)` overload for fixed event type streams
191+
- .NET 10: Uses optimized `TypedResults.ServerSentEvents` overload (no per-event `SseItem<T>` allocation)
192+
193+
### v2.3.0
194+
195+
- `ISseStream<T>` now extends `IAsyncDisposable` for graceful shutdown
196+
- Added `MapSse<T>(pattern, eventTypeSelector)` overload for simplified endpoint registration
197+
- `SseStream<T>` completes all client channels on dispose
198+
199+
### v2.2.0
200+
201+
- Added optional `CreatedAt` parameter to `OcrCommand` for document timestamp tracking
202+
- Updated default Gemini model to `gemini-2.5-flash`
203+
204+
### v2.1.0
205+
206+
- GenAI SSE streaming support
207+
208+
### v2.0.0
209+
210+
- Simplified `AddPaperlessGenAI()` registration
211+
- Added `Microsoft.Extensions.Http.Resilience` for automatic retry/circuit breaker
212+
- Configuration section renamed: `GenAI:Gemini``Gemini`
213+
154214
## License
155215

156216
This project is licensed under the [MIT License](LICENSE).

SWEN3.Paperless.RabbitMq.Tests/Helpers/SseTestHelpers.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,14 @@ public static async Task WaitForClientsAsync<T>(
4242

4343
try
4444
{
45+
// PeriodicTimer is more efficient than Task.Delay in a loop:
46+
// - Returns ValueTask (often zero-alloc) vs new Task per iteration
47+
// - Clearer semantics: "wait for next tick" vs "delay for X ms"
48+
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(PollingIntervalMs));
49+
4550
while (stream.ClientCount < expectedClients)
4651
{
47-
await Task.Delay(PollingIntervalMs, timeoutCts.Token);
52+
await timer.WaitForNextTickAsync(timeoutCts.Token);
4853
}
4954

5055
if (stabilize)

SWEN3.Paperless.RabbitMq.Tests/Integration/GenAIEventStreamIntegrationTests.cs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ namespace SWEN3.Paperless.RabbitMq.Tests.Integration;
22

33
public class GenAIEventStreamIntegrationTests
44
{
5+
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
6+
57
[Theory]
68
[InlineData("Completed", "genai-completed")]
79
[InlineData("Failed", "genai-failed")]
810
public async Task MapGenAIEventStream_ShouldEmitCorrectEventType(string status, string expectedEventType)
911
{
12+
var fakeStream = new FakeCompletableSseStream<GenAIEvent>();
1013
var (host, server) = await SseTestHelpers.CreateSseTestServerAsync<GenAIEvent>(
11-
configureServices: null,
14+
configureServices: s => s.AddSingleton<ISseStream<GenAIEvent>>(fakeStream),
1215
configureEndpoints: endpoints => endpoints.MapGenAIEventStream());
1316

1417
using var _ = host;
15-
var sseStream = host.Services.GetRequiredService<ISseStream<GenAIEvent>>();
1618
var client = server.CreateClient();
1719
var ct = TestContext.Current.CancellationToken;
1820

@@ -25,20 +27,21 @@ public async Task MapGenAIEventStream_ShouldEmitCorrectEventType(string status,
2527

2628
while (true)
2729
{
28-
var line = await reader.ReadLineAsync(ct);
30+
var line = await reader.ReadLineAsync(ct).AsTask().WaitAsync(ReadTimeout, ct);
2931
if (line == null)
3032
return null;
3133
if (line.StartsWith("event:"))
3234
return line;
3335
}
3436
}, ct);
3537

36-
await SseTestHelpers.WaitForClientsAsync(sseStream, stabilize: false, cancellationToken: ct);
38+
await SseTestHelpers.WaitForClientsAsync(fakeStream, stabilize: false, cancellationToken: ct);
3739

3840
var genAiEvent = status == "Completed"
3941
? new GenAIEvent(Guid.NewGuid(), "Test summary", DateTimeOffset.UtcNow)
4042
: new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Service error");
41-
sseStream.Publish(genAiEvent);
43+
fakeStream.Publish(genAiEvent);
44+
fakeStream.Complete();
4245

4346
var eventLine = await readTask;
4447

@@ -48,26 +51,27 @@ public async Task MapGenAIEventStream_ShouldEmitCorrectEventType(string status,
4851
[Fact]
4952
public async Task MapGenAIEventStream_WithMultipleEvents_ShouldStreamInOrder()
5053
{
54+
var fakeStream = new FakeCompletableSseStream<GenAIEvent>();
5155
var (host, server) = await SseTestHelpers.CreateSseTestServerAsync<GenAIEvent>(
52-
configureServices: null,
56+
configureServices: s => s.AddSingleton<ISseStream<GenAIEvent>>(fakeStream),
5357
configureEndpoints: endpoints => endpoints.MapGenAIEventStream());
5458

5559
using var _ = host;
56-
var sseStream = host.Services.GetRequiredService<ISseStream<GenAIEvent>>();
5760
var client = server.CreateClient();
5861
var ct = TestContext.Current.CancellationToken;
5962

6063
var readTask = Task.Run(() => ReadEventsAsync(client, 3, ct), ct);
6164

62-
await SseTestHelpers.WaitForClientsAsync(sseStream, cancellationToken: ct);
65+
await SseTestHelpers.WaitForClientsAsync(fakeStream, cancellationToken: ct);
6366

64-
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 1", DateTimeOffset.UtcNow));
67+
fakeStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 1", DateTimeOffset.UtcNow));
6568
await Task.Delay(50, ct);
6669

67-
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 2", DateTimeOffset.UtcNow));
70+
fakeStream.Publish(new GenAIEvent(Guid.NewGuid(), "Summary 2", DateTimeOffset.UtcNow));
6871
await Task.Delay(50, ct);
6972

70-
sseStream.Publish(new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Error occurred"));
73+
fakeStream.Publish(new GenAIEvent(Guid.NewGuid(), string.Empty, DateTimeOffset.UtcNow, "Error occurred"));
74+
fakeStream.Complete();
7175

7276
var events = await readTask;
7377

@@ -98,7 +102,7 @@ await client.GetAsync("/api/v1/events/genai", HttpCompletionOption.ResponseHeade
98102

99103
while (events.Count < count)
100104
{
101-
var line = await reader.ReadLineAsync(cancellationToken);
105+
var line = await reader.ReadLineAsync(cancellationToken).AsTask().WaitAsync(ReadTimeout, cancellationToken);
102106
if (line == null)
103107
break;
104108

SWEN3.Paperless.RabbitMq.Tests/Integration/OcrEventStreamIntegrationTests.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,20 @@ namespace SWEN3.Paperless.RabbitMq.Tests.Integration;
22

33
public sealed class OcrEventStreamIntegrationTests
44
{
5+
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
6+
57
[Theory]
68
[InlineData("Completed", "ocr-completed")]
79
[InlineData("Failed", "ocr-failed")]
810
[InlineData("Processing", "ocr-failed")]
911
public async Task MapOcrEventStream_ShouldEmitCorrectEventType(string status, string expectedEventType)
1012
{
13+
var fakeStream = new FakeCompletableSseStream<OcrEvent>();
1114
var (host, server) = await SseTestHelpers.CreateSseTestServerAsync<OcrEvent>(
12-
configureServices: null,
15+
configureServices: s => s.AddSingleton<ISseStream<OcrEvent>>(fakeStream),
1316
configureEndpoints: endpoints => endpoints.MapOcrEventStream());
1417

1518
using var _ = host;
16-
var sseStream = host.Services.GetRequiredService<ISseStream<OcrEvent>>();
1719
var ct = TestContext.Current.CancellationToken;
1820

1921
var readTask = Task.Run(async () =>
@@ -26,19 +28,20 @@ public async Task MapOcrEventStream_ShouldEmitCorrectEventType(string status, st
2628

2729
while (true)
2830
{
29-
var line = await reader.ReadLineAsync(ct);
31+
var line = await reader.ReadLineAsync(ct).AsTask().WaitAsync(ReadTimeout, ct);
3032
if (line == null)
3133
return null;
3234
if (line.StartsWith("event:"))
3335
return line;
3436
}
3537
}, ct);
3638

37-
await SseTestHelpers.WaitForClientsAsync(sseStream, cancellationToken: ct);
39+
await SseTestHelpers.WaitForClientsAsync(fakeStream, cancellationToken: ct);
3840

3941
var ocrEvent = new OcrEvent(Guid.NewGuid(), status, status is "Completed" ? "Text" : null,
4042
DateTimeOffset.UtcNow);
41-
sseStream.Publish(ocrEvent);
43+
fakeStream.Publish(ocrEvent);
44+
fakeStream.Complete();
4245

4346
var eventLine = await readTask;
4447

0 commit comments

Comments
 (0)