@@ -99,7 +99,7 @@ public async Task TestBasicRoundtripConcurrent()
99
99
} ) ;
100
100
} ;
101
101
102
- consumer . Received += ( o , a ) =>
102
+ consumer . Received += async ( o , a ) =>
103
103
{
104
104
string decoded = _encoding . GetString ( a . Body . ToArray ( ) ) ;
105
105
if ( decoded == publish1 )
@@ -115,10 +115,13 @@ public async Task TestBasicRoundtripConcurrent()
115
115
var ex = new InvalidOperationException ( "incorrect message - should never happen!" ) ;
116
116
SetException ( ex , publish1SyncSource , publish2SyncSource ) ;
117
117
}
118
- return Task . CompletedTask ;
118
+
119
+ AsyncEventingBasicConsumer cons = ( AsyncEventingBasicConsumer ) o ;
120
+ await cons . Channel . BasicAckAsync ( a . DeliveryTag , false ) ;
119
121
} ;
120
122
121
- await _channel . BasicConsumeAsync ( qname , true , ConsumerTag . Empty , false , false , null , consumer ) ;
123
+ await _channel . BasicQosAsync ( 0 , 1 , false ) ;
124
+ await _channel . BasicConsumeAsync ( qname , autoAck : false , ConsumerTag . Empty , false , false , null , consumer ) ;
122
125
123
126
// ensure we get a delivery
124
127
await AssertRanToCompletion ( publish1SyncSource . Task , publish2SyncSource . Task ) ;
@@ -216,8 +219,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
216
219
217
220
for ( int i = 0 ; i < publish_total ; i ++ )
218
221
{
219
- await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body1 ) ;
220
- await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body2 ) ;
222
+ await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body1 , mandatory : true ) ;
223
+ await publishChannel . BasicPublishAsync ( ExchangeName . Empty , ( RoutingKey ) queueName , body2 , mandatory : true ) ;
221
224
await publishChannel . WaitForConfirmsOrDieAsync ( ) ;
222
225
}
223
226
@@ -260,7 +263,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
260
263
int publish1_count = 0 ;
261
264
int publish2_count = 0 ;
262
265
263
- consumer . Received += ( o , a ) =>
266
+ consumer . Received += async ( o , a ) =>
264
267
{
265
268
string decoded = _encoding . GetString ( a . Body . ToArray ( ) ) ;
266
269
if ( decoded == publish1 )
@@ -282,10 +285,13 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
282
285
var ex = new InvalidOperationException ( "incorrect message - should never happen!" ) ;
283
286
SetException ( ex , publish1SyncSource , publish2SyncSource ) ;
284
287
}
285
- return Task . CompletedTask ;
288
+
289
+ AsyncEventingBasicConsumer cons = ( AsyncEventingBasicConsumer ) o ;
290
+ await cons . Channel . BasicAckAsync ( a . DeliveryTag , false ) ;
286
291
} ;
287
292
288
- await consumeChannel . BasicConsumeAsync ( queueName , true , ConsumerTag . Empty , false , false , null , consumer ) ;
293
+ await consumeChannel . BasicQosAsync ( 0 , 1 , false ) ;
294
+ await consumeChannel . BasicConsumeAsync ( queueName , autoAck : false , ConsumerTag . Empty , false , false , null , consumer ) ;
289
295
await consumerSyncSource . Task ;
290
296
291
297
await consumeChannel . CloseAsync ( ) ;
0 commit comments