Skip to content

Commit 030e5f9

Browse files
committed
Support for CancellationToken in asynchronous operations. Resolves #15
1 parent 97d1f09 commit 030e5f9

File tree

8 files changed

+124
-20
lines changed

8 files changed

+124
-20
lines changed

Benchmark.AspNetCore.ServerSentEvents/Benchmarks/ServerSentEventsServiceBenchmarks.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
2-
using System.Security.Claims;
2+
using System.Threading;
33
using System.Threading.Tasks;
4+
using System.Security.Claims;
45
using System.Collections.Generic;
56
using BenchmarkDotNet.Attributes;
67
using Lib.AspNetCore.ServerSentEvents;
@@ -57,7 +58,7 @@ public Task SendEventAsync_SingleEvent_SingleClient()
5758
[Benchmark]
5859
public Task ChangeReconnectIntervalAsync_SingleClient()
5960
{
60-
return _serverSentEventsClient.ChangeReconnectIntervalAsync(5000);
61+
return _serverSentEventsClient.ChangeReconnectIntervalAsync(5000, CancellationToken.None);
6162
}
6263

6364
[Benchmark]

Lib.AspNetCore.ServerSentEvents/IServerSentEventsClient.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
2-
using System.Security.Claims;
2+
using System.Threading;
33
using System.Threading.Tasks;
4+
using System.Security.Claims;
45

56
namespace Lib.AspNetCore.ServerSentEvents
67
{
@@ -34,12 +35,28 @@ public interface IServerSentEventsClient
3435
/// <returns>The task object representing the asynchronous operation.</returns>
3536
Task SendEventAsync(string text);
3637

38+
/// <summary>
39+
/// Sends event to client.
40+
/// </summary>
41+
/// <param name="text">The simple text event.</param>
42+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
43+
/// <returns>The task object representing the asynchronous operation.</returns>
44+
Task SendEventAsync(string text, CancellationToken cancellationToken);
45+
3746
/// <summary>
3847
/// Sends event to client.
3948
/// </summary>
4049
/// <param name="serverSentEvent">The event.</param>
4150
/// <returns>The task object representing the asynchronous operation.</returns>
4251
Task SendEventAsync(ServerSentEvent serverSentEvent);
52+
53+
/// <summary>
54+
/// Sends event to client.
55+
/// </summary>
56+
/// <param name="serverSentEvent">The event.</param>
57+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
58+
/// <returns>The task object representing the asynchronous operation.</returns>
59+
Task SendEventAsync(ServerSentEvent serverSentEvent, CancellationToken cancellationToken);
4360
#endregion
4461
}
4562
}

Lib.AspNetCore.ServerSentEvents/IServerSentEventsService.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using System.Collections.Generic;
45
using Microsoft.AspNetCore.Http;
@@ -50,20 +51,44 @@ public interface IServerSentEventsService
5051
/// <returns>The task object representing the asynchronous operation.</returns>
5152
Task ChangeReconnectIntervalAsync(uint reconnectInterval);
5253

54+
/// <summary>
55+
/// Changes the interval after which clients will attempt to reestablish failed connections.
56+
/// </summary>
57+
/// <param name="reconnectInterval">The reconnect interval.</param>
58+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
59+
/// <returns>The task object representing the asynchronous operation.</returns>
60+
Task ChangeReconnectIntervalAsync(uint reconnectInterval, CancellationToken cancellationToken);
61+
5362
/// <summary>
5463
/// Sends event to all clients.
5564
/// </summary>
5665
/// <param name="text">The simple text event.</param>
5766
/// <returns>The task object representing the asynchronous operation.</returns>
5867
Task SendEventAsync(string text);
5968

69+
/// <summary>
70+
/// Sends event to all clients.
71+
/// </summary>
72+
/// <param name="text">The simple text event.</param>
73+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
74+
/// <returns>The task object representing the asynchronous operation.</returns>
75+
Task SendEventAsync(string text, CancellationToken cancellationToken);
76+
6077
/// <summary>
6178
/// Sends event to all clients.
6279
/// </summary>
6380
/// <param name="serverSentEvent">The event.</param>
6481
/// <returns>The task object representing the asynchronous operation.</returns>
6582
Task SendEventAsync(ServerSentEvent serverSentEvent);
6683

84+
/// <summary>
85+
/// Sends event to all clients.
86+
/// </summary>
87+
/// <param name="serverSentEvent">The event.</param>
88+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
89+
/// <returns>The task object representing the asynchronous operation.</returns>
90+
Task SendEventAsync(ServerSentEvent serverSentEvent, CancellationToken cancellationToken);
91+
6792
/// <summary>
6893
/// Method which is called when client is establishing the connection. The base implementation raises the <see cref="ClientConnected"/> event.
6994
/// </summary>

Lib.AspNetCore.ServerSentEvents/Internals/ServerSentEventsHelper.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Text;
33
using System.Globalization;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.AspNetCore.Http;
67

@@ -27,9 +28,9 @@ internal static Task AcceptAsync(this HttpResponse response)
2728
return response.Body.FlushAsync();
2829
}
2930

30-
internal static Task WriteAsync(this HttpResponse response, ServerSentEventBytes serverSentEvent)
31+
internal static Task WriteAsync(this HttpResponse response, ServerSentEventBytes serverSentEvent, CancellationToken cancellationToken)
3132
{
32-
return response.Body.WriteAsync(serverSentEvent.Bytes, 0, serverSentEvent.BytesCount);
33+
return response.Body.WriteAsync(serverSentEvent.Bytes, 0, serverSentEvent.BytesCount, cancellationToken);
3334
}
3435

3536
internal static ServerSentEventBytes GetReconnectIntervalBytes(uint reconnectInterval)

Lib.AspNetCore.ServerSentEvents/ServerSentEventsClient.cs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
2-
using System.Security.Claims;
2+
using System.Threading;
33
using System.Threading.Tasks;
4+
using System.Security.Claims;
45
using Microsoft.AspNetCore.Http;
56

67
namespace Lib.AspNetCore.ServerSentEvents.Internals
@@ -50,7 +51,18 @@ internal ServerSentEventsClient(Guid id, ClaimsPrincipal user, HttpResponse resp
5051
/// <returns>The task object representing the asynchronous operation.</returns>
5152
public Task SendEventAsync(string text)
5253
{
53-
return SendAsync(ServerSentEventsHelper.GetEventBytes(text));
54+
return SendAsync(ServerSentEventsHelper.GetEventBytes(text), CancellationToken.None);
55+
}
56+
57+
/// <summary>
58+
/// Sends event to client.
59+
/// </summary>
60+
/// <param name="text">The simple text event.</param>
61+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
62+
/// <returns>The task object representing the asynchronous operation.</returns>
63+
public Task SendEventAsync(string text, CancellationToken cancellationToken)
64+
{
65+
return SendAsync(ServerSentEventsHelper.GetEventBytes(text), cancellationToken);
5466
}
5567

5668
/// <summary>
@@ -60,19 +72,30 @@ public Task SendEventAsync(string text)
6072
/// <returns>The task object representing the asynchronous operation.</returns>
6173
public Task SendEventAsync(ServerSentEvent serverSentEvent)
6274
{
63-
return SendAsync(ServerSentEventsHelper.GetEventBytes(serverSentEvent));
75+
return SendAsync(ServerSentEventsHelper.GetEventBytes(serverSentEvent), CancellationToken.None);
76+
}
77+
78+
/// <summary>
79+
/// Sends event to client.
80+
/// </summary>
81+
/// <param name="serverSentEvent">The event.</param>
82+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
83+
/// <returns>The task object representing the asynchronous operation.</returns>
84+
public Task SendEventAsync(ServerSentEvent serverSentEvent, CancellationToken cancellationToken)
85+
{
86+
return SendAsync(ServerSentEventsHelper.GetEventBytes(serverSentEvent), cancellationToken);
6487
}
6588

66-
internal Task SendAsync(ServerSentEventBytes serverSentEvent)
89+
internal Task SendAsync(ServerSentEventBytes serverSentEvent, CancellationToken cancellationToken)
6790
{
6891
CheckIsConnected();
6992

70-
return _response.WriteAsync(serverSentEvent);
93+
return _response.WriteAsync(serverSentEvent, cancellationToken);
7194
}
7295

73-
internal Task ChangeReconnectIntervalAsync(uint reconnectInterval)
96+
internal Task ChangeReconnectIntervalAsync(uint reconnectInterval, CancellationToken cancellationToken)
7497
{
75-
return SendAsync(ServerSentEventsHelper.GetReconnectIntervalBytes(reconnectInterval));
98+
return SendAsync(ServerSentEventsHelper.GetReconnectIntervalBytes(reconnectInterval), cancellationToken);
7699
}
77100

78101
private void CheckIsConnected()

Lib.AspNetCore.ServerSentEvents/ServerSentEventsKeepaliveService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ private async Task ExecuteAsync(CancellationToken stoppingToken)
6868
{
6969
while (!stoppingToken.IsCancellationRequested)
7070
{
71-
await _serverSentEventsService.SendEventAsync(_keepaliveServerSentEventBytes);
71+
await _serverSentEventsService.SendEventAsync(_keepaliveServerSentEventBytes, CancellationToken.None);
7272

7373
await Task.Delay(TimeSpan.FromSeconds(_options.KeepaliveInterval), stoppingToken);
7474
}

Lib.AspNetCore.ServerSentEvents/ServerSentEventsMiddleware.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using Microsoft.AspNetCore.Http;
45
using Microsoft.AspNetCore.Http.Features;
@@ -50,7 +51,7 @@ public async Task Invoke(HttpContext context)
5051

5152
if (_serverSentEventsService.ReconnectInterval.HasValue)
5253
{
53-
await client.ChangeReconnectIntervalAsync(_serverSentEventsService.ReconnectInterval.Value);
54+
await client.ChangeReconnectIntervalAsync(_serverSentEventsService.ReconnectInterval.Value, CancellationToken.None);
5455
}
5556

5657
await ConnectClientAsync(context.Request, client);

Lib.AspNetCore.ServerSentEvents/ServerSentEventsService.cs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Linq;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using System.Collections.Generic;
56
using System.Collections.Concurrent;
@@ -66,12 +67,23 @@ public IReadOnlyCollection<IServerSentEventsClient> GetClients()
6667
/// <param name="reconnectInterval">The reconnect interval.</param>
6768
/// <returns>The task object representing the asynchronous operation.</returns>
6869
public Task ChangeReconnectIntervalAsync(uint reconnectInterval)
70+
{
71+
return ChangeReconnectIntervalAsync(reconnectInterval, CancellationToken.None);
72+
}
73+
74+
/// <summary>
75+
/// Changes the interval after which clients will attempt to reestablish failed connections.
76+
/// </summary>
77+
/// <param name="reconnectInterval">The reconnect interval.</param>
78+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
79+
/// <returns>The task object representing the asynchronous operation.</returns>
80+
public Task ChangeReconnectIntervalAsync(uint reconnectInterval, CancellationToken cancellationToken)
6981
{
7082
ReconnectInterval = reconnectInterval;
7183

7284
ServerSentEventBytes reconnectIntervalBytes = ServerSentEventsHelper.GetReconnectIntervalBytes(reconnectInterval);
7385

74-
return ForAllClientsAsync(client => client.SendAsync(reconnectIntervalBytes));
86+
return ForAllClientsAsync(client => client.SendAsync(reconnectIntervalBytes, cancellationToken), cancellationToken);
7587
}
7688

7789
/// <summary>
@@ -81,7 +93,18 @@ public Task ChangeReconnectIntervalAsync(uint reconnectInterval)
8193
/// <returns>The task object representing the asynchronous operation.</returns>
8294
public Task SendEventAsync(string text)
8395
{
84-
return SendEventAsync(ServerSentEventsHelper.GetEventBytes(text));
96+
return SendEventAsync(ServerSentEventsHelper.GetEventBytes(text), CancellationToken.None);
97+
}
98+
99+
/// <summary>
100+
/// Sends event to all clients.
101+
/// </summary>
102+
/// <param name="text">The simple text event.</param>
103+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
104+
/// <returns>The task object representing the asynchronous operation.</returns>
105+
public Task SendEventAsync(string text, CancellationToken cancellationToken)
106+
{
107+
return SendEventAsync(ServerSentEventsHelper.GetEventBytes(text), cancellationToken);
85108
}
86109

87110
/// <summary>
@@ -91,7 +114,18 @@ public Task SendEventAsync(string text)
91114
/// <returns>The task object representing the asynchronous operation.</returns>
92115
public Task SendEventAsync(ServerSentEvent serverSentEvent)
93116
{
94-
return SendEventAsync(ServerSentEventsHelper.GetEventBytes(serverSentEvent));
117+
return SendEventAsync(ServerSentEventsHelper.GetEventBytes(serverSentEvent), CancellationToken.None);
118+
}
119+
120+
/// <summary>
121+
/// Sends event to all clients.
122+
/// </summary>
123+
/// <param name="serverSentEvent">The event.</param>
124+
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
125+
/// <returns>The task object representing the asynchronous operation.</returns>
126+
public Task SendEventAsync(ServerSentEvent serverSentEvent, CancellationToken cancellationToken)
127+
{
128+
return SendEventAsync(ServerSentEventsHelper.GetEventBytes(serverSentEvent), cancellationToken);
95129
}
96130

97131
/// <summary>
@@ -146,19 +180,21 @@ internal void RemoveClient(ServerSentEventsClient client)
146180
_clients.TryRemove(client.Id, out client);
147181
}
148182

149-
internal Task SendEventAsync(ServerSentEventBytes serverSentEventBytes)
183+
internal Task SendEventAsync(ServerSentEventBytes serverSentEventBytes, CancellationToken cancellationToken)
150184
{
151-
return ForAllClientsAsync(client => client.SendAsync(serverSentEventBytes));
185+
return ForAllClientsAsync(client => client.SendAsync(serverSentEventBytes, cancellationToken), cancellationToken);
152186
}
153187

154-
private Task ForAllClientsAsync(Func<ServerSentEventsClient, Task> clientOperationAsync)
188+
private Task ForAllClientsAsync(Func<ServerSentEventsClient, Task> clientOperationAsync, CancellationToken cancellationToken)
155189
{
156190
List<Task> clientsTasks = null;
157191

158192
foreach (ServerSentEventsClient client in _clients.Values)
159193
{
160194
if (client.IsConnected)
161195
{
196+
cancellationToken.ThrowIfCancellationRequested();
197+
162198
Task operationTask = clientOperationAsync(client);
163199

164200
if (operationTask.Status != TaskStatus.RanToCompletion)

0 commit comments

Comments
 (0)