Skip to content
This repository was archived by the owner on Apr 29, 2022. It is now read-only.

Commit c930436

Browse files
author
Anton Vorontsov
committed
Made batch message handlers work with BasicDeliverEventArgs. Removed BatchMessageHandler.
1 parent bc8dc09 commit c930436

File tree

8 files changed

+22
-311
lines changed

8 files changed

+22
-311
lines changed

examples/Examples.BatchMessageHandler/AnotherCustomBatchMessageHandler.cs

Lines changed: 0 additions & 42 deletions
This file was deleted.

examples/Examples.BatchMessageHandler/CustomBatchMessageHandler.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
using System;
21
using System.Collections.Generic;
3-
using System.Text;
42
using System.Threading;
53
using System.Threading.Tasks;
64
using Microsoft.Extensions.Logging;
5+
using RabbitMQ.Client.Core.DependencyInjection;
76
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
87
using RabbitMQ.Client.Core.DependencyInjection.Models;
98
using RabbitMQ.Client.Core.DependencyInjection.Services;
9+
using RabbitMQ.Client.Events;
1010

1111
namespace Examples.BatchMessageHandler
1212
{
@@ -28,13 +28,12 @@ public CustomBatchMessageHandler(
2828
// You have to be aware that BaseBatchMessageHandler does not declare the specified queue. So if it does not exists an exception will be thrown.
2929
public override string QueueName { get; set; } = "queue.name";
3030

31-
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
31+
public override Task HandleMessages(IEnumerable<BasicDeliverEventArgs> messages, CancellationToken cancellationToken)
3232
{
3333
_logger.LogInformation("Handling a batch of messages.");
3434
foreach (var message in messages)
3535
{
36-
var stringifiedMessage = Encoding.UTF8.GetString(message.ToArray());
37-
_logger.LogInformation(stringifiedMessage);
36+
_logger.LogInformation(message.GetMessage());
3837
}
3938
return Task.CompletedTask;
4039
}

examples/Examples.BatchMessageHandler/Program.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@ public static async Task Main()
2121
// Let's configure two different BatchMessageHandlers with different methods.
2222
// First - configuring an appsettings.json section.
2323
services.AddBatchMessageHandler<CustomBatchMessageHandler>(hostContext.Configuration.GetSection("RabbitMq"));
24-
24+
2525
// Second one - passing configuration instance.
26-
var rabbitMqConfiguration = new RabbitMqClientOptions
27-
{
28-
HostName = "127.0.0.1",
29-
Port = 5672,
30-
UserName = "guest",
31-
Password = "guest"
32-
};
33-
services.AddBatchMessageHandler<AnotherCustomBatchMessageHandler>(rabbitMqConfiguration);
26+
var rabbitMqConfiguration = new RabbitMqClientOptions
27+
{
28+
HostName = "127.0.0.1",
29+
Port = 5672,
30+
UserName = "guest",
31+
Password = "guest"
32+
};
33+
services.AddBatchMessageHandler<CustomBatchMessageHandler>(rabbitMqConfiguration);
34+
35+
// Use either of them. Do not register batch message handlers multiple times in your real project.
3436
})
3537
.ConfigureLogging((hostingContext, logging) =>
3638
{

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BaseBatchMessageHandler.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,13 @@ public Task StartAsync(CancellationToken cancellationToken)
108108

109109
async Task ProcessBatchOfMessages(CancellationToken cancellationToken)
110110
{
111-
112111
var messages = GetMessages();
113112
if (!messages.Any())
114113
{
115114
return;
116115
}
117116

118-
var byteMessages = messages.Select(x => x.Body).ToList();
119-
await HandleMessages(byteMessages, cancellationToken).ConfigureAwait(false);
117+
await HandleMessages(messages, cancellationToken).ConfigureAwait(false);
120118
var latestDeliveryTag = messages.Max(x => x.DeliveryTag);
121119
Channel.BasicAck(latestDeliveryTag, true);
122120
}
@@ -152,11 +150,11 @@ void ValidateProperties()
152150
/// <summary>
153151
/// Handle a batch of messages.
154152
/// </summary>
155-
/// <param name="messages">A collection of messages as bytes.</param>
153+
/// <param name="messages">A collection of messages.</param>
156154
/// <param name="cancellationToken">Cancellation token.</param>
157155
/// <returns></returns>
158-
public abstract Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken);
159-
156+
public abstract Task HandleMessages(IEnumerable<BasicDeliverEventArgs> messages, CancellationToken cancellationToken);
157+
160158
public Task StopAsync(CancellationToken cancellationToken)
161159
{
162160
_timer?.Change(Timeout.Infinite, 0);

src/RabbitMQ.Client.Core.DependencyInjection/BatchMessageHandlers/BatchMessageHandler.cs

Lines changed: 0 additions & 46 deletions
This file was deleted.

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/Stubs/StubBaseBatchMessageHandler.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using RabbitMQ.Client.Core.DependencyInjection.BatchMessageHandlers;
77
using RabbitMQ.Client.Core.DependencyInjection.Models;
88
using RabbitMQ.Client.Core.DependencyInjection.Services;
9+
using RabbitMQ.Client.Events;
910

1011
namespace RabbitMQ.Client.Core.DependencyInjection.Tests.Stubs
1112
{
@@ -29,11 +30,11 @@ public StubBaseBatchMessageHandler(
2930

3031
public override TimeSpan? MessageHandlingPeriod { get; set; }
3132

32-
public override Task HandleMessages(IEnumerable<ReadOnlyMemory<byte>> messages, CancellationToken cancellationToken)
33+
public override Task HandleMessages(IEnumerable<BasicDeliverEventArgs> messages, CancellationToken cancellationToken)
3334
{
3435
foreach (var message in messages)
3536
{
36-
_caller.Call(message);
37+
_caller.Call(message.Body);
3738
}
3839
_caller.EmptyCall();
3940
return Task.CompletedTask;

tests/RabbitMQ.Client.Core.DependencyInjection.Tests/Stubs/StubBatchMessageHandler.cs

Lines changed: 0 additions & 42 deletions
This file was deleted.

0 commit comments

Comments
 (0)