Skip to content

Commit 7841f95

Browse files
committed
more tests
1 parent c3d0b35 commit 7841f95

16 files changed

+470
-175
lines changed

ManagedCode.Orleans.SignalR.Core/Config/OrleansSignalROptions.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,24 @@ public class OrleansSignalROptions
3434
/// </summary>
3535
public uint ConnectionPartitionCount { get; set; } = 4;
3636

37+
/// <summary>
38+
/// Target number of concurrent connections per partition.
39+
/// Used as a hint when determining how many partitions to allocate dynamically.
40+
/// Lower values result in more partitions.
41+
/// </summary>
42+
public int ConnectionsPerPartitionHint { get; set; } = 10_000;
43+
3744
/// <summary>
3845
/// Number of partitions to use for group distribution.
3946
/// Set to 1 to disable partitioning.
4047
/// Increase this value for better scalability with millions of groups.
4148
/// The default value is 4.
4249
/// </summary>
4350
public uint GroupPartitionCount { get; set; } = 4;
51+
52+
/// <summary>
53+
/// Target number of groups per partition.
54+
/// Used as a hint when determining how many partitions to allocate dynamically.
55+
/// </summary>
56+
public int GroupsPerPartitionHint { get; set; } = 1_000;
4457
}

ManagedCode.Orleans.SignalR.Core/Helpers/PartitionHelper.cs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,37 @@ public static int GetPartitionId(string connectionId, uint partitionCount)
2828

2929
public static int GetOptimalPartitionCount(int expectedConnections)
3030
{
31-
// Rule of thumb: ~10,000 connections per partition
32-
const int connectionsPerPartition = 10000;
33-
var partitions = Math.Max(1, (expectedConnections + connectionsPerPartition - 1) / connectionsPerPartition);
34-
35-
// Round to nearest power of 2 for better hash distribution
36-
return (int)Math.Pow(2, Math.Ceiling(Math.Log(partitions, 2)));
31+
return GetOptimalPartitionCount(expectedConnections, 10_000);
32+
}
33+
34+
public static int GetOptimalPartitionCount(int expectedConnections, int connectionsPerPartition)
35+
{
36+
var perPartition = Math.Max(1, connectionsPerPartition);
37+
var partitions = Math.Max(1, (expectedConnections + perPartition - 1) / perPartition);
38+
return ToPowerOfTwo(partitions);
3739
}
3840

3941
public static int GetOptimalGroupPartitionCount(int expectedGroups)
4042
{
41-
// Rule of thumb: ~1,000 groups per partition
42-
const int groupsPerPartition = 1000;
43-
var partitions = Math.Max(1, (expectedGroups + groupsPerPartition - 1) / groupsPerPartition);
44-
45-
// Round to nearest power of 2 for better hash distribution
46-
return (int)Math.Pow(2, Math.Ceiling(Math.Log(partitions, 2)));
43+
return GetOptimalGroupPartitionCount(expectedGroups, 1_000);
44+
}
45+
46+
public static int GetOptimalGroupPartitionCount(int expectedGroups, int groupsPerPartition)
47+
{
48+
var perPartition = Math.Max(1, groupsPerPartition);
49+
var partitions = Math.Max(1, (expectedGroups + perPartition - 1) / perPartition);
50+
return ToPowerOfTwo(partitions);
51+
}
52+
53+
private static int ToPowerOfTwo(int value)
54+
{
55+
if (value <= 1)
56+
{
57+
return 1;
58+
}
59+
60+
var power = (int)Math.Ceiling(Math.Log(value, 2));
61+
return (int)Math.Pow(2, power);
4762
}
4863

4964
private readonly record struct RingCacheKey(int PartitionCount, int VirtualNodes);

ManagedCode.Orleans.SignalR.Core/Interfaces/ISignalRConnectionCoordinatorGrain.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public interface ISignalRConnectionCoordinatorGrain : IGrainWithStringKey
2323

2424
[OneWay]
2525
Task SendToConnections(HubMessage message, string[] connectionIds);
26+
27+
Task NotifyConnectionRemoved(string connectionId);
2628
}

ManagedCode.Orleans.SignalR.Core/Interfaces/ISignalRGroupCoordinatorGrain.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,6 @@ public interface ISignalRGroupCoordinatorGrain : IGrainWithStringKey
2525
Task AddConnectionToGroup(string groupName, string connectionId, ISignalRObserver observer);
2626

2727
Task RemoveConnectionFromGroup(string groupName, string connectionId, ISignalRObserver observer);
28+
29+
Task NotifyGroupRemoved(string groupName);
2830
}

ManagedCode.Orleans.SignalR.Core/Interfaces/ISignalRGroupPartitionGrain.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ public interface ISignalRGroupPartitionGrain : IGrainWithIntegerKey, IObserverCo
1919

2020
[ReadOnly]
2121
Task<bool> HasConnection(string connectionId);
22+
23+
Task EnsureInitialized(string hubKey);
2224
}

ManagedCode.Orleans.SignalR.Core/Models/GroupPartitionState.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ public class GroupPartitionState
1616
[Id(2)]
1717
public Dictionary<string, string> ConnectionObservers { get; set; } = new();
1818

19+
[Id(3)]
20+
public string? HubKey { get; set; }
21+
1922
public bool IsEmpty =>
2023
Groups.Count == 0 &&
2124
ConnectionGroups.Count == 0 &&

ManagedCode.Orleans.SignalR.Core/SignalR/NameHelperGenerator.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ public static ISignalRGroupCoordinatorGrain GetGroupCoordinatorGrain<THub>(IGrai
6060
return grainFactory.GetGrain<ISignalRGroupCoordinatorGrain>(CleanString(typeof(THub).FullName!));
6161
}
6262

63+
public static ISignalRGroupCoordinatorGrain GetGroupCoordinatorGrain(IGrainFactory grainFactory, string hubKey)
64+
{
65+
return grainFactory.GetGrain<ISignalRGroupCoordinatorGrain>(CleanString(hubKey));
66+
}
67+
6368
public static ISignalRGroupPartitionGrain GetGroupPartitionGrain<THub>(IGrainFactory grainFactory, int partitionId)
6469
{
6570
var key = GetPartitionGrainKey(typeof(THub).FullName!, partitionId, alreadyCleaned: false);

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection)
9191
// For small number of grains (typical case), await all tasks
9292
// This ensures proper cleanup on disconnect
9393
await Task.Run(() => Task.WhenAll(tasks));
94+
95+
await Task.Run(() => NameHelperGenerator.GetConnectionCoordinatorGrain<THub>(_clusterClient)
96+
.NotifyConnectionRemoved(connection.ConnectionId));
9497
}
9598

9699
public override Task SendAllAsync(string methodName, object?[] args, CancellationToken cancellationToken = new())
@@ -231,6 +234,9 @@ public override async Task AddToGroupAsync(string connectionId, string groupName
231234
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
232235
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
233236
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
237+
var hubKey = NameHelperGenerator.CleanString(typeof(THub).FullName!);
238+
239+
await Task.Run(() => partitionGrain.EnsureInitialized(hubKey), cancellationToken);
234240

235241
subscription.AddGrain(partitionGrain);
236242
await Task.Run(() => partitionGrain.AddConnection(connectionId, subscription.Reference), cancellationToken);
@@ -256,6 +262,9 @@ public override async Task RemoveFromGroupAsync(string connectionId, string grou
256262
var coordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<THub>(_clusterClient);
257263
var partitionId = await Task.Run(() => coordinatorGrain.GetPartitionForGroup(groupName), cancellationToken);
258264
var partitionGrain = NameHelperGenerator.GetGroupPartitionGrain<THub>(_clusterClient, partitionId);
265+
var hubKey = NameHelperGenerator.CleanString(typeof(THub).FullName!);
266+
267+
await Task.Run(() => partitionGrain.EnsureInitialized(hubKey), cancellationToken);
259268

260269
await Task.Run(() => coordinatorGrain.RemoveConnectionFromGroup(groupName, connectionId, subscription.Reference), cancellationToken);
261270

0 commit comments

Comments
 (0)