1+ using System ;
2+ using System . Collections . Generic ;
3+ using System . Threading ;
4+ using System . Threading . Tasks ;
5+ using Microsoft . Extensions . Logging ;
6+ using Moq ;
7+ using RabbitMQ . Client . Core . DependencyInjection . BatchMessageHandlers ;
8+ using RabbitMQ . Client . Core . DependencyInjection . Configuration ;
9+ using RabbitMQ . Client . Core . DependencyInjection . Models ;
10+ using RabbitMQ . Client . Core . DependencyInjection . Services ;
11+ using RabbitMQ . Client . Core . DependencyInjection . Tests . UnitTests . Stubs ;
12+ using RabbitMQ . Client . Events ;
13+ using Xunit ;
14+
15+ namespace RabbitMQ . Client . Core . DependencyInjection . Tests . UnitTests
16+ {
17+ public class BatchMessageHandlerTests
18+ {
19+ [ Theory ]
20+ [ InlineData ( 1 , 10 ) ]
21+ [ InlineData ( 5 , 47 ) ]
22+ [ InlineData ( 10 , 185 ) ]
23+ [ InlineData ( 16 , 200 ) ]
24+ [ InlineData ( 20 , 310 ) ]
25+ [ InlineData ( 25 , 400 ) ]
26+ public async Task ShouldProperlyHandlerMessagesByBatches ( ushort prefetchCount , int numberOfMessages )
27+ {
28+ const string queueName = "queue.name" ;
29+
30+ var channelMock = new Mock < IModel > ( ) ;
31+ var connectionMock = new Mock < IConnection > ( ) ;
32+ connectionMock . Setup ( x => x . CreateModel ( ) )
33+ . Returns ( channelMock . Object ) ;
34+
35+ var connectionFactoryMock = new Mock < IRabbitMqConnectionFactory > ( ) ;
36+ connectionFactoryMock . Setup ( x => x . CreateRabbitMqConnection ( It . IsAny < RabbitMqClientOptions > ( ) ) )
37+ . Returns ( connectionMock . Object ) ;
38+
39+ var consumer = new AsyncEventingBasicConsumer ( channelMock . Object ) ;
40+ connectionFactoryMock . Setup ( x => x . CreateConsumer ( It . IsAny < IModel > ( ) ) )
41+ . Returns ( consumer ) ;
42+
43+ var callerMock = new Mock < IStubCaller > ( ) ;
44+
45+ var messageHandler = CreateBatchMessageHandler ( queueName , prefetchCount , connectionFactoryMock . Object , callerMock . Object ) ;
46+ await messageHandler . StartAsync ( CancellationToken . None ) ;
47+
48+ for ( var i = 0 ; i < numberOfMessages ; i ++ )
49+ {
50+ await consumer . HandleBasicDeliver (
51+ "1" ,
52+ ( ulong ) numberOfMessages ,
53+ false ,
54+ "exchange" ,
55+ "routing,key" ,
56+ null ,
57+ new ReadOnlyMemory < byte > ( ) ) ;
58+ }
59+
60+ var numberOfBatches = numberOfMessages / prefetchCount ;
61+ callerMock . Verify ( x => x . EmptyCall ( ) , Times . Exactly ( numberOfBatches ) ) ;
62+
63+ var processedMessages = numberOfBatches * prefetchCount ;
64+ callerMock . Verify ( x => x . Call ( It . IsAny < string > ( ) ) , Times . Exactly ( processedMessages ) ) ;
65+
66+ await messageHandler . StopAsync ( CancellationToken . None ) ;
67+ }
68+
69+ BatchMessageHandler CreateBatchMessageHandler (
70+ string queueName ,
71+ ushort prefetchCount ,
72+ IRabbitMqConnectionFactory connectionFactory ,
73+ IStubCaller caller )
74+ {
75+ var connectionOptions = new BatchConsumerConnectionOptions
76+ {
77+ Type = typeof ( StubBatchMessageHandler ) ,
78+ ClientOptions = new RabbitMqClientOptions ( )
79+ } ;
80+ var loggerMock = new Mock < ILogger < StubBatchMessageHandler > > ( ) ;
81+ return new StubBatchMessageHandler (
82+ caller ,
83+ connectionFactory ,
84+ new List < BatchConsumerConnectionOptions > { connectionOptions } ,
85+ loggerMock . Object )
86+ {
87+ QueueName = queueName ,
88+ PrefetchCount = prefetchCount
89+ } ;
90+ }
91+ }
92+ }
0 commit comments