@@ -333,6 +333,8 @@ await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
333
333
[ Fact ]
334
334
public async Task MaybeSomethingUpWithRateLimiter_GH1793 ( )
335
335
{
336
+ const int messageCount = 16 ;
337
+
336
338
_connFactory = new ConnectionFactory
337
339
{
338
340
AutomaticRecoveryEnabled = true
@@ -343,7 +345,7 @@ public async Task MaybeSomethingUpWithRateLimiter_GH1793()
343
345
var channelOpts = new CreateChannelOptions (
344
346
publisherConfirmationsEnabled : true ,
345
347
publisherConfirmationTrackingEnabled : true ,
346
- outstandingPublisherConfirmationsRateLimiter : new ThrottlingRateLimiter ( 256 )
348
+ outstandingPublisherConfirmationsRateLimiter : new ThrottlingRateLimiter ( messageCount )
347
349
) ;
348
350
349
351
_channel = await _conn . CreateChannelAsync ( channelOpts ) ;
@@ -352,22 +354,40 @@ public async Task MaybeSomethingUpWithRateLimiter_GH1793()
352
354
{
353
355
DeliveryMode = DeliveryModes . Persistent
354
356
} ;
355
- int retryCount = 0 ;
356
- const int maxRetries = 3 ;
357
- while ( retryCount <= maxRetries )
357
+
358
+ async Task PublishMessagesAsync ( )
358
359
{
359
- try
360
- {
361
- var bytes = Encoding . UTF8 . GetBytes ( "message" ) ;
362
- await _channel . BasicPublishAsync ( string . Empty , string . Empty , true , properties , bytes ) ;
363
- break ;
364
- }
365
- catch ( Exception ex )
360
+ for ( int i = 0 ; i < messageCount ; i ++ )
366
361
{
367
- retryCount ++ ;
368
- _output . WriteLine ( "{0} exception: {1}" , _testDisplayName , ex ) ;
362
+ int retryCount = 0 ;
363
+ const int maxRetries = 3 ;
364
+ while ( retryCount <= maxRetries )
365
+ {
366
+ try
367
+ {
368
+ byte [ ] bytes = Encoding . UTF8 . GetBytes ( "message" ) ;
369
+ await _channel . BasicPublishAsync ( string . Empty , string . Empty , true , properties , bytes ) ;
370
+ break ;
371
+ }
372
+ catch ( SemaphoreFullException ex0 )
373
+ {
374
+ _output . WriteLine ( "{0} ex: {1}" , _testDisplayName , ex0 ) ;
375
+ retryCount ++ ;
376
+ }
377
+ catch ( PublishException )
378
+ {
379
+ retryCount ++ ;
380
+ }
381
+ }
369
382
}
370
383
}
384
+
385
+ var publishTasks = new List < Task > ( ) ;
386
+ for ( int i = 0 ; i < messageCount ; i ++ )
387
+ {
388
+ publishTasks . Add ( Task . Run ( PublishMessagesAsync ) ) ;
389
+ }
390
+ await Task . WhenAll ( publishTasks ) ;
371
391
}
372
392
}
373
393
}
0 commit comments