Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit b5e51ab

Browse files
Merge pull request #37 from AntonyVorontsov/feature/prefetch-handlers
Batch message handlers
2 parents 62af41a + 9ee705e commit b5e51ab

File tree

65 files changed

+768
-102
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+768
-102
lines changed

RabbitMQ.Client.Core.DependencyInjection.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.Core.Depend
4545
EndProject
4646
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Examples.AdvancedConfiguration", "examples\Examples.AdvancedConfiguration\Examples.AdvancedConfiguration.csproj", "{0C713913-8D54-49E7-AADD-45497216E2EF}"
4747
EndProject
48+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Examples.BatchMessageHandler", "examples\Examples.BatchMessageHandler\Examples.BatchMessageHandler.csproj", "{BC9BE326-AEFE-42F2-B592-80126188514D}"
49+
EndProject
4850
Global
4951
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5052
Debug|Any CPU = Debug|Any CPU
@@ -75,6 +77,10 @@ Global
7577
{0C713913-8D54-49E7-AADD-45497216E2EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
7678
{0C713913-8D54-49E7-AADD-45497216E2EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
7779
{0C713913-8D54-49E7-AADD-45497216E2EF}.Release|Any CPU.Build.0 = Release|Any CPU
80+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
81+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Debug|Any CPU.Build.0 = Debug|Any CPU
82+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Release|Any CPU.ActiveCfg = Release|Any CPU
83+
{BC9BE326-AEFE-42F2-B592-80126188514D}.Release|Any CPU.Build.0 = Release|Any CPU
7884
EndGlobalSection
7985
GlobalSection(SolutionProperties) = preSolution
8086
HideSolutionNode = FALSE
@@ -87,6 +93,7 @@ Global
8793
{93D59B0E-856C-4260-B50E-57FE5C0F5073} = {BC4CDBDE-4AEE-44B3-B00B-380EB1AC5E62}
8894
{85907F19-00B0-4BA6-9B7C-0452A174903D} = {9D0289B2-C566-46CC-A53A-471BCBA0F277}
8995
{0C713913-8D54-49E7-AADD-45497216E2EF} = {04EFD8F7-5120-4072-9728-64203F6F4873}
96+
{BC9BE326-AEFE-42F2-B592-80126188514D} = {04EFD8F7-5120-4072-9728-64203F6F4873}
9097
EndGlobalSection
9198
GlobalSection(ExtensibilityGlobals) = postSolution
9299
SolutionGuid = {0EBBD182-65B2-47F9-ABBE-64B5B8C9652F}

docs/changelog.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
All notable changes to this library will be documented in this file.
44

5+
## [4.0.0] - 2020-05-05
6+
7+
### Added
8+
9+
- `BaseBatchMessageHandler` and `BatchMessageHandler` for handling messages in batches via prefetch count feature.
10+
- Example of basic usages of batch message handlers.
11+
12+
### Updated
13+
14+
- Updated RabbitMQ.Client to the newest version 6.0.0. Made some changes according to the breaking changes that come with the newest version of RabbitMQ.Client.
15+
- Moved message handlers to the different namespace `RabbitMQ.Client.Core.DependencyInjection.MessageHandlers`.
16+
- Moved internal DI extensions to the different namespace `RabbitMQ.Client.Core.DependencyInjection.InternalExtensions`.
17+
518
## [3.2.1] - 2020-03-29
619

720
### Fixed

docs/message-consumption.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,84 @@ The message handling process is organized as follows:
311311
- If any exception occurs `IMessageHandlingService` acknowledges the message anyway and checks if the message has to be re-send. If exchange option `RequeueFailedMessages` is set `true` then `IMessageHandlingService` adds a header `"requeued"` to the message and sends it again with delay in 60 seconds. Mechanism of sending delayed messages covered in the message production [documentation](message-production.md).
312312
- If any exception occurs within handling the message that has been already re-sent that message will not be re-send again (re-send happens only once).
313313

314+
### Batch message handlers
315+
316+
There are also a feature that you can use in case of necessity of handling messages in batches.
317+
First of all you have to create a class that inherits a `BatchMessageHandler` class.
318+
You have to set up values for `QueueName` and `PrefetchCount` properties. These values are responsible for the queue that will be read by the message handler, and the size of batches of messages.
319+
320+
```c#
321+
public class CustomBatchMessageHandler : BatchMessageHandler
322+
{
323+
readonly ILogger<CustomBatchMessageHandler> _logger;
324+
325+
public CustomBatchMessageHandler(
326+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
327+
ILogger<CustomBatchMessageHandler> logger)
328+
: base(batchConsumerConnectionOptions, logger)
329+
{
330+
_logger = logger;
331+
}
332+
333+
protected override ushort PrefetchCount { get; set; } = 50;
334+
335+
protected override string QueueName { get; set; } = "another.queue.name";
336+
337+
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
338+
{
339+
_logger.LogInformation("Handling a batch of messages.");
340+
foreach (var message in messages)
341+
{
342+
_logger.LogInformation(message);
343+
}
344+
return Task.CompletedTask;
345+
}
346+
}
347+
```
348+
349+
If you want to get raw messages as `ReadOnlyMemory<byte>` you can inherit base message handler class.
350+
351+
```c#
352+
public class CustomBatchMessageHandler : BaseBatchMessageHandler
353+
{
354+
readonly ILogger<CustomBatchMessageHandler> _logger;
355+
356+
public CustomBatchMessageHandler(
357+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
358+
ILogger<CustomBatchMessageHandler> logger)
359+
: base(batchConsumerConnectionOptions, logger)
360+
{
361+
_logger = logger;
362+
}
363+
364+
protected override ushort PrefetchCount { get; set; } = 3;
365+
366+
protected override string QueueName { get; set; } = "queue.name";
367+
368+
protected override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
369+
{
370+
_logger.LogInformation("Handling a batch of messages.");
371+
foreach (var message in messages)
372+
{
373+
var stringifiedMessage = Encoding.UTF8.GetString(message.ToArray());
374+
_logger.LogInformation(stringifiedMessage);
375+
}
376+
return Task.CompletedTask;
377+
}
378+
}
379+
```
380+
381+
After all you have to register that batch message handler via DI.
382+
```c#
383+
services.AddBatchMessageHandler<CustomBatchMessageHandler>(Configuration.GetSection("RabbitMq"));
384+
```
385+
386+
The message handler will create a separate connection and use it for reading messages.
387+
When the message collection is full to the size of `PrefetchCount` they are passed to the `HandleMessage` method.
388+
Both `BaseBatchMessageHandler` and `BatchMessageHandler` implement `IDisposable` interface, so you can use it for release of resources.
389+
390+
Use this method of getting messages only when you sure that the number of messages that pass through this queue is really huge. Otherwise, messages could stack in the temporary collection of messages waiting to get in full.
391+
314392
For message production features see the [Previous page](message-production.md)
315393

316394
For more information about advanced usage of the RabbitMQ client see the [Next page](advanced-usage.md)

examples/Examples.AdvancedConfiguration/Controllers/ExampleController.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Microsoft.AspNetCore.Mvc;
33
using Microsoft.Extensions.Logging;
44
using RabbitMQ.Client.Core.DependencyInjection;
5+
using RabbitMQ.Client.Core.DependencyInjection.Services;
56

67
namespace Examples.AdvancedConfiguration.Controllers
78
{

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomAsyncMessageHandler.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Threading.Tasks;
22
using RabbitMQ.Client.Core.DependencyInjection;
3+
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
34

45
namespace Examples.AdvancedConfiguration.MessageHandlers
56
{

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomAsyncNonCyclicMessageHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using System.Threading.Tasks;
22
using RabbitMQ.Client.Core.DependencyInjection;
3+
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
4+
using RabbitMQ.Client.Core.DependencyInjection.Services;
35

46
namespace Examples.AdvancedConfiguration.MessageHandlers
57
{

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomMessageHandler.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using RabbitMQ.Client.Core.DependencyInjection;
2+
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
23

34
namespace Examples.AdvancedConfiguration.MessageHandlers
45
{

examples/Examples.AdvancedConfiguration/MessageHandlers/CustomNonCyclicMessageHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using RabbitMQ.Client.Core.DependencyInjection;
2+
using RabbitMQ.Client.Core.DependencyInjection.MessageHandlers;
3+
using RabbitMQ.Client.Core.DependencyInjection.Services;
24

35
namespace Examples.AdvancedConfiguration.MessageHandlers
46
{

examples/Examples.AdvancedConfiguration/Services/ConsumingHostedService.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Threading.Tasks;
33
using Microsoft.Extensions.Hosting;
44
using RabbitMQ.Client.Core.DependencyInjection;
5+
using RabbitMQ.Client.Core.DependencyInjection.Services;
56

67
namespace Examples.AdvancedConfiguration.Services
78
{
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Microsoft.Extensions.Logging;
5+
using RabbitMQ.Client.Core.DependencyInjection.Models;
6+
7+
namespace Examples.BatchMessageHandler
8+
{
9+
public class AnotherCustomBatchMessageHandler : RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers.BatchMessageHandler
10+
{
11+
readonly ILogger<AnotherCustomBatchMessageHandler> _logger;
12+
13+
public AnotherCustomBatchMessageHandler(
14+
IEnumerable<BatchConsumerConnectionOptions> batchConsumerConnectionOptions,
15+
ILogger<AnotherCustomBatchMessageHandler> logger)
16+
: base(batchConsumerConnectionOptions, logger)
17+
{
18+
_logger = logger;
19+
}
20+
21+
protected override ushort PrefetchCount { get; set; } = 5;
22+
23+
protected override string QueueName { get; set; } = "another.queue.name";
24+
25+
protected override Task HandleMessage(IEnumerable<string> messages, CancellationToken cancellationToken)
26+
{
27+
_logger.LogInformation("Handling a batch of messages.");
28+
foreach (var message in messages)
29+
{
30+
_logger.LogInformation(message);
31+
}
32+
return Task.CompletedTask;
33+
}
34+
}
35+
}

0 commit comments

Comments
 (0)