Skip to content

Commit 93ca811

Browse files
ANcpLuaclaude
andcommitted
feat(sse): add IAsyncDisposable support with graceful shutdown
- Add IAsyncDisposable to ISseStream interface - Implement disposal in SseStream that completes all client channels - Update FakeCompletableSseStream with matching disposal behavior - Add disposal behavior tests (Subscribe after dispose throws, Publish after dispose is no-op, DisposeAsync completes channels) - Add simplified MapSse overload without payload selector 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 3355bc2 commit 93ca811

File tree

5 files changed

+131
-2
lines changed

5 files changed

+131
-2
lines changed

SWEN3.Paperless.RabbitMq.Tests/Helpers/FakeCompletableSseStream.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace SWEN3.Paperless.RabbitMq.Tests.Helpers;
99
/// </summary>
1010
internal sealed class FakeCompletableSseStream<T> : ISseStream<T> where T : class
1111
{
12+
private volatile bool _disposed;
1213
private readonly ConcurrentDictionary<Guid, Channel<T>> _channels = new();
1314

1415
/// <summary>
@@ -21,6 +22,9 @@ internal sealed class FakeCompletableSseStream<T> : ISseStream<T> where T : clas
2122
/// </summary>
2223
public void Publish(T message)
2324
{
25+
if (_disposed)
26+
return;
27+
2428
foreach (var channel in _channels.Values)
2529
{
2630
channel.Writer.TryWrite(message);
@@ -30,8 +34,11 @@ public void Publish(T message)
3034
/// <summary>
3135
/// Subscribes a client and returns a channel reader for receiving events.
3236
/// </summary>
37+
/// <exception cref="ObjectDisposedException">Thrown if the stream has been disposed.</exception>
3338
public ChannelReader<T> Subscribe(Guid clientId)
3439
{
40+
ObjectDisposedException.ThrowIf(_disposed, this);
41+
3542
var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
3643
{
3744
SingleReader = true,
@@ -63,4 +70,19 @@ public void Complete()
6370
channel.Writer.TryComplete();
6471
}
6572
}
73+
74+
/// <summary>
75+
/// Disposes the stream, completing all client channels.
76+
/// </summary>
77+
/// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
78+
public ValueTask DisposeAsync()
79+
{
80+
if (_disposed)
81+
return ValueTask.CompletedTask;
82+
83+
_disposed = true;
84+
Complete();
85+
_channels.Clear();
86+
return ValueTask.CompletedTask;
87+
}
6688
}

SWEN3.Paperless.RabbitMq.Tests/Unit/SseStreamTests.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
namespace SWEN3.Paperless.RabbitMq.Tests.Unit;
22

3-
public class SseStreamTests
3+
public class SseStreamTests : IAsyncDisposable
44
{
55
private readonly SseStream<Messages.SimpleMessage> _sseStream = new();
66

7+
public ValueTask DisposeAsync() => _sseStream.DisposeAsync();
8+
79
[Fact]
810
public void Subscribe_ShouldReturnChannelReader()
911
{
@@ -85,4 +87,38 @@ public void Publish_WhenClientIsSlow_ShouldDropOldestMessages()
8587
receivedMessages[0].Id.Should().Be(ExpectedFirstMessageId); // Should start from 11
8688
receivedMessages[^1].Id.Should().Be(MessagesToSend); // Should end at 110
8789
}
90+
91+
[Fact]
92+
public async Task Subscribe_AfterDispose_ShouldThrowObjectDisposedException()
93+
{
94+
await _sseStream.DisposeAsync();
95+
96+
var act = () => _sseStream.Subscribe(Guid.NewGuid());
97+
98+
act.Should().Throw<ObjectDisposedException>();
99+
}
100+
101+
[Fact]
102+
public async Task Publish_AfterDispose_ShouldBeNoOp()
103+
{
104+
var reader = _sseStream.Subscribe(Guid.NewGuid());
105+
await _sseStream.DisposeAsync();
106+
107+
_sseStream.Publish(new Messages.SimpleMessage(1)); // Should not throw
108+
109+
// Channel was completed by dispose, so no message available
110+
reader.TryRead(out _).Should().BeFalse();
111+
}
112+
113+
[Fact]
114+
public async Task DisposeAsync_ShouldCompleteAllClientChannels()
115+
{
116+
var reader1 = _sseStream.Subscribe(Guid.NewGuid());
117+
var reader2 = _sseStream.Subscribe(Guid.NewGuid());
118+
119+
await _sseStream.DisposeAsync();
120+
121+
reader1.Completion.IsCompleted.Should().BeTrue();
122+
reader2.Completion.IsCompleted.Should().BeTrue();
123+
}
88124
}

SWEN3.Paperless.RabbitMq/Sse/ISseStream.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace SWEN3.Paperless.RabbitMq.Sse;
1212
/// <typeparam name="T">
1313
/// The type of event object being streamed.
1414
/// </typeparam>
15-
public interface ISseStream<T>
15+
public interface ISseStream<T> : IAsyncDisposable
1616
{
1717
/// <summary>
1818
/// Gets the number of currently active client subscriptions.
@@ -28,6 +28,7 @@ public interface ISseStream<T>
2828
/// <returns>
2929
/// A <see cref="ChannelReader{T}"/> that the client can read from to receive asynchronous event updates.
3030
/// </returns>
31+
/// <exception cref="ObjectDisposedException">Thrown if the stream has been disposed.</exception>
3132
ChannelReader<T> Subscribe(Guid clientId);
3233

3334
/// <summary>
@@ -47,6 +48,7 @@ public interface ISseStream<T>
4748
/// <remarks>
4849
/// The event is written to each client's individual channel. If a client's channel is full or closed,
4950
/// the delivery strategy (e.g., dropping the message) is determined by the implementation.
51+
/// If the stream has been disposed, this method is a no-op.
5052
/// </remarks>
5153
void Publish(T item);
5254
}

SWEN3.Paperless.RabbitMq/Sse/SseExtensions.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,45 @@ public static RouteHandlerBuilder MapSse<T>(this IEndpointRouteBuilder endpoints
9292
#endif
9393
}
9494

95+
/// <summary>
96+
/// Maps a GET endpoint that streams Server-Sent Events (SSE) to connected clients,
97+
/// serializing the entire event object as the payload using camelCase naming.
98+
/// </summary>
99+
/// <typeparam name="T">
100+
/// The type of the event object being streamed.
101+
/// </typeparam>
102+
/// <param name="endpoints">
103+
/// The <see cref="IEndpointRouteBuilder"/> to add the route to.
104+
/// </param>
105+
/// <param name="pattern">
106+
/// The URL pattern for the endpoint (e.g., <c>"/api/events"</c>).
107+
/// </param>
108+
/// <param name="eventTypeSelector">
109+
/// A function that determines the SSE "event" type string (e.g., "message", "update") for the client to listen for.
110+
/// </param>
111+
/// <returns>
112+
/// A <see cref="RouteHandlerBuilder"/> that can be used to further configure the endpoint (e.g., adding authorization).
113+
/// </returns>
114+
/// <remarks>
115+
/// <para>
116+
/// This is a simplified overload that serializes the entire event object as JSON.
117+
/// Use this when you don't need to transform the event before sending.
118+
/// </para>
119+
/// <para>
120+
/// The event object is serialized using <see cref="JsonSerializerDefaults.Web"/> (camelCase property naming).
121+
/// </para>
122+
/// </remarks>
123+
/// <example>
124+
/// <code>
125+
/// app.MapSse&lt;NotificationEvent&gt;("/api/notifications", evt => evt.Type);
126+
/// </code>
127+
/// </example>
128+
public static RouteHandlerBuilder MapSse<T>(this IEndpointRouteBuilder endpoints, string pattern,
129+
Func<T, string> eventTypeSelector) where T : class
130+
{
131+
return endpoints.MapSse(pattern, static item => item, eventTypeSelector);
132+
}
133+
95134
#if NET10_0_OR_GREATER
96135
/// <summary>
97136
/// .NET 10+ implementation using native ServerSentEvents API.
@@ -122,6 +161,7 @@ async IAsyncEnumerable<SseItem<object>> StreamEventsAsync([EnumeratorCancellatio
122161
}
123162
});
124163
}
164+
125165
#endif
126166

127167
#if !NET10_0_OR_GREATER

SWEN3.Paperless.RabbitMq/Sse/SseStream.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,17 @@ namespace SWEN3.Paperless.RabbitMq.Sse;
1010
internal sealed class SseStream<T> : ISseStream<T>
1111
{
1212
private readonly ConcurrentDictionary<Guid, Channel<T>> _channels = new();
13+
private volatile bool _disposed;
1314

1415
/// <inheritdoc />
1516
public int ClientCount => _channels.Count;
1617

18+
/// <inheritdoc />
19+
/// <exception cref="ObjectDisposedException">Thrown if the stream has been disposed.</exception>
1720
public ChannelReader<T> Subscribe(Guid clientId)
1821
{
22+
ObjectDisposedException.ThrowIf(_disposed, this);
23+
1924
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(100)
2025
{
2126
FullMode = BoundedChannelFullMode.DropOldest,
@@ -27,6 +32,7 @@ public ChannelReader<T> Subscribe(Guid clientId)
2732
return channel.Reader;
2833
}
2934

35+
/// <inheritdoc />
3036
public void Unsubscribe(Guid clientId)
3137
{
3238
if (_channels.TryRemove(clientId, out var channel))
@@ -38,9 +44,32 @@ public void Unsubscribe(Guid clientId)
3844
/// <inheritdoc />
3945
public void Publish(T item)
4046
{
47+
if (_disposed)
48+
return;
49+
4150
foreach (var channel in _channels.Values)
4251
{
4352
channel.Writer.TryWrite(item);
4453
}
4554
}
55+
56+
/// <summary>
57+
/// Disposes the stream, completing all client channels for graceful shutdown.
58+
/// </summary>
59+
/// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
60+
public ValueTask DisposeAsync()
61+
{
62+
if (_disposed)
63+
return ValueTask.CompletedTask;
64+
65+
_disposed = true;
66+
67+
foreach (var channel in _channels.Values)
68+
{
69+
channel.Writer.TryComplete();
70+
}
71+
72+
_channels.Clear();
73+
return ValueTask.CompletedTask;
74+
}
4675
}

0 commit comments

Comments
 (0)