@@ -32,11 +32,13 @@ public async void NumberOfConnectionsShouldBeEqualsToThePartitions()
3232 SystemUtils . ResetSuperStreams ( ) ;
3333 var system = await StreamSystem . Create ( new StreamSystemConfig ( ) ) ;
3434 var connectionName = Guid . NewGuid ( ) . ToString ( ) ;
35- var consumer = await system . CreateSuperStreamConsumer ( new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
36- {
37- ClientProvidedName = connectionName ,
38- OffsetSpec = await SystemUtils . OffsetsForSuperStreamConsumer ( system , "invoices" , new OffsetTypeFirst ( ) )
39- } ) ;
35+ var consumer = await system . CreateSuperStreamConsumer (
36+ new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
37+ {
38+ ClientProvidedName = connectionName ,
39+ OffsetSpec =
40+ await SystemUtils . OffsetsForSuperStreamConsumer ( system , "invoices" , new OffsetTypeFirst ( ) )
41+ } ) ;
4042
4143 Assert . NotNull ( consumer ) ;
4244 SystemUtils . Wait ( ) ;
@@ -52,28 +54,30 @@ public async void NumberOfMessagesConsumedShouldBeEqualsToPublished()
5254
5355 var testPassed = new TaskCompletionSource < int > ( ) ;
5456 var listConsumed = new ConcurrentBag < string > ( ) ;
57+
5558 var consumedMessages = 0 ;
5659 const int NumberOfMessages = 20 ;
5760 var system = await StreamSystem . Create ( new StreamSystemConfig ( ) ) ;
5861 await SystemUtils . PublishMessagesSuperStream ( system , "invoices" , NumberOfMessages , "" , _testOutputHelper ) ;
5962 var clientProvidedName = Guid . NewGuid ( ) . ToString ( ) ;
6063
61- var consumer = await system . CreateSuperStreamConsumer ( new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
62- {
63- ClientProvidedName = clientProvidedName ,
64- OffsetSpec = await SystemUtils . OffsetsForSuperStreamConsumer ( system , "invoices" , new OffsetTypeFirst ( ) ) ,
65- MessageHandler = ( stream , consumer1 , context , message ) =>
64+ var consumer = await system . CreateSuperStreamConsumer (
65+ new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
6666 {
67- listConsumed . Add ( stream ) ;
68- Interlocked . Increment ( ref consumedMessages ) ;
69- if ( consumedMessages == NumberOfMessages )
67+ ClientProvidedName = clientProvidedName ,
68+ OffsetSpec = await SystemUtils . OffsetsForSuperStreamConsumer ( system , "invoices" , new OffsetTypeFirst ( ) ) ,
69+ MessageHandler = ( stream , consumer1 , context , message ) =>
7070 {
71- testPassed . SetResult ( NumberOfMessages ) ;
72- }
71+ listConsumed . Add ( stream ) ;
72+ Interlocked . Increment ( ref consumedMessages ) ;
73+ if ( consumedMessages == NumberOfMessages )
74+ {
75+ testPassed . SetResult ( NumberOfMessages ) ;
76+ }
7377
74- return Task . CompletedTask ;
75- }
76- } ) ;
78+ return Task . CompletedTask ;
79+ }
80+ } ) ;
7781
7882 Assert . NotNull ( consumer ) ;
7983 SystemUtils . Wait ( ) ;
@@ -93,10 +97,8 @@ public async void RemoveOneConnectionIfaStreamIsDeleted()
9397 // This is to test the metadata update functionality
9498 var system = await StreamSystem . Create ( new StreamSystemConfig ( ) ) ;
9599 var clientProvidedName = Guid . NewGuid ( ) . ToString ( ) ;
96- var consumer = await system . CreateSuperStreamConsumer ( new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
97- {
98- ClientProvidedName = clientProvidedName ,
99- } ) ;
100+ var consumer = await system . CreateSuperStreamConsumer (
101+ new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange ) { ClientProvidedName = clientProvidedName , } ) ;
100102
101103 Assert . NotNull ( consumer ) ;
102104 SystemUtils . Wait ( ) ;
@@ -202,18 +204,20 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished
202204
203205 async Task < IConsumer > NewConsumer ( )
204206 {
205- var iConsumer = await system . CreateSuperStreamConsumer ( new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
206- {
207- ClientProvidedName = clientProvidedName ,
208- OffsetSpec = await SystemUtils . OffsetsForSuperStreamConsumer ( system , "invoices" , new OffsetTypeFirst ( ) ) ,
209- IsSingleActiveConsumer = consumerExpected . IsSingleActiveConsumer ,
210- Reference = "super_stream_consumer_name" ,
211- MessageHandler = ( stream , consumer1 , context , message ) =>
207+ var iConsumer = await system . CreateSuperStreamConsumer (
208+ new RawSuperStreamConsumerConfig ( SystemUtils . InvoicesExchange )
212209 {
213- listConsumed . Add ( stream ) ;
214- return Task . CompletedTask ;
215- }
216- } ) ;
210+ ClientProvidedName = clientProvidedName ,
211+ OffsetSpec =
212+ await SystemUtils . OffsetsForSuperStreamConsumer ( system , "invoices" , new OffsetTypeFirst ( ) ) ,
213+ IsSingleActiveConsumer = consumerExpected . IsSingleActiveConsumer ,
214+ Reference = "super_stream_consumer_name" ,
215+ MessageHandler = ( stream , consumer1 , context , message ) =>
216+ {
217+ listConsumed . Add ( stream ) ;
218+ return Task . CompletedTask ;
219+ }
220+ } ) ;
217221 return iConsumer ;
218222 }
219223
@@ -249,18 +253,25 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
249253 var system = await StreamSystem . Create ( new StreamSystemConfig ( ) ) ;
250254 await SystemUtils . PublishMessagesSuperStream ( system , SystemUtils . InvoicesExchange , 20 , "" , _testOutputHelper ) ;
251255 var listConsumed = new ConcurrentBag < string > ( ) ;
256+ var testPassed = new TaskCompletionSource < bool > ( ) ;
252257 var consumer = await Consumer . Create ( new ConsumerConfig ( system , SystemUtils . InvoicesExchange )
253258 {
254259 OffsetSpec = new OffsetTypeFirst ( ) ,
255260 IsSuperStream = true ,
256261 MessageHandler = ( stream , consumer1 , context , message ) =>
257262 {
258263 listConsumed . Add ( stream ) ;
264+ if ( listConsumed . Count == 20 )
265+ {
266+ testPassed . SetResult ( true ) ;
267+ }
268+
259269 return Task . CompletedTask ;
260270 }
261271 } ) ;
262272
263- SystemUtils . Wait ( TimeSpan . FromSeconds ( 2 ) ) ;
273+ new Utils < bool > ( _testOutputHelper ) . WaitUntilTaskCompletes ( testPassed ) ;
274+ Assert . True ( testPassed . Task . Result ) ;
264275 Assert . Equal ( 9 ,
265276 listConsumed . Sum ( x => x == SystemUtils . InvoicesStream0 ? 1 : 0 ) ) ;
266277 Assert . Equal ( 7 ,
@@ -289,6 +300,7 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
289300 var listConsumed = new ConcurrentBag < string > ( ) ;
290301 var reference = Guid . NewGuid ( ) . ToString ( ) ;
291302 var consumers = new List < Consumer > ( ) ;
303+ var consumerMessageReceived = new TaskCompletionSource < bool > ( ) ;
292304
293305 async Task < Consumer > NewReliableConsumer ( string refConsumer , string clientProvidedName ,
294306 Func < string , string , bool , Task < IOffsetType > > consumerUpdateListener
@@ -305,7 +317,12 @@ Func<string, string, bool, Task<IOffsetType>> consumerUpdateListener
305317 MessageHandler = async ( stream , consumer1 , context , message ) =>
306318 {
307319 await consumer1 . StoreOffset ( context . Offset ) ;
320+
308321 listConsumed . Add ( stream ) ;
322+ if ( listConsumed . Count == 20 )
323+ {
324+ consumerMessageReceived . SetResult ( true ) ;
325+ }
309326 }
310327 } ) ;
311328 }
@@ -319,14 +336,11 @@ Func<string, string, bool, Task<IOffsetType>> consumerUpdateListener
319336 for ( var i = 0 ; i < 2 ; i ++ )
320337 {
321338 var consumer = await NewReliableConsumer ( reference , Guid . NewGuid ( ) . ToString ( ) ,
322- ( consumerRef , stream , arg3 ) =>
323- {
324- return new Task < IOffsetType > ( ( ) => new OffsetTypeFirst ( ) ) ;
325- } ) ;
339+ ( consumerRef , stream , arg3 ) => { return new Task < IOffsetType > ( ( ) => new OffsetTypeFirst ( ) ) ; } ) ;
326340 consumers . Add ( consumer ) ;
327341 }
328342
329- SystemUtils . Wait ( TimeSpan . FromSeconds ( 1 ) ) ;
343+ new Utils < bool > ( _testOutputHelper ) . WaitUntilTaskCompletes ( consumerMessageReceived ) ;
330344 // The sum og the messages must be 20 as the publisher published 20 messages
331345 Assert . Equal ( 9 ,
332346 listConsumed . Sum ( x => x == SystemUtils . InvoicesStream0 ? 1 : 0 ) ) ;
@@ -371,6 +385,7 @@ public async void SaCAddNewConsumerShouldReceiveAllTheMessage()
371385 var system = await StreamSystem . Create ( new StreamSystemConfig ( ) ) ;
372386 await SystemUtils . PublishMessagesSuperStream ( system , SystemUtils . InvoicesExchange , 20 , "" , _testOutputHelper ) ;
373387 var listConsumed = new ConcurrentBag < string > ( ) ;
388+ var firstConsumerMessageReceived = new TaskCompletionSource < bool > ( ) ;
374389 const string Reference = "My-group-app" ;
375390 var firstConsumer = await Consumer . Create ( new ConsumerConfig ( system , SystemUtils . InvoicesExchange )
376391 {
@@ -381,16 +396,24 @@ public async void SaCAddNewConsumerShouldReceiveAllTheMessage()
381396 MessageHandler = ( stream , consumer1 , context , message ) =>
382397 {
383398 listConsumed . Add ( stream ) ;
399+ if ( listConsumed . Count == 20 )
400+ {
401+ firstConsumerMessageReceived . SetResult ( true ) ;
402+ }
403+
384404 return Task . CompletedTask ;
385405 }
386406 } ) ;
387407
388- SystemUtils . Wait ( TimeSpan . FromSeconds ( 1 ) ) ;
408+ new Utils < bool > ( _testOutputHelper ) . WaitUntilTaskCompletes ( firstConsumerMessageReceived ) ;
409+ Assert . True ( firstConsumerMessageReceived . Task . Result ) ;
389410 // the first consumer consumes all the messages and have to be like the messages published
390411 Assert . Equal ( 20 , listConsumed . Count ) ;
391412 SystemUtils . Wait ( TimeSpan . FromSeconds ( 1 ) ) ;
392413 // the second consumer joins the group and consumes the messages only from one partition
393414 var listSecondConsumed = new ConcurrentBag < string > ( ) ;
415+ var secondConsumerMessageReceived = new TaskCompletionSource < bool > ( ) ;
416+
394417 var secondConsumer = await Consumer . Create ( new ConsumerConfig ( system , SystemUtils . InvoicesExchange )
395418 {
396419 OffsetSpec = new OffsetTypeFirst ( ) ,
@@ -400,15 +423,21 @@ public async void SaCAddNewConsumerShouldReceiveAllTheMessage()
400423 MessageHandler = ( stream , consumer1 , context , message ) =>
401424 {
402425 listSecondConsumed . Add ( stream ) ;
426+ // When the second consumer joins the group it consumes only from one partition
427+ // We don't know which partition will be consumed
428+ // so the test is to check if there are at least 4 messages consumed
429+ // that is the partition with less messages
430+ if ( listSecondConsumed . Count >= 4 )
431+ {
432+ secondConsumerMessageReceived . SetResult ( true ) ;
433+ }
434+
403435 return Task . CompletedTask ;
404436 }
405437 } ) ;
406438
407- SystemUtils . Wait ( TimeSpan . FromSeconds ( 1 ) ) ;
408- // When the second consumer joins the group it consumes only from one partition
409- // We don't know which partition will be consumed
410- // so the test is to check if there are at least 4 messages consumed
411- // that is the partition with less messages
439+ new Utils < bool > ( _testOutputHelper ) . WaitUntilTaskCompletes ( secondConsumerMessageReceived ) ;
440+ Assert . True ( secondConsumerMessageReceived . Task . Result ) ;
412441 Assert . True ( listSecondConsumed . Count >= 4 ) ;
413442
414443 await firstConsumer . Close ( ) ;
0 commit comments