Skip to content

Commit cb67c9b

Browse files
[Backport] [SignalR] Fix WebSocket client close when network disappears (#43576)
1 parent 31eff6e commit cb67c9b

File tree

2 files changed

+96
-18
lines changed

2 files changed

+96
-18
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Net.WebSockets;
5+
using Microsoft.AspNetCore.Connections;
6+
using Microsoft.AspNetCore.Http.Connections.Client;
7+
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
8+
using Microsoft.AspNetCore.SignalR.Tests;
9+
using Microsoft.AspNetCore.Testing;
10+
11+
namespace Microsoft.AspNetCore.SignalR.Client.Tests;
12+
13+
public class WebSocketsTransportTests : VerifiableLoggedTest
14+
{
15+
// Tests that the transport can still be stopped if SendAsync and ReceiveAsync are hanging (ethernet unplugged for example)
16+
[Fact]
17+
public async Task StopCancelsSendAndReceive()
18+
{
19+
var options = new HttpConnectionOptions()
20+
{
21+
WebSocketFactory = (context, token) =>
22+
{
23+
return ValueTask.FromResult((WebSocket)new TestWebSocket());
24+
},
25+
CloseTimeout = TimeSpan.FromMilliseconds(1),
26+
};
27+
28+
using (StartVerifiableLog())
29+
{
30+
var webSocketsTransport = new WebSocketsTransport(options, loggerFactory: LoggerFactory, () => Task.FromResult<string>(null));
31+
32+
await webSocketsTransport.StartAsync(
33+
new Uri("http://fakeuri.org"), TransferFormat.Text).DefaultTimeout();
34+
35+
await webSocketsTransport.StopAsync().DefaultTimeout();
36+
37+
await webSocketsTransport.Running.DefaultTimeout();
38+
}
39+
}
40+
41+
internal class TestWebSocket : WebSocket
42+
{
43+
public Task ConnectAsync(Uri uri, CancellationToken cancellationToken) => Task.CompletedTask;
44+
45+
public override WebSocketCloseStatus? CloseStatus => null;
46+
47+
public override string CloseStatusDescription => string.Empty;
48+
49+
public override WebSocketState State => WebSocketState.Open;
50+
51+
public override string SubProtocol => string.Empty;
52+
53+
public override void Abort() { }
54+
55+
public override Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
56+
=> Task.CompletedTask;
57+
58+
public override async Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
59+
{
60+
await cancellationToken.WaitForCancellationAsync();
61+
cancellationToken.ThrowIfCancellationRequested();
62+
}
63+
64+
public override void Dispose() { }
65+
66+
public override async Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
67+
{
68+
await cancellationToken.WaitForCancellationAsync();
69+
cancellationToken.ThrowIfCancellationRequested();
70+
return new WebSocketReceiveResult(0, WebSocketMessageType.Text, true);
71+
}
72+
73+
public override async Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
74+
{
75+
await cancellationToken.WaitForCancellationAsync();
76+
cancellationToken.ThrowIfCancellationRequested();
77+
}
78+
}
79+
}

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ internal partial class WebSocketsTransport : ITransport
2424
private readonly TimeSpan _closeTimeout;
2525
private volatile bool _aborted;
2626
private readonly HttpConnectionOptions _httpConnectionOptions;
27+
private readonly CancellationTokenSource _stopCts = new CancellationTokenSource();
2728

2829
private IDuplexPipe? _transport;
2930

@@ -204,6 +205,8 @@ private async Task ProcessSocketAsync(WebSocket socket)
204205
// Wait for send or receive to complete
205206
var trigger = await Task.WhenAny(receiving, sending);
206207

208+
_stopCts.CancelAfter(_closeTimeout);
209+
207210
if (trigger == receiving)
208211
{
209212
// We're waiting for the application to finish and there are 2 things it could be doing
@@ -213,22 +216,14 @@ private async Task ProcessSocketAsync(WebSocket socket)
213216
// Cancel the application so that ReadAsync yields
214217
_application.Input.CancelPendingRead();
215218

216-
using (var delayCts = new CancellationTokenSource())
217-
{
218-
var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, delayCts.Token));
219+
var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token));
219220

220-
if (resultTask != sending)
221-
{
222-
_aborted = true;
221+
if (resultTask != sending)
222+
{
223+
_aborted = true;
223224

224-
// Abort the websocket if we're stuck in a pending send to the client
225-
socket.Abort();
226-
}
227-
else
228-
{
229-
// Cancel the timeout
230-
delayCts.Cancel();
231-
}
225+
// Abort the websocket if we're stuck in a pending send to the client
226+
socket.Abort();
232227
}
233228
}
234229
else
@@ -258,7 +253,7 @@ private async Task StartReceiving(WebSocket socket)
258253
{
259254
#if NETSTANDARD2_1 || NETCOREAPP
260255
// Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read
261-
var result = await socket.ReceiveAsync(Memory<byte>.Empty, CancellationToken.None);
256+
var result = await socket.ReceiveAsync(Memory<byte>.Empty, _stopCts.Token);
262257

263258
if (result.MessageType == WebSocketMessageType.Close)
264259
{
@@ -275,13 +270,13 @@ private async Task StartReceiving(WebSocket socket)
275270
var memory = _application.Output.GetMemory();
276271
#if NETSTANDARD2_1 || NETCOREAPP
277272
// Because we checked the CloseStatus from the 0 byte read above, we don't need to check again after reading
278-
var receiveResult = await socket.ReceiveAsync(memory, CancellationToken.None);
273+
var receiveResult = await socket.ReceiveAsync(memory, _stopCts.Token);
279274
#elif NETSTANDARD2_0 || NET461
280275
var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment);
281276
Debug.Assert(isArray);
282277

283278
// Exceptions are handled above where the send and receive tasks are being run.
284-
var receiveResult = await socket.ReceiveAsync(arraySegment, CancellationToken.None);
279+
var receiveResult = await socket.ReceiveAsync(arraySegment, _stopCts.Token);
285280
#else
286281
#error TFMs need to be updated
287282
#endif
@@ -400,7 +395,7 @@ private async Task StartSending(WebSocket socket)
400395
try
401396
{
402397
// We're done sending, send the close frame to the client if the websocket is still open
403-
await socket.CloseOutputAsync(error != null ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
398+
await socket.CloseOutputAsync(error != null ? WebSocketCloseStatus.InternalServerError : WebSocketCloseStatus.NormalClosure, "", _stopCts.Token);
404399
}
405400
catch (Exception ex)
406401
{
@@ -452,6 +447,9 @@ public async Task StopAsync()
452447
// Cancel any pending reads from the application, this should start the entire shutdown process
453448
_application.Input.CancelPendingRead();
454449

450+
// Start ungraceful close timer
451+
_stopCts.CancelAfter(_closeTimeout);
452+
455453
try
456454
{
457455
await Running;
@@ -465,6 +463,7 @@ public async Task StopAsync()
465463
finally
466464
{
467465
_webSocket?.Dispose();
466+
_stopCts.Dispose();
468467
}
469468

470469
Log.TransportStopped(_logger, null);

0 commit comments

Comments
 (0)