Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 1e06f7b

Browse files
committed
Pass messages in Batch through outgoing plugins
1 parent 2eb647a commit 1e06f7b

File tree

5 files changed

+88
-30
lines changed

5 files changed

+88
-30
lines changed

src/Microsoft.Azure.ServiceBus/Core/Batch.cs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4+
using System.Threading.Tasks;
5+
46
namespace Microsoft.Azure.ServiceBus.Core
57
{
68
using System;
@@ -15,18 +17,25 @@ namespace Microsoft.Azure.ServiceBus.Core
1517
public class Batch : IDisposable
1618
{
1719
internal readonly ulong maximumBatchSize;
20+
private readonly Func<Message, Task<Message>> pluginsCallback;
1821
private AmqpMessage firstMessage;
1922
private readonly List<Data> datas;
2023
private AmqpMessage result;
2124
private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData;
2225

2326
/// <summary>
24-
/// Construct a new batch with a maximum batch size.
27+
/// Construct a new batch with a maximum batch size and outgoing plugins callback.
28+
/// <remarks>
29+
/// To construct a batch at run-time, use <see cref="MessageSender"/>, <see cref="QueueClient"/>, or <see cref="TopicClient"/>.
30+
/// Use this constructor for testing and custom implementations.
31+
/// </remarks>
2532
/// </summary>
2633
/// <param name="maximumBatchSize">Maximum batch size allowed for batch.</param>
27-
public Batch(ulong maximumBatchSize)
34+
/// <param name="pluginsCallback">Plugins callback to invoke on outgoing messages regisered with batch.</param>
35+
public Batch(ulong maximumBatchSize, Func<Message, Task<Message>> pluginsCallback)
2836
{
2937
this.maximumBatchSize = maximumBatchSize;
38+
this.pluginsCallback = pluginsCallback;
3039
this.datas = new List<Data>();
3140
this.result = AmqpMessage.Create(datas);
3241
}
@@ -36,17 +45,19 @@ public Batch(ulong maximumBatchSize)
3645
/// </summary>
3746
/// <param name="message"><see cref="Message"/> to add to the batch.</param>
3847
/// <returns></returns>
39-
public bool TryAdd(Message message)
48+
public async Task<bool> TryAdd(Message message)
4049
{
4150
ThrowIfDisposed();
4251

4352
message.VerifyMessageIsNotPreviouslyReceived();
4453

45-
var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message);
54+
var processedMessage = await pluginsCallback(message);
55+
56+
var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage);
4657

4758
if (firstMessage == null)
4859
{
49-
originalMessageData = (message.MessageId, message.SessionId, message.PartitionKey, message.ViaPartitionKey);
60+
originalMessageData = (processedMessage.MessageId, processedMessage.SessionId, processedMessage.PartitionKey, processedMessage.ViaPartitionKey);
5061
firstMessage = amqpMessage;
5162
}
5263

src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public async Task<Batch> CreateBatch()
316316
{
317317
if (maxMessageSize != 0)
318318
{
319-
return new Batch(maxMessageSize);
319+
return new Batch(maxMessageSize, ProcessMessage);
320320
}
321321

322322
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
@@ -335,7 +335,7 @@ public async Task<Batch> CreateBatch()
335335

336336
maxMessageSize = amqpLink.Settings.MaxMessageSize.Value;
337337

338-
return new Batch(maxMessageSize);
338+
return new Batch(maxMessageSize, ProcessMessage);
339339
}
340340
catch (Exception exception)
341341
{

test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,11 +484,11 @@ namespace Microsoft.Azure.ServiceBus.Core
484484
[System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")]
485485
public class Batch : System.IDisposable
486486
{
487-
public Batch(ulong maximumBatchSize) { }
487+
public Batch(ulong maximumBatchSize, System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Message>> pluginsCallback) { }
488488
public int Length { get; }
489489
public void Dispose() { }
490490
public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { }
491-
public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { }
491+
public System.Threading.Tasks.Task<bool> TryAdd(Microsoft.Azure.ServiceBus.Message message) { }
492492
}
493493
public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity
494494
{

test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4+
using System.Threading.Tasks;
5+
46
namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
57
{
68
using System;
@@ -10,72 +12,74 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
1012

1113
public class BatchTests
1214
{
15+
private Func<Message, Task<Message>> fakePluginsCallback = Task.FromResult;
16+
1317
[Fact]
14-
public void Should_return_false_when_is_about_to_exceed_max_batch_size()
18+
public async Task Should_return_false_when_is_about_to_exceed_max_batch_size()
1519
{
16-
using (var batch = new Batch(1))
20+
using (var batch = new Batch(1, fakePluginsCallback))
1721
{
18-
var wasAdded = batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello")));
22+
var wasAdded = await batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello")));
1923
Assert.False(wasAdded, "Message should not have been added, but it was.");
2024
}
2125
}
2226

2327
[Fact]
2428
public void Should_throw_if_batch_disposed()
2529
{
26-
using (var batch = new Batch(1))
30+
using (var batch = new Batch(1, fakePluginsCallback))
2731
{
2832
batch.Dispose();
29-
Assert.Throws<Exception>(() => batch.TryAdd(new Message()));
33+
Assert.ThrowsAsync<Exception>(() => batch.TryAdd(new Message()));
3034
}
3135
}
3236

3337
[Fact]
3438
public void Should_throw_when_trying_to_add_an_already_received_message_to_batch()
3539
{
36-
using (var batch = new Batch(100))
40+
using (var batch = new Batch(100, fakePluginsCallback))
3741
{
3842
var message = new Message("test".GetBytes());
3943
message.SystemProperties.LockTokenGuid = Guid.NewGuid();
4044

41-
Assert.Throws<ArgumentException>(() => batch.TryAdd(message));
45+
Assert.ThrowsAsync<ArgumentException>(() => batch.TryAdd(message));
4246
}
4347
}
4448

4549
[Theory]
4650
[InlineData(1)]
4751
[InlineData(3)]
48-
public void Should_report_how_many_messages_are_in_batch(int numberOfMessages)
52+
public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessages)
4953
{
50-
using (var batch = new Batch(100))
54+
using (var batch = new Batch(100, fakePluginsCallback))
5155
{
5256
for (var i = 0; i < numberOfMessages; i++)
5357
{
54-
batch.TryAdd(new Message());
58+
await batch.TryAdd(new Message());
5559
}
5660

5761
Assert.Equal(numberOfMessages, batch.Length);
5862
}
5963
}
6064

6165
[Fact]
62-
public void Should_show_reflect_property_in_batch_size()
66+
public async Task Should_reflect_property_in_batch_size()
6367
{
64-
using (var batch = new Batch(100))
68+
using (var batch = new Batch(100, fakePluginsCallback))
6569
{
6670
var message = new Message();
6771

68-
batch.TryAdd(message);
72+
await batch.TryAdd(message);
6973

7074
Assert.Equal((ulong)24, batch.Size);
7175
}
7276

73-
using (var batch = new Batch(100))
77+
using (var batch = new Batch(100, fakePluginsCallback))
7478
{
7579
var message = new Message();
7680
message.UserProperties["custom"] = "value";
7781

78-
batch.TryAdd(message);
82+
await batch.TryAdd(message);
7983

8084
Assert.Equal((ulong)45, batch.Size);
8185
}

test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -473,10 +473,10 @@ public async Task Sending_batch()
473473
var message2 = new Message("from".GetBytes());
474474
var message3 = new Message("Sean Feldman".GetBytes());
475475

476-
var batch = new Batch(100);
477-
Assert.True(batch.TryAdd(message1), "Couldn't add first message");
478-
Assert.True(batch.TryAdd(message2), "Couldn't add second message");
479-
Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message");
476+
var batch = new Batch(100, Task.FromResult);
477+
Assert.True(await batch.TryAdd(message1), "Couldn't add first message");
478+
Assert.True(await batch.TryAdd(message2), "Couldn't add second message");
479+
Assert.False(await batch.TryAdd(message3), "Shouldn't be able to add third message");
480480
await sender.SendAsync(batch);
481481
batch.Dispose();
482482
await sender.CloseAsync();
@@ -507,8 +507,8 @@ public async Task Sending_batch_with_properties()
507507
var message = new Message("Hello Neeraj".GetBytes());
508508
message.UserProperties["custom"] = "value";
509509

510-
var batch = new Batch(100);
511-
Assert.True(batch.TryAdd(message), "Couldn't add message");
510+
var batch = new Batch(100, Task.FromResult);
511+
Assert.True(await batch.TryAdd(message), "Couldn't add message");
512512
await sender.SendAsync(batch);
513513
batch.Dispose();
514514
await sender.CloseAsync();
@@ -543,5 +543,48 @@ public async Task Batch_should_have_maximum_size_set()
543543
await sender.CloseAsync().ConfigureAwait(false);
544544
}
545545
}
546+
547+
[Fact]
548+
[DisplayTestMethodName]
549+
public async Task Batch_should_go_through_outgoing_plugins()
550+
{
551+
var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName);
552+
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete);
553+
554+
sender.RegisterPlugin(new RemoveVowelsPlugin());
555+
try
556+
{
557+
var batch = await sender.CreateBatch();
558+
await batch.TryAdd(new Message("Hello Neeraj".GetBytes()));
559+
await batch.TryAdd(new Message("from".GetBytes()));
560+
await batch.TryAdd(new Message("Sean Feldman".GetBytes()));
561+
562+
await sender.SendAsync(batch);
563+
batch.Dispose();
564+
await sender.CloseAsync();
565+
566+
var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 3);
567+
var bodies = receivedMessages.Select(m => m.Body.GetString());
568+
var bodiesArray = bodies as string[] ?? bodies.ToArray();
569+
Assert.True(bodiesArray.Contains("Hll Nrj") && bodiesArray.Contains("frm") && bodiesArray.Contains("Sn Fldmn"));
570+
}
571+
finally
572+
{
573+
await sender.CloseAsync().ConfigureAwait(false);
574+
await receiver.CloseAsync().ConfigureAwait(false);
575+
}
576+
}
577+
578+
class RemoveVowelsPlugin : ServiceBusPlugin
579+
{
580+
public override string Name { get; } = nameof(RemoveVowelsPlugin);
581+
582+
public override Task<Message> BeforeMessageSend(Message message)
583+
{
584+
message.Body = new string(message.Body.GetString().Where( x => "aeiouy".Contains(x) == false).ToArray()).GetBytes();
585+
return Task.FromResult(message);
586+
}
587+
}
588+
546589
}
547590
}

0 commit comments

Comments
 (0)