Skip to content

Commit 2c8e527

Browse files
committed
fin
1 parent 5756710 commit 2c8e527

File tree

6 files changed

+156
-49
lines changed

6 files changed

+156
-49
lines changed

.github/workflows/tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ jobs:
3838
dotnet test SWEN3.Paperless.RabbitMq.sln --no-build --configuration Release \
3939
--logger:"console;verbosity=detailed"
4040
41+
- name: Run tests with coverage on net8.0 (fallback path)
42+
run: |
43+
dotnet test SWEN3.Paperless.RabbitMq.Tests/SWEN3.Paperless.RabbitMq.Tests.csproj \
44+
--no-build \
45+
--configuration Release \
46+
--framework net8.0 \
47+
--collect:"XPlat Code Coverage" \
48+
--results-directory ./coverage-net8
49+
4150
- name: Run tests with coverage on net9.0 (fallback path)
4251
run: |
4352
dotnet test SWEN3.Paperless.RabbitMq.Tests/SWEN3.Paperless.RabbitMq.Tests.csproj \
Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,55 @@
1+
using System.Collections.Concurrent;
12
using System.Threading.Channels;
23

34
namespace SWEN3.Paperless.RabbitMq.Tests.Helpers;
45

56
/// <summary>
6-
/// Test-only implementation of ISseStream that can be completed on demand.
7-
/// Uses a Channel internally to enable natural completion of ReadAllAsync.
7+
/// Test-only implementation of ISseStream that mirrors production semantics:
8+
/// one channel per subscriber plus an explicit Complete() hook to end streams naturally.
89
/// </summary>
910
internal sealed class FakeCompletableSseStream<T> : ISseStream<T> where T : class
1011
{
11-
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
12-
private readonly Dictionary<Guid, ChannelReader<T>> _subscribers = new();
13-
private readonly Lock _lock = new();
12+
private readonly ConcurrentDictionary<Guid, Channel<T>> _channels = new();
1413

1514
/// <summary>
1615
/// Gets the current number of active subscribers.
1716
/// </summary>
18-
public int ClientCount
19-
{
20-
get
21-
{
22-
lock (_lock)
23-
{
24-
return _subscribers.Count;
25-
}
26-
}
27-
}
17+
public int ClientCount => _channels.Count;
2818

2919
/// <summary>
3020
/// Publishes an event to all subscribers.
3121
/// </summary>
3222
public void Publish(T message)
3323
{
34-
_channel.Writer.TryWrite(message);
24+
foreach (var channel in _channels.Values)
25+
{
26+
channel.Writer.TryWrite(message);
27+
}
3528
}
3629

3730
/// <summary>
3831
/// Subscribes a client and returns a channel reader for receiving events.
3932
/// </summary>
4033
public ChannelReader<T> Subscribe(Guid clientId)
4134
{
42-
lock (_lock)
35+
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
4336
{
44-
_subscribers[clientId] = _channel.Reader;
45-
return _channel.Reader;
46-
}
37+
SingleReader = true,
38+
SingleWriter = true
39+
});
40+
41+
_channels[clientId] = channel;
42+
return channel.Reader;
4743
}
4844

4945
/// <summary>
5046
/// Unsubscribes a client.
5147
/// </summary>
5248
public void Unsubscribe(Guid clientId)
5349
{
54-
lock (_lock)
50+
if (_channels.TryRemove(clientId, out var channel))
5551
{
56-
_subscribers.Remove(clientId);
52+
channel.Writer.TryComplete();
5753
}
5854
}
5955

@@ -62,6 +58,9 @@ public void Unsubscribe(Guid clientId)
6258
/// </summary>
6359
public void Complete()
6460
{
65-
_channel.Writer.TryComplete();
61+
foreach (var channel in _channels.Values)
62+
{
63+
channel.Writer.TryComplete();
64+
}
6665
}
6766
}

SWEN3.Paperless.RabbitMq.Tests/Helpers/RabbitMqTestContainerFactory.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using DotNet.Testcontainers.Builders;
2-
using DotNet.Testcontainers.Containers;
32

43
namespace SWEN3.Paperless.RabbitMq.Tests.Helpers;
54

SWEN3.Paperless.RabbitMq.Tests/Integration/SseExtensionsTests.cs

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,20 @@ public class SseExtensionsTests
44
{
55
private const string TestEndpoint = "/sse-test";
66

7+
private static async Task WaitForClientsAsync(ISseStream<Messages.SseTestEvent> stream, int expected, CancellationToken ct)
8+
{
9+
var start = DateTime.UtcNow;
10+
while (stream.ClientCount < expected)
11+
{
12+
if (DateTime.UtcNow - start > TimeSpan.FromSeconds(5))
13+
{
14+
throw new TimeoutException("Client did not connect within timeout");
15+
}
16+
17+
await Task.Delay(50, ct);
18+
}
19+
}
20+
721
[Fact]
822
public async Task MapSse_WithPublishedEvent_ShouldStreamToClient()
923
{
@@ -20,9 +34,7 @@ public async Task MapSse_WithPublishedEvent_ShouldStreamToClient()
2034

2135
var responseTask = client.GetAsync(TestEndpoint, HttpCompletionOption.ResponseHeadersRead, ct);
2236

23-
// Wait for HTTP client to connect
24-
while (sseStream.ClientCount == 0)
25-
await Task.Delay(50, ct);
37+
await WaitForClientsAsync(sseStream, 1, ct);
2638

2739
var testEvent = new Messages.SseTestEvent { Id = 42, Message = "Hello SSE" };
2840
sseStream.Publish(testEvent);
@@ -37,13 +49,23 @@ public async Task MapSse_WithPublishedEvent_ShouldStreamToClient()
3749
var blankLine = await reader.ReadLineAsync(ct);
3850

3951
eventLine.Should().Be("event: test-event");
40-
// Note: .NET 9 uses PascalCase, .NET 10+ uses camelCase
41-
dataLine.Should().Contain("42");
42-
dataLine.Should().Contain("Hello SSE");
52+
dataLine.Should().StartWith("data: ");
53+
var json = dataLine["data: ".Length..];
54+
using (var doc = JsonDocument.Parse(json))
55+
{
56+
var root = doc.RootElement;
57+
root.TryGetProperty("Id", out var idPascal);
58+
root.TryGetProperty("Message", out var msgPascal);
59+
root.TryGetProperty("id", out var idCamel);
60+
root.TryGetProperty("message", out var msgCamel);
61+
(idPascal.ValueKind != JsonValueKind.Undefined ? idPascal.GetInt32() : idCamel.GetInt32()).Should().Be(42);
62+
(msgPascal.ValueKind != JsonValueKind.Undefined ? msgPascal.GetString() : msgCamel.GetString()).Should().Be("Hello SSE");
63+
}
4364
blankLine.Should().BeEmpty();
4465
}
4566

4667
[Fact]
68+
[SuppressMessage("Design", "MA0051:Method is too long")]
4769
public async Task MapSse_MultipleClients_ShouldReceiveSameEvent()
4870
{
4971
var (host, server) = await SseTestHelpers.CreateSseTestServerAsync<Messages.SseTestEvent>(
@@ -63,9 +85,7 @@ public async Task MapSse_MultipleClients_ShouldReceiveSameEvent()
6385
var responseTask1 = client1.GetAsync(TestEndpoint, HttpCompletionOption.ResponseHeadersRead, ct);
6486
var responseTask2 = client2.GetAsync(TestEndpoint, HttpCompletionOption.ResponseHeadersRead, ct);
6587

66-
// Wait for both HTTP clients to connect
67-
while (sseStream.ClientCount < 2)
68-
await Task.Delay(50, ct);
88+
await WaitForClientsAsync(sseStream, 2, ct);
6989

7090
var testEvent = new Messages.SseTestEvent { Id = 99, Message = "Broadcast" };
7191
sseStream.Publish(testEvent);
@@ -90,14 +110,33 @@ public async Task MapSse_MultipleClients_ShouldReceiveSameEvent()
90110
var blank2 = await reader2.ReadLineAsync(ct);
91111

92112
event1.Should().Be("event: test-event");
93-
// Note: .NET 9 uses PascalCase, .NET 10+ uses camelCase
94-
data1.Should().Contain("99");
95-
data1.Should().Contain("Broadcast");
113+
data1.Should().StartWith("data: ");
114+
var json1 = data1["data: ".Length..];
115+
using (var doc = JsonDocument.Parse(json1))
116+
{
117+
var root = doc.RootElement;
118+
root.TryGetProperty("Id", out var idPascal);
119+
root.TryGetProperty("Message", out var msgPascal);
120+
root.TryGetProperty("id", out var idCamel);
121+
root.TryGetProperty("message", out var msgCamel);
122+
(idPascal.ValueKind != JsonValueKind.Undefined ? idPascal.GetInt32() : idCamel.GetInt32()).Should().Be(99);
123+
(msgPascal.ValueKind != JsonValueKind.Undefined ? msgPascal.GetString() : msgCamel.GetString()).Should().Be("Broadcast");
124+
}
96125
blank1.Should().BeEmpty();
97126

98127
event2.Should().Be("event: test-event");
99-
data2.Should().Contain("99");
100-
data2.Should().Contain("Broadcast");
128+
data2.Should().StartWith("data: ");
129+
var json2 = data2["data: ".Length..];
130+
using (var doc = JsonDocument.Parse(json2))
131+
{
132+
var root = doc.RootElement;
133+
root.TryGetProperty("Id", out var idPascal);
134+
root.TryGetProperty("Message", out var msgPascal);
135+
root.TryGetProperty("id", out var idCamel);
136+
root.TryGetProperty("message", out var msgCamel);
137+
(idPascal.ValueKind != JsonValueKind.Undefined ? idPascal.GetInt32() : idCamel.GetInt32()).Should().Be(99);
138+
(msgPascal.ValueKind != JsonValueKind.Undefined ? msgPascal.GetString() : msgCamel.GetString()).Should().Be("Broadcast");
139+
}
101140
blank2.Should().BeEmpty();
102141
}
103142
}

SWEN3.Paperless.RabbitMq.Tests/Unit/SseExtensionsFallbackTests.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public async Task MapSse_Fallback_ShouldWriteCorrectSseFormat()
4747
}
4848

4949
[Fact]
50+
[SuppressMessage("Design", "MA0051:Method is too long")]
5051
public async Task MapSse_Fallback_ValidatesHeadersAndMultiEventPayload()
5152
{
5253
// Arrange
@@ -120,7 +121,7 @@ public async Task MapSse_Fallback_CompletesNaturallyWhenStreamEnds()
120121
using var cts = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
121122
cts.CancelAfter(TimeSpan.FromSeconds(30));
122123

123-
var fakeStream = new Helpers.FakeCompletableSseStream<Messages.SseTestEvent>();
124+
var fakeStream = new FakeCompletableSseStream<Messages.SseTestEvent>();
124125

125126
var (host, server) = await SseTestHelpers.CreateSseTestServerAsync<Messages.SseTestEvent>(
126127
configureServices: services => services.AddSingleton<ISseStream<Messages.SseTestEvent>>(fakeStream),
@@ -132,11 +133,17 @@ public async Task MapSse_Fallback_CompletesNaturallyWhenStreamEnds()
132133
var client = server.CreateClient();
133134
client.Timeout = Timeout.InfiniteTimeSpan;
134135

135-
// Act - Connect without cancellation token to avoid cancellation-based termination
136-
var responseTask = client.GetAsync("/sse", HttpCompletionOption.ResponseHeadersRead, CancellationToken.None);
136+
// Act - connect with test token
137+
var responseTask = client.GetAsync("/sse", HttpCompletionOption.ResponseHeadersRead, cts.Token);
137138

138-
// Wait for connection to establish
139-
await Task.Delay(100, cts.Token);
139+
// Wait for connection to establish (subscriber registered)
140+
var start = DateTime.UtcNow;
141+
while (fakeStream.ClientCount == 0)
142+
{
143+
if (DateTime.UtcNow - start > TimeSpan.FromSeconds(5))
144+
throw new TimeoutException("SSE client did not connect");
145+
await Task.Delay(50, cts.Token);
146+
}
140147

141148
// Publish one event
142149
fakeStream.Publish(new Messages.SseTestEvent { Id = 42, Message = "Final" });

SWEN3.Paperless.RabbitMq.Tests/Unit/SseExtensionsNet10Tests.cs

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,20 @@ namespace SWEN3.Paperless.RabbitMq.Tests.Unit;
44
public static class SseExtensionsNet10Tests
55
{
66
#if NET10_0_OR_GREATER
7+
private static async Task WaitForClientsAsync(ISseStream<Messages.SseTestEvent> stream, int expected, CancellationToken ct)
8+
{
9+
var start = DateTime.UtcNow;
10+
while (stream.ClientCount < expected)
11+
{
12+
if (DateTime.UtcNow - start > TimeSpan.FromSeconds(5))
13+
{
14+
throw new TimeoutException("Client did not connect within timeout");
15+
}
16+
17+
await Task.Delay(50, ct);
18+
}
19+
}
20+
721
[Fact]
822
public static async Task MapSse_Net10_ShouldStreamEventsUsingNativeApi()
923
{
@@ -25,9 +39,7 @@ public static async Task MapSse_Net10_ShouldStreamEventsUsingNativeApi()
2539
// Act
2640
var responseTask = client.GetAsync("/sse", HttpCompletionOption.ResponseHeadersRead, cts.Token);
2741

28-
// Wait for client to connect
29-
while (sseStream.ClientCount == 0)
30-
await Task.Delay(50, cts.Token);
42+
await WaitForClientsAsync(sseStream, 1, cts.Token);
3143

3244
// Publish once
3345
sseStream.Publish(new Messages.SseTestEvent { Id = 1, Message = "Hello" });
@@ -76,9 +88,7 @@ public static async Task MapSse_Net10_ShouldStreamMultipleEvents()
7688
new Messages.SseTestEvent { Id = 3, Message = "Third" }
7789
};
7890

79-
// Wait for client to connect
80-
while (sseStream.ClientCount == 0)
81-
await Task.Delay(50, cts.Token);
91+
await WaitForClientsAsync(sseStream, 1, cts.Token);
8292

8393
// Publish events once
8494
foreach (var evt in events)
@@ -173,6 +183,50 @@ public static async Task MapSse_Net10_CompletesIteratorWhenClientDisconnects()
173183
}
174184
}
175185

186+
[Fact]
187+
public static async Task MapSse_Net10_RequestAborted_CompletesStream()
188+
{
189+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
190+
cts.CancelAfter(TimeSpan.FromSeconds(10));
191+
192+
var (host, server) = await SseTestHelpers.CreateSseTestServerAsync<Messages.SseTestEvent>(
193+
configureServices: null,
194+
configureEndpoints: e => e.MapSse<Messages.SseTestEvent>("/sse",
195+
m => new { m.Id, m.Message },
196+
_ => "evt-abort"));
197+
198+
using var _ = host;
199+
var client = server.CreateClient();
200+
client.Timeout = Timeout.InfiniteTimeSpan;
201+
var sseStream = host.Services.GetRequiredService<ISseStream<Messages.SseTestEvent>>();
202+
203+
var responseTask = client.GetAsync("/sse", HttpCompletionOption.ResponseHeadersRead, cts.Token);
204+
205+
await WaitForClientsAsync(sseStream, 1, cts.Token);
206+
207+
sseStream.Publish(new Messages.SseTestEvent { Id = 2, Message = "Abort" });
208+
209+
using var resp = await responseTask;
210+
resp.EnsureSuccessStatusCode();
211+
await using var body = await resp.Content.ReadAsStreamAsync(cts.Token);
212+
using var reader = new StreamReader(body);
213+
214+
var eventLine = await reader.ReadLineAsync(cts.Token);
215+
var dataLine = await reader.ReadLineAsync(cts.Token);
216+
var blankLine = await reader.ReadLineAsync(cts.Token);
217+
218+
eventLine.Should().Be("event: evt-abort");
219+
dataLine.Should().Be("data: {\"id\":2,\"message\":\"Abort\"}");
220+
blankLine.Should().BeEmpty();
221+
222+
// Abort request to trigger RequestAborted callback and end the stream
223+
await cts.CancelAsync();
224+
225+
// Further reads should not yield data; cancellation should surface
226+
Func<Task> readMore = async () => await reader.ReadLineAsync(cts.Token);
227+
await readMore.Should().ThrowAsync<OperationCanceledException>();
228+
}
229+
176230
[Fact]
177231
public static async Task MapSse_Net10_CompletesNaturallyWhenStreamEnds()
178232
{

0 commit comments

Comments
 (0)