Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit 9ee705e

Browse files
author
Anton Vorontsov
committed
Added changelog and readme changes.
1 parent a9d80da commit 9ee705e

File tree

5 files changed

+144
-4
lines changed

5 files changed

+144
-4
lines changed

docs/changelog.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
All notable changes to this library will be documented in this file.
44

5+
## [4.0.0] - 2020-05-05
6+
7+
### Added
8+
9+
- `BaseBatchMessageHandler` and `BatchMessageHandler` for handling messages in batches via prefetch count feature.
10+
- Example of basic usages of batch message handlers.
11+
12+
### Updated
13+
14+
- Updated RabbitMQ.Client to the newest version 6.0.0. Made some changes according to the breaking changes that come with the newest version of RabbitMQ.Client.
15+
- Moved message handlers to the different namespace `RabbitMQ.Client.Core.DependencyInjection.MessageHandlers`.
16+
- Moved internal DI extensions to the different namespace `RabbitMQ.Client.Core.DependencyInjection.InternalExtensions`.
17+
518
## [3.2.1] - 2020-03-29
619

720
### Fixed

docs/message-consumption.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,84 @@ The message handling process is organized as follows:
311311
- If any exception occurs `IMessageHandlingService` acknowledges the message anyway and checks if the message has to be re-send. If exchange option `RequeueFailedMessages` is set `true` then `IMessageHandlingService` adds a header `"requeued"` to the message and sends it again with delay in 60 seconds. Mechanism of sending delayed messages covered in the message production [documentation](message-production.md).
312312
- If any exception occurs within handling the message that has been already re-sent that message will not be re-send again (re-send happens only once).
313313

314+
### Batch message handlers
315+
316+
There are also a feature that you can use in case of necessity of handling messages in batches.
317+
First of all you have to create a class that inherits a `BatchMessageHandler` class.
318+
You have to set up values for `QueueName` and `PrefetchCount` properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages.
319+
320+
```c#
321+
public class CustomBatchMessageHandler : BatchMessageHandler
322+
{
323+
readonly ILogger<CustomBatchMessageHandler> _logger;
324+
325+
public CustomBatchMessageHandler(
326+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
327+
ILogger<CustomBatchMessageHandler> logger)
328+
: base(batchConsumerConnectionOptions, logger)
329+
{
330+
_logger = logger;
331+
}
332+
333+
protected override ushort PrefetchCount { get; set; } = 50;
334+
335+
protected override string QueueName { get; set; } = "another.queue.name";
336+
337+
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
338+
{
339+
_logger.LogInformation("Handling a batch of messages.");
340+
foreach (var message in messages)
341+
{
342+
_logger.LogInformation(message);
343+
}
344+
return Task.CompletedTask;
345+
}
346+
}
347+
```
348+
349+
If you want to get raw messages as `ReadOnlyMemory<byte>` you can inherit base message handler class.
350+
351+
```c#
352+
public class CustomBatchMessageHandler : BaseBatchMessageHandler
353+
{
354+
readonly ILogger<CustomBatchMessageHandler> _logger;
355+
356+
public CustomBatchMessageHandler(
357+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
358+
ILogger<CustomBatchMessageHandler> logger)
359+
: base(batchConsumerConnectionOptions, logger)
360+
{
361+
_logger = logger;
362+
}
363+
364+
protected override ushort PrefetchCount { get; set; } = 3;
365+
366+
protected override string QueueName { get; set; } = "queue.name";
367+
368+
protected override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
369+
{
370+
_logger.LogInformation("Handling a batch of messages.");
371+
foreach (var message in messages)
372+
{
373+
var stringifiedMessage = Encoding.UTF8.GetString(message.ToArray());
374+
_logger.LogInformation(stringifiedMessage);
375+
}
376+
return Task.CompletedTask;
377+
}
378+
}
379+
```
380+
381+
After all you have to register that batch message handler via DI.
382+
```c#
383+
services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));
384+
```
385+
386+
The message handler will create a separate connection and use it for reading messages.
387+
When the message collection is full to the size of `PrefetchCount` they are passed to the `HandleMessage` method.
388+
Both `BaseBatchMessageHandler` and `BatchMessageHandler` implement `IDisposable` interface, so you can use it for release of resources.
389+
390+
Use this method of getting messages only when you sure that the number of messages that pass through this queue is really huge. Otherwise, messages could stack in the temporary collection of messages waiting to get in full.
391+
314392
For message production features see the [Previous page](message-production.md)
315393

316394
For more information about advanced usage of the RabbitMQ client see the [Next page](advanced-usage.md)

readme.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,49 @@ Exchange sections define how to bind queues and exchanges with each other using
237237

238238
For more information about `appsettings.json` and manual configuration features, see [rabbit-configuration](./docs/rabbit-configuration.md) and [exchange-configuration](./docs/exchange-configuration.md) documentation files.
239239

240+
## Batch message handlers
241+
242+
There are also a feature that you can use in case of necessity of handling messages in batches.
243+
First of all you have to create a class that inherits a `BatchMessageHandler` class.
244+
You have to set up values for `QueueName` and `PrefetchCount` properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages.
245+
246+
```c#
247+
public class CustomBatchMessageHandler : BatchMessageHandler
248+
{
249+
readonly ILogger<CustomBatchMessageHandler> _logger;
250+
251+
public CustomBatchMessageHandler(
252+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
253+
ILogger<CustomBatchMessageHandler> logger)
254+
: base(batchConsumerConnectionOptions, logger)
255+
{
256+
_logger = logger;
257+
}
258+
259+
protected override ushort PrefetchCount { get; set; } = 50;
260+
261+
protected override string QueueName { get; set; } = "another.queue.name";
262+
263+
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
264+
{
265+
_logger.LogInformation("Handling a batch of messages.");
266+
foreach (var message in messages)
267+
{
268+
_logger.LogInformation(message);
269+
}
270+
return Task.CompletedTask;
271+
}
272+
}
273+
```
274+
275+
After all you have to register that batch message handler via DI.
276+
```c#
277+
services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));
278+
```
279+
280+
The message handler will create a separate connection and use it for reading messages.
281+
When the message collection is full to the size of `PrefetchCount` they are passed to the `HandleMessage` method. For more information, see the [message-consuming](./docs/message-consumption.md) documentation file.
282+
240283
## Advanced usage and nuances
241284

242285
RabbitMQ client implemented in this library (class which implements `IQueueService`) opens two connections to the RabbitMQ server. One connection is used for message production and the other one is for message consumption.

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Task StartAsync(CancellationToken cancellationToken)
8080
}
8181

8282
var byteMessages = messages.Select(x => x.Body).ToList();
83-
await HandleMessages(byteMessages, cancellationToken);
83+
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
8484
var latestDeliveryTag = messages.Max(x => x.DeliveryTag);
8585
messages.Clear();
8686
Channel.BasicAck(latestDeliveryTag, true);
@@ -115,11 +115,17 @@ public Task StopAsync(CancellationToken cancellationToken)
115115
_logger.LogInformation($"Batch message handler {GetType()} has been stopped.");
116116
return Task.CompletedTask;
117117
}
118-
119-
public void Dispose()
118+
119+
protected virtual void Dispose(bool disposing)
120120
{
121121
Connection?.Dispose();
122122
Channel?.Dispose();
123123
}
124+
125+
public void Dispose()
126+
{
127+
Dispose(true);
128+
GC.SuppressFinalize(this);
129+
}
124130
}
125131
}

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BatchMessageHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ protected BatchMessageHandler(
3030
protected override async Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
3131
{
3232
var decodedMessages = messages.Select(x => Encoding.UTF8.GetString(x.ToArray()));
33-
await HandleMessage(decodedMessages, cancellationToken);
33+
await HandleMessage(decodedMessages, cancellationToken).ConfigureAwait(false);
3434
}
3535

3636
/// <summary>

0 commit comments

Comments
 (0)