Skip to content

Commit 0f999d4

Browse files
Increment HubProtocol version for Stateful Reconnect (#50442)
* Increment HubProtocol version for Stateful Reconnect * another * fb * fb
1 parent 6e1b89d commit 0f999d4

File tree

15 files changed

+218
-35
lines changed

15 files changed

+218
-35
lines changed

src/Components/Server/src/BlazorPack/BlazorPackHubProtocol.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace Microsoft.AspNetCore.Components.Server.BlazorPack;
1616
internal sealed class BlazorPackHubProtocol : IHubProtocol
1717
{
1818
internal const string ProtocolName = "blazorpack";
19-
private const int ProtocolVersion = 1;
19+
private const int ProtocolVersion = 2;
2020

2121
private readonly BlazorPackHubProtocolWorker _worker = new BlazorPackHubProtocolWorker();
2222

@@ -32,7 +32,7 @@ internal sealed class BlazorPackHubProtocol : IHubProtocol
3232
/// <inheritdoc />
3333
public bool IsVersionSupported(int version)
3434
{
35-
return version == Version;
35+
return version <= Version;
3636
}
3737

3838
/// <inheritdoc />

src/SignalR/clients/csharp/Client.Core/src/HubConnection.Log.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,5 +334,8 @@ public static void ErrorHandshakeTimedOut(ILogger logger, TimeSpan handshakeTime
334334

335335
[LoggerMessage(92, LogLevel.Trace, "Received SequenceMessage with Sequence ID '{SequenceId}'.", EventName = "ReceivedSequenceMessage")]
336336
public static partial void ReceivedSequenceMessage(ILogger logger, long sequenceId);
337+
338+
[LoggerMessage(93, LogLevel.Debug, "HubProtocol '{Protocol} v{Version}' does not support Stateful Reconnect. Disabling the feature.", EventName = "DisablingReconnect")]
339+
public static partial void DisablingReconnect(ILogger logger, string protocol, int version);
337340
}
338341
}

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,12 +481,35 @@ private async Task StartAsyncCore(CancellationToken cancellationToken)
481481
var connection = await _connectionFactory.ConnectAsync(_endPoint, cancellationToken).ConfigureAwait(false);
482482
var startingConnectionState = new ConnectionState(connection, this);
483483

484+
#pragma warning disable CA2252 // This API requires opting into preview features
485+
var statefulReconnectFeature = connection.Features.Get<IStatefulReconnectFeature>();
486+
#pragma warning restore CA2252 // This API requires opting into preview features
487+
484488
// From here on, if an error occurs we need to shut down the connection because
485489
// we still own it.
486490
try
487491
{
488-
Log.HubProtocol(_logger, _protocol.Name, _protocol.Version);
489-
await HandshakeAsync(startingConnectionState, cancellationToken).ConfigureAwait(false);
492+
var usedProtocolVersion = _protocol.Version;
493+
if (statefulReconnectFeature is null && _protocol.IsVersionSupported(1))
494+
{
495+
// Stateful Reconnect starts with HubProtocol version 2, newer clients connecting to older servers will fail to connect due to
496+
// the handshake only supporting version 1, so we will try to send version 1 during the handshake to keep old servers working
497+
// if the client is not attempting to enable stateful reconnect and therefore does not require a newer HubProtocol.
498+
usedProtocolVersion = 1;
499+
}
500+
else if (_protocol.Version < 2)
501+
{
502+
if (statefulReconnectFeature is not null)
503+
{
504+
Log.DisablingReconnect(_logger, _protocol.Name, _protocol.Version);
505+
#pragma warning disable CA2252 // This API requires opting into preview features
506+
statefulReconnectFeature.DisableReconnect();
507+
#pragma warning restore CA2252 // This API requires opting into preview features
508+
}
509+
}
510+
511+
Log.HubProtocol(_logger, _protocol.Name, usedProtocolVersion);
512+
await HandshakeAsync(startingConnectionState, usedProtocolVersion, cancellationToken).ConfigureAwait(false);
490513
}
491514
catch (Exception ex)
492515
{
@@ -1240,12 +1263,12 @@ private void CheckDisposed()
12401263
ObjectDisposedThrowHelper.ThrowIf(_disposed, this);
12411264
}
12421265

1243-
private async Task HandshakeAsync(ConnectionState startingConnectionState, CancellationToken cancellationToken)
1266+
private async Task HandshakeAsync(ConnectionState startingConnectionState, int protocolVersion, CancellationToken cancellationToken)
12441267
{
12451268
// Send the Handshake request
12461269
Log.SendingHubHandshake(_logger);
12471270

1248-
var handshakeRequest = new HandshakeRequestMessage(_protocol.Name, _protocol.Version);
1271+
var handshakeRequest = new HandshakeRequestMessage(_protocol.Name, protocolVersion);
12491272
HandshakeProtocol.WriteRequestMessage(handshakeRequest, startingConnectionState.Connection.Transport.Output);
12501273

12511274
var sendHandshakeResult = await startingConnectionState.Connection.Transport.Output.FlushAsync(CancellationToken.None).ConfigureAwait(false);

src/SignalR/clients/csharp/Client/test/FunctionalTests/HubConnectionTests.cs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2833,6 +2833,83 @@ public async Task CanSetMessageBufferSizeOnClient()
28332833
}
28342834
}
28352835

2836+
[Fact]
2837+
public async Task ServerWithOldProtocolVersionClientWithNewProtocolVersionWorksDoesNotAllowStatefulReconnect()
2838+
{
2839+
bool ExpectedErrors(WriteContext writeContext)
2840+
{
2841+
return writeContext.LoggerName == typeof(HubConnection).FullName &&
2842+
(writeContext.EventId.Name == "ShutdownWithError" ||
2843+
writeContext.EventId.Name == "ServerDisconnectedWithError");
2844+
}
2845+
2846+
var protocol = HubProtocols["json"];
2847+
await using (var server = await StartServer<Startup>(ExpectedErrors))
2848+
{
2849+
var websocket = new ClientWebSocket();
2850+
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
2851+
tcs.SetResult();
2852+
2853+
const string originalMessage = "SignalR";
2854+
var connectionBuilder = new HubConnectionBuilder()
2855+
.WithLoggerFactory(LoggerFactory)
2856+
.WithUrl(server.Url + "/default", HttpTransportType.WebSockets, o =>
2857+
{
2858+
o.WebSocketFactory = async (context, token) =>
2859+
{
2860+
await tcs.Task;
2861+
await websocket.ConnectAsync(context.Uri, token);
2862+
tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
2863+
return websocket;
2864+
};
2865+
o.UseStatefulReconnect = true;
2866+
});
2867+
// Force version 1 on the server so it turns off Stateful Reconnects
2868+
connectionBuilder.Services.AddSingleton<IHubProtocol>(new HubProtocolVersionTests.SingleVersionHubProtocol(HubProtocols["json"], 1));
2869+
var connection = connectionBuilder.Build();
2870+
2871+
var closedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
2872+
connection.Closed += (_) =>
2873+
{
2874+
closedTcs.SetResult();
2875+
return Task.CompletedTask;
2876+
};
2877+
2878+
try
2879+
{
2880+
await connection.StartAsync().DefaultTimeout();
2881+
var originalConnectionId = connection.ConnectionId;
2882+
2883+
var result = await connection.InvokeAsync<string>(nameof(TestHub.Echo), originalMessage).DefaultTimeout();
2884+
2885+
Assert.Equal(originalMessage, result);
2886+
2887+
var originalWebsocket = websocket;
2888+
websocket = new ClientWebSocket();
2889+
originalWebsocket.Dispose();
2890+
2891+
var resultTask = connection.InvokeAsync<string>(nameof(TestHub.Echo), originalMessage).DefaultTimeout();
2892+
tcs.SetResult();
2893+
2894+
// In-progress send canceled when connection closes
2895+
var ex = await Assert.ThrowsAnyAsync<Exception>(() => resultTask);
2896+
Assert.True(ex is TaskCanceledException or WebSocketException);
2897+
await closedTcs.Task;
2898+
2899+
Assert.Equal(HubConnectionState.Disconnected, connection.State);
2900+
}
2901+
catch (Exception ex)
2902+
{
2903+
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName);
2904+
throw;
2905+
}
2906+
finally
2907+
{
2908+
await connection.DisposeAsync().DefaultTimeout();
2909+
}
2910+
}
2911+
}
2912+
28362913
private class OneAtATimeSynchronizationContext : SynchronizationContext, IAsyncDisposable
28372914
{
28382915
private readonly Channel<(SendOrPostCallback, object)> _taskQueue = Channel.CreateUnbounded<(SendOrPostCallback, object)>();

src/SignalR/clients/csharp/Client/test/FunctionalTests/HubProtocolVersionTests.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Buffers;
66
using System.Collections.Generic;
7+
using System.Diagnostics.CodeAnalysis;
78
using System.Net;
89
using System.Text;
910
using System.Threading;
@@ -171,7 +172,7 @@ bool ExpectedErrors(WriteContext writeContext)
171172
var connectionBuilder = new HubConnectionBuilder()
172173
.WithLoggerFactory(LoggerFactory)
173174
.WithUrl(server.Url + "/version", transportType);
174-
connectionBuilder.Services.AddSingleton<IHubProtocol>(new VersionedJsonHubProtocol(int.MaxValue));
175+
connectionBuilder.Services.AddSingleton<IHubProtocol>(new SingleVersionHubProtocol(new VersionedJsonHubProtocol(int.MaxValue), int.MaxValue));
175176

176177
var connection = connectionBuilder.Build();
177178

@@ -193,6 +194,41 @@ await ExceptionAssert.ThrowsAsync<HubException>(
193194
}
194195
}
195196

197+
public class SingleVersionHubProtocol : IHubProtocol
198+
{
199+
private readonly IHubProtocol _protocol;
200+
private readonly int _version;
201+
202+
public SingleVersionHubProtocol(IHubProtocol inner, int version)
203+
{
204+
_protocol = inner;
205+
_version = version;
206+
}
207+
208+
public string Name => _protocol.Name;
209+
210+
public int Version => _version;
211+
212+
public TransferFormat TransferFormat => _protocol.TransferFormat;
213+
214+
public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message) => _protocol.GetMessageBytes(message);
215+
216+
public bool IsVersionSupported(int version)
217+
{
218+
return version == _version;
219+
}
220+
221+
public bool TryParseMessage(ref ReadOnlySequence<byte> input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage message)
222+
{
223+
return _protocol.TryParseMessage(ref input, binder, out message);
224+
}
225+
226+
public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
227+
{
228+
_protocol.WriteMessage(message, output);
229+
}
230+
}
231+
196232
private class ProxyConnectionFactory : IConnectionFactory
197233
{
198234
private readonly IConnectionFactory _innerFactory;

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,9 @@ private async Task ProcessSocketAsync(WebSocket socket, Uri url, bool isReconnec
372372

373373
// Abort the websocket if we're stuck in a pending send to the client
374374
socket.Abort();
375+
376+
// Should not throw
377+
await sending.ConfigureAwait(false);
375378
}
376379
}
377380
else
@@ -390,18 +393,40 @@ private async Task ProcessSocketAsync(WebSocket socket, Uri url, bool isReconnec
390393
{
391394
_application.Output.CancelPendingFlush();
392395
}
396+
397+
// Should not throw
398+
await receiving.ConfigureAwait(false);
393399
}
394400
}
395401

396-
if (_useStatefulReconnect && !_gracefulClose)
402+
var cleanup = true;
403+
try
397404
{
398-
if (!UpdateConnectionPair())
405+
if (!_gracefulClose && UpdateConnectionPair())
399406
{
400-
return;
407+
try
408+
{
409+
await StartAsync(url, _webSocketMessageType == WebSocketMessageType.Binary ? TransferFormat.Binary : TransferFormat.Text,
410+
cancellationToken: default).ConfigureAwait(false);
411+
cleanup = false;
412+
}
413+
catch (Exception ex)
414+
{
415+
throw new InvalidOperationException("Reconnect attempt failed.", innerException: ex);
416+
}
417+
}
418+
}
419+
finally
420+
{
421+
if (cleanup)
422+
{
423+
// Pipes will usually already be completed.
424+
// If stateful reconnect fails we want to make sure the Pipes are cleaned up.
425+
// And in rare cases where the websocket is closing at the same time StopAsync is called
426+
// It's possible a Pipe won't be completed so let's be safe and call Complete again.
427+
_application.Output.Complete();
428+
_application.Input.Complete();
401429
}
402-
403-
await StartAsync(url, _webSocketMessageType == WebSocketMessageType.Binary ? TransferFormat.Binary : TransferFormat.Text,
404-
cancellationToken: default).ConfigureAwait(false);
405430
}
406431
}
407432

@@ -479,7 +504,7 @@ private async Task StartReceiving(WebSocket socket)
479504
{
480505
if (!_aborted)
481506
{
482-
if (_gracefulClose)
507+
if (_gracefulClose || !_useStatefulReconnect)
483508
{
484509
_application.Output.Complete(ex);
485510
}
@@ -493,7 +518,7 @@ private async Task StartReceiving(WebSocket socket)
493518
finally
494519
{
495520
// We're done writing
496-
if (_gracefulClose)
521+
if (_gracefulClose || !_useStatefulReconnect)
497522
{
498523
_application.Output.Complete();
499524
}
@@ -589,7 +614,7 @@ private async Task StartSending(WebSocket socket, bool ignoreFirstCanceled)
589614
}
590615
}
591616

592-
if (_gracefulClose)
617+
if (_gracefulClose || !_useStatefulReconnect)
593618
{
594619
_application.Input.Complete(error);
595620
}

src/SignalR/clients/ts/signalr-protocol-msgpack/src/MessagePackHubProtocol.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class MessagePackHubProtocol implements IHubProtocol {
2626
/** The name of the protocol. This is used by SignalR to resolve the protocol between the client and server. */
2727
public readonly name: string = "messagepack";
2828
/** The version of the protocol. */
29-
public readonly version: number = 1;
29+
public readonly version: number = 2;
3030
/** The TransferFormat of the protocol. */
3131
public readonly transferFormat: TransferFormat = TransferFormat.Binary;
3232

src/SignalR/clients/ts/signalr/src/HubConnection.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,16 @@ export class HubConnection {
228228
await this.connection.start(this._protocol.transferFormat);
229229

230230
try {
231+
let version = this._protocol.version;
232+
if (!this.connection.features.reconnect) {
233+
// Stateful Reconnect starts with HubProtocol version 2, newer clients connecting to older servers will fail to connect due to
234+
// the handshake only supporting version 1, so we will try to send version 1 during the handshake to keep old servers working.
235+
version = 1;
236+
}
237+
231238
const handshakeRequest: HandshakeRequestMessage = {
232239
protocol: this._protocol.name,
233-
version: this._protocol.version,
240+
version,
234241
};
235242

236243
this._logger.log(LogLevel.Debug, "Sending handshake request.");
@@ -257,8 +264,8 @@ export class HubConnection {
257264
throw this._stopDuringStartError;
258265
}
259266

260-
const useAck = this.connection.features.reconnect || false;
261-
if (useAck) {
267+
const useStatefulReconnect = this.connection.features.reconnect || false;
268+
if (useStatefulReconnect) {
262269
this._messageBuffer = new MessageBuffer(this._protocol, this.connection, this._statefulReconnectBufferSize);
263270
this.connection.features.disconnected = this._messageBuffer._disconnected.bind(this._messageBuffer);
264271
this.connection.features.resend = () => {

src/SignalR/clients/ts/signalr/src/JsonHubProtocol.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export class JsonHubProtocol implements IHubProtocol {
1515
/** @inheritDoc */
1616
public readonly name: string = JSON_HUB_PROTOCOL_NAME;
1717
/** @inheritDoc */
18-
public readonly version: number = 1;
18+
public readonly version: number = 2;
1919

2020
/** @inheritDoc */
2121
public readonly transferFormat: TransferFormat = TransferFormat.Text;

src/SignalR/common/Protocols.Json/src/Protocol/JsonHubProtocol.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public sealed class JsonHubProtocol : IHubProtocol
4747
private static readonly JsonEncodedText SequenceIdPropertyNameBytes = JsonEncodedText.Encode(SequenceIdPropertyName);
4848

4949
private const string ProtocolName = "json";
50-
private const int ProtocolVersion = 1;
50+
private const int ProtocolVersion = 2;
5151

5252
/// <summary>
5353
/// Gets the serializer used to serialize invocation arguments and return values.
@@ -82,7 +82,7 @@ public JsonHubProtocol(IOptions<JsonHubProtocolOptions> options)
8282
/// <inheritdoc />
8383
public bool IsVersionSupported(int version)
8484
{
85-
return version == Version;
85+
return version <= Version;
8686
}
8787

8888
/// <inheritdoc />

0 commit comments

Comments
 (0)