@@ -16,12 +16,12 @@ namespace ManagedCode.Orleans.SignalR.Tests;
1616[ Collection ( nameof ( LoadCluster ) ) ]
1717public class PerformanceComparisonTests
1818{
19- private const int BroadcastConnectionCount = 60 ;
20- private const int BroadcastMessageCount = 40 ;
19+ private const int BroadcastConnectionCount = 40 ;
20+ private const int BroadcastMessageCount = 2_500 ;
2121
22- private const int GroupConnectionCount = 48 ;
23- private const int GroupCount = 4 ;
24- private const int GroupMessagesPerGroup = 25 ;
22+ private const int GroupConnectionCount = 40 ;
23+ private const int GroupCount = 40 ;
24+ private const int GroupMessagesPerGroup = 2_500 ;
2525
2626 private readonly LoadClusterFixture _cluster ;
2727 private readonly ITestOutputHelper _output ;
@@ -35,8 +35,8 @@ public PerformanceComparisonTests(LoadClusterFixture cluster, ITestOutputHelper
3535 [ Fact ]
3636 public async Task Broadcast_Performance_Comparison ( )
3737 {
38- var orleans = await RunBroadcastScenarioAsync ( useOrleans : true , port : 9401 ) ;
39- var inMemory = await RunBroadcastScenarioAsync ( useOrleans : false , port : 9402 ) ;
38+ var orleans = await RunBroadcastScenarioAsync ( useOrleans : true , basePort : 9400 ) ;
39+ var inMemory = await RunBroadcastScenarioAsync ( useOrleans : false , basePort : 9500 ) ;
4040
4141 _output . WriteLine (
4242 $ "Broadcast comparison ({ BroadcastConnectionCount } connections, { BroadcastMessageCount } broadcasts) => Orleans: { orleans . TotalMilliseconds : F0} ms, In-Memory: { inMemory . TotalMilliseconds : F0} ms") ;
@@ -48,8 +48,8 @@ public async Task Broadcast_Performance_Comparison()
4848 [ Fact ]
4949 public async Task Group_Performance_Comparison ( )
5050 {
51- var orleans = await RunGroupScenarioAsync ( useOrleans : true , port : 9411 ) ;
52- var inMemory = await RunGroupScenarioAsync ( useOrleans : false , port : 9412 ) ;
51+ var orleans = await RunGroupScenarioAsync ( useOrleans : true , basePort : 9600 ) ;
52+ var inMemory = await RunGroupScenarioAsync ( useOrleans : false , basePort : 9700 ) ;
5353
5454 _output . WriteLine (
5555 $ "Group comparison ({ GroupConnectionCount } connections, { GroupCount } groups, { GroupMessagesPerGroup } messages/group) => Orleans: { orleans . TotalMilliseconds : F0} ms, In-Memory: { inMemory . TotalMilliseconds : F0} ms") ;
@@ -58,15 +58,24 @@ public async Task Group_Performance_Comparison()
5858 inMemory . ShouldNotBe ( TimeSpan . Zero ) ;
5959 }
6060
61- private async Task < TimeSpan > RunBroadcastScenarioAsync ( bool useOrleans , int port )
61+ private async Task < TimeSpan > RunBroadcastScenarioAsync ( bool useOrleans , int basePort )
6262 {
63- using var app = new TestWebApplication ( _cluster , port , useOrleans ) ;
64- var connections = await CreateConnectionsAsync ( app , BroadcastConnectionCount , "broadcast" ) ;
63+ var apps = CreateApplications ( basePort , useOrleans ) ;
64+ var ( connections , perAppCounts ) = await CreateConnectionsAsync ( apps , BroadcastConnectionCount ) ;
6565
6666 try
6767 {
6868 long received = 0 ;
69- var expected = BroadcastConnectionCount * BroadcastMessageCount ;
69+ var totalConnections = connections . Count ;
70+ long expected ;
71+ if ( useOrleans )
72+ {
73+ expected = BroadcastMessageCount * ( long ) totalConnections * totalConnections ;
74+ }
75+ else
76+ {
77+ expected = BroadcastMessageCount * perAppCounts . Sum ( count => ( long ) count * count ) ;
78+ }
7079
7180 foreach ( var connection in connections )
7281 {
@@ -76,44 +85,69 @@ private async Task<TimeSpan> RunBroadcastScenarioAsync(bool useOrleans, int port
7685 await Task . Delay ( TimeSpan . FromMilliseconds ( 250 ) ) ;
7786
7887 var starting = Interlocked . Read ( ref received ) ;
79-
8088 var stopwatch = Stopwatch . StartNew ( ) ;
89+ _output . WriteLine ( $ "Broadcast run ({ BroadcastConnectionCount } connections × { BroadcastMessageCount : N0} messages).") ;
8190
82- for ( var message = 0 ; message < BroadcastMessageCount ; message ++ )
91+ var sendTasks = connections . Select ( connection => Task . Run ( async ( ) =>
8392 {
84- var sender = connections [ message % connections . Count ] ;
85- await sender . InvokeAsync < int > ( "All" ) ;
86- }
93+ for ( var iteration = 0 ; iteration < BroadcastMessageCount ; iteration ++ )
94+ {
95+ await connection . InvokeAsync < int > ( "All" ) ;
96+ }
97+ } ) ) ;
98+ await Task . WhenAll ( sendTasks ) ;
99+
100+ var progressStep = Math . Max ( 1_000 , expected / 10 ) ;
101+ var lastReported = 0L ;
87102
88103 var completed = await WaitUntilAsync (
89104 ( ) => Interlocked . Read ( ref received ) - starting >= expected ,
90- TimeSpan . FromSeconds ( 30 ) ) ;
105+ timeout : TimeSpan . FromMinutes ( 2 ) ,
106+ _output ,
107+ ( ) =>
108+ {
109+ var delivered = Interlocked . Read ( ref received ) - starting ;
110+ if ( delivered - lastReported >= progressStep || delivered == expected )
111+ {
112+ lastReported = delivered ;
113+ return $ "{ delivered : N0} /{ expected : N0} ";
114+ }
115+
116+ return null ;
117+ } ) ;
91118
92119 stopwatch . Stop ( ) ;
93120
94- completed . ShouldBeTrue ( "Broadcast messages were not delivered to all connections." ) ;
95- ( Interlocked . Read ( ref received ) - starting ) . ShouldBe ( expected ) ;
121+ var delivered = Interlocked . Read ( ref received ) - starting ;
122+ if ( ! completed )
123+ {
124+ _output . WriteLine ( $ "Broadcast scenario timed out with { delivered : N0} /{ expected : N0} messages delivered.") ;
125+ }
126+
127+ delivered . ShouldBe ( expected , $ "Expected all broadcasts to reach listeners (delivered { delivered : N0} /{ expected : N0} ).") ;
128+ var throughput = delivered / Math . Max ( 1 , stopwatch . Elapsed . TotalSeconds ) ;
129+ _output . WriteLine ( $ "Broadcast delivered { delivered : N0} /{ expected : N0} in { stopwatch . Elapsed } . Throughput ≈ { throughput : N0} msg/s.") ;
96130
97131 return stopwatch . Elapsed ;
98132 }
99133 finally
100134 {
101135 await DisposeAsync ( connections ) ;
136+ DisposeApplications ( apps ) ;
102137 }
103138 }
104139
105- private async Task < TimeSpan > RunGroupScenarioAsync ( bool useOrleans , int port )
140+ private async Task < TimeSpan > RunGroupScenarioAsync ( bool useOrleans , int basePort )
106141 {
107- using var app = new TestWebApplication ( _cluster , port , useOrleans ) ;
108- var connections = await CreateConnectionsAsync ( app , GroupConnectionCount , "group" ) ;
142+ var apps = CreateApplications ( basePort , useOrleans ) ;
143+ var ( connections , _ ) = await CreateConnectionsAsync ( apps , GroupConnectionCount ) ;
109144
110145 try
111146 {
112147 var groupNames = Enumerable . Range ( 0 , GroupCount ) . Select ( i => $ "perf-group-{ i } ") . ToArray ( ) ;
113148 var groupMembers = groupNames . ToDictionary ( name => name , _ => new List < HubConnection > ( ) ) ;
114149
115150 long received = 0 ;
116- var expected = ( long ) GroupConnectionCount * GroupMessagesPerGroup ;
117151
118152 for ( var index = 0 ; index < connections . Count ; index ++ )
119153 {
@@ -125,53 +159,97 @@ private async Task<TimeSpan> RunGroupScenarioAsync(bool useOrleans, int port)
125159 await connection . InvokeAsync ( "AddToGroup" , groupName ) ;
126160 }
127161
162+ var activeGroups = groupMembers . Where ( kvp => kvp . Value . Count > 0 ) . ToArray ( ) ;
163+ var expected = activeGroups . Sum ( kvp => ( long ) kvp . Value . Count ) * GroupMessagesPerGroup ;
164+ expected . ShouldBeGreaterThan ( 0 , "At least one connection must belong to a group." ) ;
165+
128166 await Task . Delay ( TimeSpan . FromMilliseconds ( 250 ) ) ;
129167
130168 var starting = Interlocked . Read ( ref received ) ;
131-
132169 var stopwatch = Stopwatch . StartNew ( ) ;
170+ _output . WriteLine ( $ "Group run ({ GroupConnectionCount } connections across { activeGroups . Length } active groups × { GroupMessagesPerGroup : N0} messages).") ;
133171
134- foreach ( var group in groupNames )
172+ var sendTasks = activeGroups . Select ( tuple => Task . Run ( async ( ) =>
135173 {
136- var sender = groupMembers [ group ] . First ( ) ;
174+ var ( group , members ) = tuple ;
175+ var sender = members [ 0 ] ;
137176 for ( var message = 0 ; message < GroupMessagesPerGroup ; message ++ )
138177 {
139178 await sender . InvokeAsync ( "GroupSendAsync" , group , $ "payload-{ message } ") ;
140179 }
141- }
180+ } ) ) ;
181+ await Task . WhenAll ( sendTasks ) ;
182+
183+ var progressStep = Math . Max ( 1_000 , expected / 10 ) ;
184+ var lastReported = 0L ;
142185
143186 var completed = await WaitUntilAsync (
144187 ( ) => Interlocked . Read ( ref received ) - starting >= expected ,
145- TimeSpan . FromSeconds ( 30 ) ) ;
188+ timeout : TimeSpan . FromMinutes ( 2 ) ,
189+ _output ,
190+ ( ) =>
191+ {
192+ var delivered = Interlocked . Read ( ref received ) - starting ;
193+ if ( delivered - lastReported >= progressStep || delivered == expected )
194+ {
195+ lastReported = delivered ;
196+ return $ "{ delivered : N0} /{ expected : N0} ";
197+ }
198+
199+ return null ;
200+ } ) ;
146201
147202 stopwatch . Stop ( ) ;
148203
149- completed . ShouldBeTrue ( "Group messages were not delivered to all group members." ) ;
150- ( Interlocked . Read ( ref received ) - starting ) . ShouldBe ( expected ) ;
204+ var delivered = Interlocked . Read ( ref received ) - starting ;
205+ if ( ! completed )
206+ {
207+ _output . WriteLine ( $ "Group scenario timed out with { delivered : N0} /{ expected : N0} messages delivered.") ;
208+ }
209+
210+ delivered . ShouldBe ( expected , $ "Expected all group messages to reach listeners (delivered { delivered : N0} /{ expected : N0} ).") ;
211+ var throughput = delivered / Math . Max ( 1 , stopwatch . Elapsed . TotalSeconds ) ;
212+ _output . WriteLine ( $ "Group broadcast delivered { delivered : N0} /{ expected : N0} in { stopwatch . Elapsed } . Throughput ≈ { throughput : N0} msg/s.") ;
151213
152214 return stopwatch . Elapsed ;
153215 }
154216 finally
155217 {
156218 await DisposeAsync ( connections ) ;
219+ DisposeApplications ( apps ) ;
220+ }
221+ }
222+
223+ private List < TestWebApplication > CreateApplications ( int basePort , bool useOrleans )
224+ {
225+ var apps = new List < TestWebApplication > ( 4 ) ;
226+ for ( var i = 0 ; i < 4 ; i ++ )
227+ {
228+ var app = new TestWebApplication ( _cluster , basePort + i , useOrleans ) ;
229+ apps . Add ( app ) ;
157230 }
231+
232+ return apps ;
158233 }
159234
160- private static async Task < List < HubConnection > > CreateConnectionsAsync (
161- TestWebApplication app ,
162- int count ,
163- string label )
235+ private static async Task < ( List < HubConnection > Connections , int [ ] PerAppCounts ) > CreateConnectionsAsync (
236+ IReadOnlyList < TestWebApplication > apps ,
237+ int count )
164238 {
165239 var connections = new List < HubConnection > ( count ) ;
240+ var perAppCounts = new int [ apps . Count ] ;
166241
167242 for ( var index = 0 ; index < count ; index ++ )
168243 {
244+ var appIndex = index % apps . Count ;
245+ var app = apps [ appIndex ] ;
169246 var connection = app . CreateSignalRClient ( nameof ( SimpleTestHub ) ) ;
170247 await connection . StartAsync ( ) ;
171248 connections . Add ( connection ) ;
249+ perAppCounts [ appIndex ] ++ ;
172250 }
173251
174- return connections ;
252+ return ( connections , perAppCounts ) ;
175253 }
176254
177255 private static async Task DisposeAsync ( IEnumerable < HubConnection > connections )
@@ -193,17 +271,39 @@ private static async Task DisposeAsync(IEnumerable<HubConnection> connections)
193271 }
194272 }
195273
196- private static async Task < bool > WaitUntilAsync ( Func < bool > condition , TimeSpan timeout )
274+ private static void DisposeApplications ( IEnumerable < TestWebApplication > apps )
197275 {
198- var deadline = DateTime . UtcNow + timeout ;
276+ foreach ( var app in apps )
277+ {
278+ app . Dispose ( ) ;
279+ }
280+ }
199281
200- while ( DateTime . UtcNow < deadline )
282+ private static async Task < bool > WaitUntilAsync (
283+ Func < bool > condition ,
284+ TimeSpan ? timeout ,
285+ ITestOutputHelper output ,
286+ Func < string ? > ? progress = null )
287+ {
288+ var start = DateTime . UtcNow ;
289+ var deadline = timeout . HasValue ? start + timeout . Value : ( DateTime ? ) null ;
290+
291+ while ( ! deadline . HasValue || DateTime . UtcNow < deadline . Value )
201292 {
202293 if ( condition ( ) )
203294 {
204295 return true ;
205296 }
206297
298+ if ( progress is not null )
299+ {
300+ var status = progress ( ) ;
301+ if ( ! string . IsNullOrEmpty ( status ) )
302+ {
303+ output . WriteLine ( $ "Progress: { status } ") ;
304+ }
305+ }
306+
207307 await Task . Delay ( TimeSpan . FromMilliseconds ( 100 ) ) ;
208308 }
209309
0 commit comments