Skip to content

Commit 2222fbd

Browse files
committed
tests
1 parent 423ed12 commit 2222fbd

File tree

6 files changed

+96
-67
lines changed

6 files changed

+96
-67
lines changed

ManagedCode.Orleans.SignalR.Tests/Cluster/ClusterFixtures.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public sealed class LoadClusterFixture : ClusterFixtureBase
4646
public LoadClusterFixture()
4747
: base(builder =>
4848
{
49-
builder.Options.InitialSilosCount = 4;
49+
builder.Options.InitialSilosCount = 6;
5050
})
5151
{
5252
}

ManagedCode.Orleans.SignalR.Tests/Cluster/Grains/TestGrain.cs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,19 @@ public Task PushMessage(string message)
2727
return _orleansHubContext.Clients.All.SendMessage(this.GetPrimaryKeyString());
2828
}
2929

30-
public async Task<string> GetMessageInvoke(string connectionId)
30+
public Task<string> GetMessageInvoke(string connectionId)
3131
{
32-
var localConnection = connectionId;
33-
var message = await Task.Run(() => _hubContext.Clients.Client(localConnection)
34-
.InvokeAsync<string>("GetMessage", CancellationToken.None));
35-
36-
return message;
32+
return _hubContext.Clients.Client(connectionId)
33+
.InvokeAsync<string>("GetMessage", CancellationToken.None);
3734
}
3835

39-
public async Task<string> GetMessage(string connectionId)
36+
public Task<string> GetMessage(string connectionId)
4037
{
41-
var message = await Task.Run(() => _orleansHubContext.Clients.Client(connectionId).GetMessage());
42-
return message;
38+
return _orleansHubContext.Clients.Client(connectionId).GetMessage();
4339
}
4440

4541
public Task SendToUser(string userName, string message)
4642
{
47-
return Task.Run(() => _hubContext.Clients.User(userName).SendAsync("SendMessage", message));
43+
return _hubContext.Clients.User(userName).SendAsync("SendMessage", message);
4844
}
49-
}
45+
}

ManagedCode.Orleans.SignalR.Tests/PartitioningTests.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,33 +78,31 @@ public async Task Default_Configuration_Should_Use_Connection_Partitioning()
7878
public async Task Default_Group_Configuration_Should_Use_Group_Partitioning()
7979
{
8080
// Arrange
81-
const int groupCount = 20;
81+
const int groupCount = 100;
8282
var connection = _apps[1].CreateSignalRClient(nameof(SimpleTestHub));
8383
await connection.StartAsync();
8484
connection.State.ShouldBe(HubConnectionState.Connected);
8585

8686
// Act - Add connection to multiple groups
87-
for (int i = 0; i < groupCount; i++)
88-
{
89-
await connection.InvokeAsync("AddToGroup", $"group_{i}");
90-
}
91-
92-
// Give some time for group operations to complete
93-
await Task.Delay(1000);
87+
var addTasks = Enumerable.Range(0, groupCount)
88+
.Select(i => connection.InvokeAsync("AddToGroup", $"group_{i}"))
89+
.ToArray();
90+
await Task.WhenAll(addTasks);
9491

9592
// Assert - Send messages to different groups
96-
for (int i = 0; i < groupCount; i++)
97-
{
98-
await connection.InvokeAsync("GroupSendAsync", $"group_{i}", $"Hello group_{i}!");
99-
}
93+
var sendTasks = Enumerable.Range(0, groupCount)
94+
.Select(i => connection.InvokeAsync("GroupSendAsync", $"group_{i}", $"Hello group_{i}!"))
95+
.ToArray();
96+
await Task.WhenAll(sendTasks);
10097

10198
// Verify group coordinator is working with default configuration (partitioning enabled)
10299
var groupCoordinatorGrain = NameHelperGenerator.GetGroupCoordinatorGrain<SimpleTestHub>(_siloCluster.Cluster.Client);
103100
var groupPartitionCount = await groupCoordinatorGrain.GetPartitionCount();
104101

105102
var defaultGroupPartitions = (int)new OrleansSignalROptions().GroupPartitionCount;
106103
defaultGroupPartitions.ShouldBeGreaterThan(1);
107-
groupPartitionCount.ShouldBe(defaultGroupPartitions);
104+
groupPartitionCount.ShouldBeGreaterThanOrEqualTo(defaultGroupPartitions);
105+
(groupPartitionCount & (groupPartitionCount - 1)).ShouldBe(0);
108106

109107
// Cleanup
110108
await connection.StopAsync();

ManagedCode.Orleans.SignalR.Tests/PerformanceComparisonTests.cs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ namespace ManagedCode.Orleans.SignalR.Tests;
1717
[Collection(nameof(LoadCluster))]
1818
public class PerformanceComparisonTests
1919
{
20-
private const int BroadcastConnectionCount = 40;
21-
private const int BroadcastMessageCount = 1_000;
20+
private const int BroadcastConnectionCount = 211;
21+
private const int BroadcastMessageCount = 1_001;
2222

23-
private const int GroupConnectionCount = 40;
24-
private const int GroupCount = 40;
25-
private const int GroupMessagesPerGroup = 1_000;
23+
private const int GroupConnectionCount = 127;
24+
private const int GroupCount = 113;
25+
private const int GroupMessagesPerGroup = 257;
2626

2727
private readonly LoadClusterFixture _cluster;
2828
private readonly ITestOutputHelper _output;
@@ -64,7 +64,7 @@ public async Task Group_Performance_Comparison()
6464
private async Task<TimeSpan> RunBroadcastScenarioAsync(bool useOrleans, int basePort)
6565
{
6666
var apps = CreateApplications(basePort, useOrleans);
67-
var (connections, perAppCounts) = await CreateConnectionsAsync(apps, BroadcastConnectionCount);
67+
var (connections, perAppCounts, _) = await CreateConnectionsAsync(apps, BroadcastConnectionCount);
6868

6969
try
7070
{
@@ -143,7 +143,7 @@ private async Task<TimeSpan> RunBroadcastScenarioAsync(bool useOrleans, int base
143143
private async Task<TimeSpan> RunGroupScenarioAsync(bool useOrleans, int basePort)
144144
{
145145
var apps = CreateApplications(basePort, useOrleans);
146-
var (connections, _) = await CreateConnectionsAsync(apps, GroupConnectionCount);
146+
var (connections, _, connectionAppIndices) = await CreateConnectionsAsync(apps, GroupConnectionCount);
147147

148148
try
149149
{
@@ -152,18 +152,42 @@ private async Task<TimeSpan> RunGroupScenarioAsync(bool useOrleans, int basePort
152152

153153
long received = 0;
154154

155+
var subscriptionTasks = new List<Task>(connections.Count);
156+
155157
for (var index = 0; index < connections.Count; index++)
156158
{
157159
var connection = connections[index];
158160
var groupName = groupNames[index % groupNames.Length];
159161
groupMembers[groupName].Add(connection);
160-
161162
connection.On<string>("SendAll", _ => Interlocked.Increment(ref received));
162-
await connection.InvokeAsync("AddToGroup", groupName);
163+
subscriptionTasks.Add(connection.InvokeAsync("AddToGroup", groupName));
163164
}
164165

166+
await Task.WhenAll(subscriptionTasks);
167+
165168
var activeGroups = groupMembers.Where(kvp => kvp.Value.Count > 0).ToArray();
166-
var expected = activeGroups.Sum(kvp => (long)kvp.Value.Count) * GroupMessagesPerGroup;
169+
var appIndexByConnection = connections
170+
.Select((connection, idx) => (connection, idx))
171+
.ToDictionary(tuple => tuple.connection, tuple => connectionAppIndices[tuple.idx]);
172+
173+
long expected;
174+
if (useOrleans)
175+
{
176+
expected = activeGroups.Sum(kvp => (long)kvp.Value.Count * kvp.Value.Count) * GroupMessagesPerGroup;
177+
}
178+
else
179+
{
180+
expected = activeGroups.Sum(kvp =>
181+
{
182+
var members = kvp.Value;
183+
return members.Sum(sender =>
184+
{
185+
var senderApp = appIndexByConnection[sender];
186+
var recipients = members.Count(connection => appIndexByConnection[connection] == senderApp);
187+
return (long)recipients;
188+
}) * GroupMessagesPerGroup;
189+
});
190+
}
167191
expected.ShouldBeGreaterThan(0, "At least one connection must belong to a group.");
168192

169193
await Task.Delay(TimeSpan.FromMilliseconds(250));
@@ -175,11 +199,14 @@ private async Task<TimeSpan> RunGroupScenarioAsync(bool useOrleans, int basePort
175199
var sendTasks = activeGroups.Select(tuple => Task.Run(async () =>
176200
{
177201
var (group, members) = tuple;
178-
var sender = members[0];
179-
for (var message = 0; message < GroupMessagesPerGroup; message++)
202+
var perSender = members.Select(sender => Task.Run(async () =>
180203
{
181-
await sender.InvokeAsync("GroupSendAsync", group, $"payload-{message}");
182-
}
204+
for (var message = 0; message < GroupMessagesPerGroup; message++)
205+
{
206+
await sender.InvokeAsync("GroupSendAsync", group, $"payload-{message}");
207+
}
208+
}));
209+
await Task.WhenAll(perSender);
183210
}));
184211
await Task.WhenAll(sendTasks);
185212

@@ -235,12 +262,13 @@ private List<TestWebApplication> CreateApplications(int basePort, bool useOrlean
235262
return apps;
236263
}
237264

238-
private static async Task<(List<HubConnection> Connections, int[] PerAppCounts)> CreateConnectionsAsync(
265+
private static async Task<(List<HubConnection> Connections, int[] PerAppCounts, int[] ConnectionAppIndices)> CreateConnectionsAsync(
239266
IReadOnlyList<TestWebApplication> apps,
240267
int count)
241268
{
242269
var connections = new List<HubConnection>(count);
243270
var perAppCounts = new int[apps.Count];
271+
var connectionAppIndices = new int[count];
244272

245273
for (var index = 0; index < count; index++)
246274
{
@@ -250,9 +278,10 @@ private List<TestWebApplication> CreateApplications(int basePort, bool useOrlean
250278
await connection.StartAsync();
251279
connections.Add(connection);
252280
perAppCounts[appIndex]++;
281+
connectionAppIndices[index] = appIndex;
253282
}
254283

255-
return (connections, perAppCounts);
284+
return (connections, perAppCounts, connectionAppIndices);
256285
}
257286

258287
private static async Task DisposeAsync(IEnumerable<HubConnection> connections)

ManagedCode.Orleans.SignalR.Tests/StressTests.cs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ namespace ManagedCode.Orleans.SignalR.Tests;
1818
public class StressTests
1919
{
2020
private const string StressGroup = "stress-group";
21+
private const int StressConnectionCount = 211;
22+
private const int BroadcastsPerConnection = 233;
23+
private const int GroupWorkflowIterations = 29;
2124
private static readonly TimeSpan WaitInterval = TimeSpan.FromMilliseconds(200);
2225
private static readonly TimeSpan LogInterval = TimeSpan.FromSeconds(1);
2326

24-
private readonly TestWebApplication _firstApp;
27+
private readonly IReadOnlyList<TestWebApplication> _apps;
2528
private readonly ITestOutputHelper _outputHelper;
26-
private readonly Random _random = new(42);
27-
private readonly TestWebApplication _secondApp;
2829
private readonly LoadClusterFixture _siloCluster;
2930
private readonly TestOutputHelperAccessor _loggerAccessor = new();
3031

@@ -33,25 +34,24 @@ public StressTests(LoadClusterFixture testApp, ITestOutputHelper outputHelper)
3334
_siloCluster = testApp;
3435
_outputHelper = outputHelper;
3536
_loggerAccessor.Output = outputHelper;
36-
_firstApp = new TestWebApplication(_siloCluster, 8081, loggerAccessor: _loggerAccessor);
37-
_secondApp = new TestWebApplication(_siloCluster, 8082, loggerAccessor: _loggerAccessor);
37+
_apps = Enumerable.Range(0, 4)
38+
.Select(index => new TestWebApplication(_siloCluster, 8081 + index, loggerAccessor: _loggerAccessor))
39+
.ToArray();
3840
}
3941

4042
[Fact]
4143
public async Task InvokeAsyncSignalRTest()
4244
{
43-
const int totalConnections = 6;
44-
const int broadcastsPerSender = 10;
45-
var expectedMessages = (long)totalConnections * broadcastsPerSender;
45+
var expectedMessages = (long)StressConnectionCount * BroadcastsPerConnection;
4646

47-
_outputHelper.WriteLine($"Creating {totalConnections} connections for stress broadcast test.");
48-
var connections = new List<HubConnection>(totalConnections);
47+
_outputHelper.WriteLine($"Creating {StressConnectionCount} connections for stress broadcast test.");
48+
var connections = new List<HubConnection>(StressConnectionCount);
4949
var observedMessages = 0L;
5050
var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
5151

52-
for (var connectionIndex = 0; connectionIndex < totalConnections; connectionIndex++)
52+
async Task<HubConnection> CreateAsync(int connectionIndex)
5353
{
54-
var started = await CreateStressConnectionAsync(connectionIndex, () =>
54+
return await CreateStressConnectionAsync(connectionIndex, () =>
5555
{
5656
var total = Interlocked.Increment(ref observedMessages);
5757
if (total >= expectedMessages)
@@ -61,21 +61,25 @@ public async Task InvokeAsyncSignalRTest()
6161

6262
return total;
6363
});
64-
connections.Add(started);
6564
}
6665

66+
var connectionTasks = Enumerable.Range(0, StressConnectionCount)
67+
.Select(CreateAsync)
68+
.ToArray();
69+
connections.AddRange(await Task.WhenAll(connectionTasks));
70+
6771
var stopwatch = Stopwatch.StartNew();
6872
var sendTasks = connections.Select(connection => Task.Run(async () =>
6973
{
70-
for (var iteration = 0; iteration < broadcastsPerSender; iteration++)
74+
for (var iteration = 0; iteration < BroadcastsPerConnection; iteration++)
7175
{
7276
await connection.InvokeAsync<int>("All");
7377
}
7478
}));
7579

7680
await Task.WhenAll(sendTasks);
7781

78-
var finishedTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(10)));
82+
var finishedTask = await Task.WhenAny(completionSource.Task, Task.Delay(TimeSpan.FromSeconds(45)));
7983
finishedTask.ShouldBe(completionSource.Task, $"Timed out delivering {expectedMessages:N0} broadcast messages; observed {Interlocked.Read(ref observedMessages):N0}.");
8084

8185
stopwatch.Stop();
@@ -108,19 +112,21 @@ async Task<GrainCounts> FetchCountsAsync()
108112
var before = await FetchCountsAsync();
109113
_outputHelper.WriteLine($"Initial grain counts: {before}");
110114

111-
var hubConnection = await CreateUserConnectionAsync("stress-user", _firstApp, nameof(SimpleTestHub));
115+
var hubConnection = await CreateUserConnectionAsync("stress-user", _apps[0], nameof(SimpleTestHub));
112116
hubConnection.On("GetMessage", () => "connection1");
113117

114118
await hubConnection.StartAsync();
115119
hubConnection.State.ShouldBe(HubConnectionState.Connected);
116120
_outputHelper.WriteLine($"Stress connection started with id {hubConnection.ConnectionId}.");
117121

118-
for (var iteration = 0; iteration < 6; iteration++)
119-
{
120-
await hubConnection.InvokeAsync<int>("DoTest");
121-
await hubConnection.InvokeAsync("AddToGroup", StressGroup);
122-
await hubConnection.InvokeAsync("GroupSendAsync", StressGroup, $"payload-{iteration}");
123-
}
122+
var workflowTasks = Enumerable.Range(0, GroupWorkflowIterations)
123+
.Select(async iteration =>
124+
{
125+
await hubConnection.InvokeAsync<int>("DoTest");
126+
await hubConnection.InvokeAsync("AddToGroup", StressGroup);
127+
await hubConnection.InvokeAsync("GroupSendAsync", StressGroup, $"payload-{iteration}");
128+
});
129+
await Task.WhenAll(workflowTasks);
124130

125131
await Task.Delay(TimeSpan.FromMilliseconds(250));
126132

@@ -166,8 +172,8 @@ await _siloCluster.Cluster.Client.GetGrain<IManagementGrain>(0)
166172

167173
private async Task<HubConnection> CreateStressConnectionAsync(int index, Func<long> onBroadcastReceived)
168174
{
169-
var useUser = _random.NextDouble() < 0.5;
170-
var app = _random.NextDouble() < 0.5 ? _firstApp : _secondApp;
175+
var useUser = Random.Shared.NextDouble() < 0.5;
176+
var app = _apps[Random.Shared.Next(_apps.Count)];
171177
HubConnection connection;
172178
string? userId = null;
173179

@@ -201,7 +207,7 @@ private async Task<HubConnection> CreateStressConnectionAsync(int index, Func<lo
201207
connection.State.ShouldBe(HubConnectionState.Connected);
202208
_outputHelper.WriteLine($"[stress#{index}] connected (ConnectionId={connection.ConnectionId}, user={userId ?? "anonymous"}).");
203209

204-
if (_random.NextDouble() < 0.35)
210+
if (Random.Shared.NextDouble() < 0.35)
205211
{
206212
await connection.InvokeAsync("AddToGroup", StressGroup);
207213
_outputHelper.WriteLine($"[stress#{index}] joined group {StressGroup}.");

ManagedCode.Orleans.SignalR.Tests/TestApp/TestWebApplication.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected override void ConfigureWebHost(IWebHostBuilder builder)
5050
builder.ConfigureLogging(logging =>
5151
{
5252
logging.ClearProviders();
53-
logging.SetMinimumLevel(LogLevel.Debug);
53+
logging.SetMinimumLevel(LogLevel.Warning);
5454
logging.AddProvider(new XunitLoggerProvider(_loggerAccessor));
5555
});
5656
}

0 commit comments

Comments
 (0)