Skip to content

Commit 69203c4

Browse files
Send CloseMessage from client to server (#48577)
1 parent c5e43cf commit 69203c4

File tree

23 files changed

+293
-40
lines changed

23 files changed

+293
-40
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,8 @@ private async Task StartAsyncCore(CancellationToken cancellationToken)
489489
{
490490
Log.ErrorStartingConnection(_logger, ex);
491491

492+
startingConnectionState.Cleanup();
493+
492494
// Can't have any invocations to cancel, we're in the lock.
493495
await CloseAsync(startingConnectionState.Connection).ConfigureAwait(false);
494496
throw;
@@ -543,6 +545,8 @@ private async Task StopAsyncCore(bool disposing)
543545

544546
ConnectionState? connectionState;
545547

548+
var connectionStateStopTask = Task.CompletedTask;
549+
546550
try
547551
{
548552
if (disposing && _disposed)
@@ -559,6 +563,19 @@ private async Task StopAsyncCore(bool disposing)
559563
if (connectionState != null)
560564
{
561565
connectionState.Stopping = true;
566+
// Try to send CloseMessage
567+
var writeTask = SendHubMessage(connectionState, CloseMessage.Empty);
568+
if (writeTask.IsFaulted || writeTask.IsCanceled || !writeTask.IsCompleted)
569+
{
570+
// Ignore exception from write, this is a best effort attempt to let the server know the client closed gracefully.
571+
// We are already closing the connection via an explicit StopAsync call from the user so don't care about any potential
572+
// errors that might happen.
573+
_ = writeTask.ContinueWith(
574+
static t => _ = t.Exception,
575+
CancellationToken.None,
576+
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted,
577+
TaskScheduler.Default);
578+
}
562579
}
563580
else
564581
{
@@ -579,17 +596,20 @@ private async Task StopAsyncCore(bool disposing)
579596
(_serviceProvider as IDisposable)?.Dispose();
580597
}
581598
}
599+
600+
if (connectionState != null)
601+
{
602+
// Start Stop inside the lock so a closure from the transport side at the same time as this doesn't cause an ODE
603+
// But don't await the call in the lock as that could deadlock with HandleConnectionClose in the ReceiveLoop
604+
connectionStateStopTask = connectionState.StopAsync();
605+
}
582606
}
583607
finally
584608
{
585609
_state.ReleaseConnectionLock();
586610
}
587611

588-
// Now stop the connection we captured
589-
if (connectionState != null)
590-
{
591-
await connectionState.StopAsync().ConfigureAwait(false);
592-
}
612+
await connectionStateStopTask.ConfigureAwait(false);
593613
}
594614

595615
/// <summary>
@@ -1459,6 +1479,7 @@ private async Task HandleConnectionClose(ConnectionState connectionState)
14591479

14601480
// Cancel any outstanding invocations within the connection lock
14611481
connectionState.CancelOutstandingInvocations(connectionState.CloseException);
1482+
connectionState.Cleanup();
14621483

14631484
if (connectionState.Stopping || _reconnectPolicy == null)
14641485
{
@@ -1965,9 +1986,9 @@ public Task StopAsync()
19651986

19661987
private async Task StopAsyncCore()
19671988
{
1968-
Log.Stopping(_logger);
1989+
_hubConnection._state.AssertInConnectionLock();
19691990

1970-
_messageBuffer?.Dispose();
1991+
Log.Stopping(_logger);
19711992

19721993
// Complete our write pipe, which should cause everything to shut down
19731994
Log.TerminatingReceiveLoop(_logger);
@@ -1983,6 +2004,11 @@ private async Task StopAsyncCore()
19832004
_stopTcs!.TrySetResult(null);
19842005
}
19852006

2007+
public void Cleanup()
2008+
{
2009+
_messageBuffer?.Dispose();
2010+
}
2011+
19862012
public async Task TimerLoop(TimerAwaitable timer)
19872013
{
19882014
// initialize the timers

src/SignalR/clients/csharp/Client/test/FunctionalTests/Startup.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,9 @@ public void ConfigureServices(IServiceCollection services)
5959
});
6060
services.AddAuthentication(NegotiateDefaults.AuthenticationScheme).AddNegotiate();
6161

62-
// Since tests run in parallel, it's possible multiple servers will startup and read files being written by another test
63-
// Use a unique directory per server to avoid this collision
64-
services.AddDataProtection()
65-
.PersistKeysToFileSystem(Directory.CreateDirectory(Path.GetRandomFileName()));
62+
// Since tests run in parallel, it's possible multiple servers will startup,
63+
// we use an ephemeral key provider to avoid filesystem contention issues
64+
services.AddSingleton<IDataProtectionProvider, EphemeralDataProtectionProvider>();
6665
}
6766

6867
public void Configure(IApplicationBuilder app)

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,9 @@ public async Task ClientWithInherentKeepAliveDoesNotPing()
668668
await hubConnection.DisposeAsync().DefaultTimeout();
669669
await connection.DisposeAsync().DefaultTimeout();
670670

671-
Assert.Equal(0, (await connection.ReadAllSentMessagesAsync(ignorePings: false).DefaultTimeout()).Count);
671+
var messages = await connection.ReadAllSentMessagesAsync(ignorePings: false).DefaultTimeout();
672+
var message = Assert.Single(messages);
673+
Assert.Equal("{\"type\":7}", message);
672674
}
673675
finally
674676
{

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
193193
await hubConnection.StopAsync().DefaultTimeout();
194194

195195
// Assert that InvokeAsync didn't send a message
196-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
196+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
197197
}
198198
}
199199

@@ -212,7 +212,7 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
212212
await hubConnection.StopAsync().DefaultTimeout();
213213

214214
// Assert that SendAsync didn't send a message
215-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
215+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
216216
}
217217
}
218218

@@ -254,7 +254,7 @@ await Assert.ThrowsAsync<TaskCanceledException>(() =>
254254
await hubConnection.StopAsync().DefaultTimeout();
255255

256256
// Assert that StreamAsChannelAsync didn't send a message
257-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
257+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
258258
}
259259
}
260260

@@ -273,7 +273,7 @@ public async Task StreamAsyncCanceledWhenPassedCanceledToken()
273273
await hubConnection.StopAsync().DefaultTimeout();
274274

275275
// Assert that StreamAsync didn't send a message
276-
Assert.Null(await connection.ReadSentTextMessageAsync().DefaultTimeout());
276+
Assert.Equal("{\"type\":7}", await connection.ReadSentTextMessageAsync().DefaultTimeout());
277277
}
278278
}
279279

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,6 @@ private async Task StartSending(WebSocket socket, bool ignoreFirstCanceled)
491491
{
492492
if (result.IsCanceled && !ignoreFirstCanceled)
493493
{
494-
_logger.LogInformation("send canceled");
495494
break;
496495
}
497496

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/CloseMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package com.microsoft.signalr;
55

66
public final class CloseMessage extends HubMessage {
7+
private final int type = HubMessageType.CLOSE.value;
8+
79
private final String error;
810
private final boolean allowReconnect;
911

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.google.gson.stream.JsonToken;
2020

2121
public final class GsonHubProtocol implements HubProtocol {
22-
private final JsonParser jsonParser = new JsonParser();
2322
private final Gson gson;
2423
private static final String RECORD_SEPARATOR = "\u001e";
2524

@@ -95,7 +94,7 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
9594
case "result":
9695
case "item":
9796
if (invocationId == null || binder.getReturnType(invocationId) == null) {
98-
resultToken = jsonParser.parse(reader);
97+
resultToken = JsonParser.parseReader(reader);
9998
} else {
10099
result = gson.fromJson(reader, binder.getReturnType(invocationId));
101100
}
@@ -123,7 +122,7 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
123122
}
124123
}
125124
} else {
126-
argumentsToken = (JsonArray)jsonParser.parse(reader);
125+
argumentsToken = (JsonArray)JsonParser.parseReader(reader);
127126
}
128127
break;
129128
case "headers":

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,9 @@ private Completable stop(String errorMessage) {
431431
connectionState.stopError = errorMessage;
432432
logger.error("HubConnection disconnected with an error: {}.", errorMessage);
433433
} else {
434+
if (this.state.getHubConnectionState() == HubConnectionState.CONNECTED) {
435+
sendHubMessageWithLock(new CloseMessage());
436+
}
434437
logger.debug("Stopping HubConnection.");
435438
}
436439

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/GsonHubProtocolTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ public void parsePingMessage() {
5656
assertEquals(HubMessageType.PING, messages.get(0).getMessageType());
5757
}
5858

59+
@Test
60+
public void writeCloseMessage() {
61+
CloseMessage closeMessage = new CloseMessage();
62+
String result = TestUtils.byteBufferToString(hubProtocol.writeMessage(closeMessage));
63+
String expectedResult = "{\"type\":7,\"allowReconnect\":false}\u001E";
64+
65+
assertEquals(expectedResult, result);
66+
}
67+
5968
@Test
6069
public void parseCloseMessage() {
6170
String stringifiedMessage = "{\"type\":7}\u001E";

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4022,4 +4022,21 @@ public void WebsocketStopLoggedOnce() {
40224022
assertEquals(1, count);
40234023
}
40244024
}
4025+
4026+
@Test
4027+
public void sendsCloseMessageOnStop() throws InterruptedException {
4028+
MockTransport mockTransport = new MockTransport(true, false);
4029+
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
4030+
4031+
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
4032+
4033+
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
4034+
4035+
ByteBuffer[] messages = mockTransport.getSentMessages();
4036+
4037+
// handshake, close
4038+
assertEquals(2, messages.length);
4039+
String message = TestUtils.byteBufferToString(messages[1]);
4040+
assertEquals("{\"type\":7,\"allowReconnect\":false}" + RECORD_SEPARATOR, message);
4041+
}
40254042
}

0 commit comments

Comments
 (0)