11using System ;
2+ using System . Collections . Concurrent ;
23using System . Collections . Generic ;
34using System . Linq ;
4- using System . Text ;
55using System . Threading ;
66using System . Threading . Tasks ;
77using Microsoft . Extensions . Hosting ;
1414
1515namespace RabbitMQ . Client . Core . DependencyInjection . BatchMessageHandlers
1616{
17+ /// <summary>
18+ /// A message handler that handles messages in batches.
19+ /// </summary>
1720 public abstract class BaseBatchMessageHandler : IHostedService , IDisposable
1821 {
19- protected virtual TimeSpan DueTo { get ; set ; } = TimeSpan . Zero ;
22+ /// <summary>
23+ /// A connection which is in use by batch message handler.
24+ /// </summary>
25+ protected IConnection Connection { get ; private set ; }
2026
21- protected abstract TimeSpan Period { get ; set ; }
22-
27+ /// <summary>
28+ /// A channel that has been created using the connection.
29+ /// </summary>
30+ protected IModel Channel { get ; private set ; }
31+
32+ /// <summary>
33+ /// Prefetch size value that can be overriden.
34+ /// </summary>
2335 protected virtual uint PrefetchSize { get ; set ; } = 0 ;
2436
37+ /// <summary>
38+ /// Queue name which will be read by that batch message handler.
39+ /// </summary>
2540 protected abstract string QueueName { get ; set ; }
2641
42+ /// <summary>
43+ /// Prefetch count value (batch size).
44+ /// </summary>
2745 protected abstract ushort PrefetchCount { get ; set ; }
2846
2947 readonly RabbitMqClientOptions _clientOptions ;
3048 readonly ILogger < BaseBatchMessageHandler > _logger ;
31- Timer _timer ;
3249
3350 protected BaseBatchMessageHandler (
3451 IEnumerable < BatchConsumerConnectionOptions > batchConsumerConnectionOptions ,
@@ -47,12 +64,28 @@ protected BaseBatchMessageHandler(
4764 public Task StartAsync ( CancellationToken cancellationToken )
4865 {
4966 ValidateProperties ( ) ;
50- _logger . LogInformation ( "BatchMessageHandler has been started." ) ;
51- _timer = new Timer (
52- async state => await StartPeriodicJob ( cancellationToken ) ,
53- null ,
54- DueTo ,
55- Period ) ;
67+ _logger . LogInformation ( $ "Batch message handler { GetType ( ) } has been started.") ;
68+ Connection = RabbitMqFactoryExtensions . CreateRabbitMqConnection ( _clientOptions ) ;
69+ Channel = Connection . CreateModel ( ) ;
70+ Channel . BasicQos ( PrefetchSize , PrefetchCount , false ) ;
71+
72+ var messages = new ConcurrentBag < BasicDeliverEventArgs > ( ) ;
73+ var consumer = new AsyncEventingBasicConsumer ( Channel ) ;
74+ consumer . Received += async ( sender , eventArgs ) =>
75+ {
76+ messages . Add ( eventArgs ) ;
77+ if ( messages . Count < PrefetchCount )
78+ {
79+ return ;
80+ }
81+
82+ var byteMessages = messages . Select ( x => x . Body ) . ToList ( ) ;
83+ await HandleMessages ( byteMessages , cancellationToken ) ;
84+ var latestDeliveryTag = messages . Max ( x => x . DeliveryTag ) ;
85+ messages . Clear ( ) ;
86+ Channel . BasicAck ( latestDeliveryTag , true ) ;
87+ } ;
88+ Channel . BasicConsume ( queue : QueueName , autoAck : false , consumer : consumer ) ;
5689 return Task . CompletedTask ;
5790 }
5891
@@ -69,40 +102,24 @@ void ValidateProperties()
69102 }
70103 }
71104
72- async Task StartPeriodicJob ( CancellationToken cancellationToken )
73- {
74- while ( ! cancellationToken . IsCancellationRequested )
75- {
76- using var connection = RabbitMqFactoryExtensions . CreateRabbitMqConnection ( _clientOptions ) ;
77- using var channel = connection . CreateModel ( ) ;
78-
79- channel . BasicQos ( PrefetchSize , PrefetchCount , false ) ;
80-
81- var consumer = new AsyncEventingBasicConsumer ( channel ) ;
82- consumer . Received += async ( sender , eventArgs ) => await HandleMessageReceivingEvent ( eventArgs ) ;
83- channel . BasicConsume ( queue : QueueName , autoAck : false , consumer : consumer ) ;
84- }
85- }
86-
87- async Task HandleMessageReceivingEvent ( BasicDeliverEventArgs eventArgs )
88- {
89- var message = Encoding . UTF8 . GetString ( eventArgs . Body . ToArray ( ) ) ;
90- _logger . LogInformation ( $ "A new message was received with deliveryTag { eventArgs . DeliveryTag } .") ;
91- _logger . LogInformation ( message ) ;
92- }
93-
94- protected abstract Task HandleMessages ( ) ;
105+ /// <summary>
106+ /// Handle a batch of messages.
107+ /// </summary>
108+ /// <param name="messages">A collection of messages as bytes.</param>
109+ /// <param name="cancellationToken">Cancellation token.</param>
110+ /// <returns></returns>
111+ protected abstract Task HandleMessages ( IEnumerable < ReadOnlyMemory < byte > > messages , CancellationToken cancellationToken ) ;
95112
96113 public Task StopAsync ( CancellationToken cancellationToken )
97114 {
98- _logger . LogInformation ( "BatchMessageHandler has been stopped." ) ;
99- _timer ? . Change ( Timeout . Infinite , 0 ) ;
115+ _logger . LogInformation ( $ "Batch message handler { GetType ( ) } has been stopped.") ;
100116 return Task . CompletedTask ;
101117 }
102118
103119 public void Dispose ( )
104120 {
105- _timer ? . Dispose ( ) ;
121+ Connection ? . Dispose ( ) ;
122+ Channel ? . Dispose ( ) ;
106123 }
107124 }
108125}
0 commit comments