Skip to content

Commit 85bde1d

Browse files
Parallel hub invocations (#23535)
1 parent df04381 commit 85bde1d

File tree

12 files changed

+636
-86
lines changed

12 files changed

+636
-86
lines changed

src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void GlobalSetup()
4747
var contextOptions = new HubConnectionContextOptions()
4848
{
4949
KeepAliveInterval = TimeSpan.Zero,
50+
StreamBufferCapacity = 10,
5051
};
5152
_connectionContext = new NoErrorHubConnectionContext(connection, contextOptions, NullLoggerFactory.Instance);
5253

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo
7373

7474
_systemClock = contextOptions.SystemClock ?? new SystemClock();
7575
_lastSendTimeStamp = _systemClock.UtcNowTicks;
76+
77+
// We'll be avoiding using the semaphore when the limit is set to 1, so no need to allocate it
78+
var maxInvokeLimit = contextOptions.MaximumParallelInvocations;
79+
if (maxInvokeLimit != 1)
80+
{
81+
ActiveInvocationLimit = new SemaphoreSlim(maxInvokeLimit, maxInvokeLimit);
82+
}
7683
}
7784

7885
internal StreamTracker StreamTracker
@@ -93,6 +100,8 @@ internal StreamTracker StreamTracker
93100

94101
internal Exception? CloseException { get; private set; }
95102

103+
internal SemaphoreSlim? ActiveInvocationLimit { get; }
104+
96105
/// <summary>
97106
/// Gets a <see cref="CancellationToken"/> that notifies when the connection is aborted.
98107
/// </summary>

src/SignalR/server/Core/src/HubConnectionContextOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,10 @@ public class HubConnectionContextOptions
3232
public long? MaximumReceiveMessageSize { get; set; }
3333

3434
internal ISystemClock SystemClock { get; set; } = default!;
35+
36+
/// <summary>
37+
/// Gets or sets the maximum parallel hub method invocations.
38+
/// </summary>
39+
public int MaximumParallelInvocations { get; set; } = 1;
3540
}
3641
}

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Collections.Generic;
66
using System.IO;
77
using System.Linq;
8+
using System.Threading.Channels;
89
using System.Threading.Tasks;
910
using Microsoft.AspNetCore.Connections;
1011
using Microsoft.AspNetCore.Internal;
@@ -31,6 +32,7 @@ public class HubConnectionHandler<THub> : ConnectionHandler where THub : Hub
3132
private readonly HubDispatcher<THub> _dispatcher;
3233
private readonly bool _enableDetailedErrors;
3334
private readonly long? _maximumMessageSize;
35+
private readonly int _maxParallelInvokes;
3436

3537
// Internal for testing
3638
internal ISystemClock SystemClock { get; set; } = new SystemClock();
@@ -70,6 +72,7 @@ IServiceScopeFactory serviceScopeFactory
7072
{
7173
_maximumMessageSize = _hubOptions.MaximumReceiveMessageSize;
7274
_enableDetailedErrors = _hubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
75+
_maxParallelInvokes = _hubOptions.MaximumParallelInvocationsPerClient;
7376

7477
if (_hubOptions.HubFilters != null)
7578
{
@@ -80,6 +83,7 @@ IServiceScopeFactory serviceScopeFactory
8083
{
8184
_maximumMessageSize = _globalHubOptions.MaximumReceiveMessageSize;
8285
_enableDetailedErrors = _globalHubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
86+
_maxParallelInvokes = _globalHubOptions.MaximumParallelInvocationsPerClient;
8387

8488
if (_globalHubOptions.HubFilters != null)
8589
{
@@ -116,6 +120,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
116120
StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity,
117121
MaximumReceiveMessageSize = _maximumMessageSize,
118122
SystemClock = SystemClock,
123+
MaximumParallelInvocations = _maxParallelInvokes,
119124
};
120125

121126
Log.ConnectedStarting(_logger);
@@ -235,7 +240,6 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
235240
var protocol = connection.Protocol;
236241
connection.BeginClientTimeout();
237242

238-
239243
var binder = new HubConnectionBinder<THub>(_dispatcher, connection);
240244

241245
while (true)
@@ -258,8 +262,9 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
258262
{
259263
while (protocol.TryParseMessage(ref buffer, binder, out var message))
260264
{
261-
messageReceived = true;
262265
connection.StopClientTimeout();
266+
// This lets us know the timeout has stopped and we need to re-enable it after dispatching the message
267+
messageReceived = true;
263268
await _dispatcher.DispatchMessageAsync(connection, message);
264269
}
265270

@@ -286,9 +291,9 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
286291

287292
if (protocol.TryParseMessage(ref segment, binder, out var message))
288293
{
289-
messageReceived = true;
290294
connection.StopClientTimeout();
291-
295+
// This lets us know the timeout has stopped and we need to re-enable it after dispatching the message
296+
messageReceived = true;
292297
await _dispatcher.DispatchMessageAsync(connection, message);
293298
}
294299
else if (overLength)

src/SignalR/server/Core/src/HubOptions.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ namespace Microsoft.AspNetCore.SignalR
1111
/// </summary>
1212
public class HubOptions
1313
{
14+
private int _maximumParallelInvocationsPerClient = 1;
15+
1416
// HandshakeTimeout and KeepAliveInterval are set to null here to help identify when
1517
// local hub options have been set. Global default values are set in HubOptionsSetup.
1618
// SupportedProtocols being null is the true default value, and it represents support
@@ -53,5 +55,23 @@ public class HubOptions
5355
public int? StreamBufferCapacity { get; set; } = null;
5456

5557
internal List<IHubFilter>? HubFilters { get; set; }
58+
59+
/// <summary>
60+
/// By default a client is only allowed to invoke a single Hub method at a time.
61+
/// Changing this property will allow clients to invoke multiple methods at the same time before queueing.
62+
/// </summary>
63+
public int MaximumParallelInvocationsPerClient
64+
{
65+
get => _maximumParallelInvocationsPerClient;
66+
set
67+
{
68+
if (value < 1)
69+
{
70+
throw new ArgumentOutOfRangeException(nameof(MaximumParallelInvocationsPerClient));
71+
}
72+
73+
_maximumParallelInvocationsPerClient = value;
74+
}
75+
}
5676
}
5777
}

src/SignalR/server/Core/src/HubOptionsSetup`T.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public void Configure(HubOptions<THub> options)
2525
options.EnableDetailedErrors = _hubOptions.EnableDetailedErrors;
2626
options.MaximumReceiveMessageSize = _hubOptions.MaximumReceiveMessageSize;
2727
options.StreamBufferCapacity = _hubOptions.StreamBufferCapacity;
28+
options.MaximumParallelInvocationsPerClient = _hubOptions.MaximumParallelInvocationsPerClient;
2829

2930
options.UserHasSetValues = true;
3031

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ private static class Log
7979
private static readonly Action<ILogger, string, Exception> _invalidHubParameters =
8080
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(22, "InvalidHubParameters"), "Parameters to hub method '{HubMethod}' are incorrect.");
8181

82+
private static readonly Action<ILogger, string, Exception> _invocationIdInUse =
83+
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(23, "InvocationIdInUse"), "Invocation ID '{InvocationId}' is already in use.");
84+
8285
public static void ReceivedHubInvocation(ILogger logger, InvocationMessage invocationMessage)
8386
{
8487
_receivedHubInvocation(logger, invocationMessage, null);
@@ -188,6 +191,11 @@ public static void InvalidHubParameters(ILogger logger, string hubMethod, Except
188191
{
189192
_invalidHubParameters(logger, hubMethod, exception);
190193
}
194+
195+
public static void InvocationIdInUse(ILogger logger, string InvocationId)
196+
{
197+
_invocationIdInUse(logger, InvocationId, null);
198+
}
191199
}
192200
}
193201
}

0 commit comments

Comments
 (0)