Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 29b3316

Browse files
committed
Initial (simple) batching implementation
1 parent 8bcf162 commit 29b3316

File tree

5 files changed

+235
-10
lines changed

5 files changed

+235
-10
lines changed

src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4+
using System.Diagnostics;
5+
using System.Linq;
6+
47
namespace Microsoft.Azure.ServiceBus.Amqp
58
{
69
using System;
@@ -23,9 +26,9 @@ static class AmqpMessageConverter
2326
const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
2427
const string LockedUntilName = "x-opt-locked-until";
2528
const string PublisherName = "x-opt-publisher";
26-
const string PartitionKeyName = "x-opt-partition-key";
2729
const string PartitionIdName = "x-opt-partition-id";
28-
const string ViaPartitionKeyName = "x-opt-via-partition-key";
30+
internal const string PartitionKeyName = "x-opt-partition-key";
31+
internal const string ViaPartitionKeyName = "x-opt-via-partition-key";
2932
const string DeadLetterSourceName = "x-opt-deadletter-source";
3033
const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
3134
const string UriName = AmqpConstants.Vendor + ":uri";
@@ -641,7 +644,7 @@ static ArraySegment<byte> StreamToBytes(Stream stream)
641644
return buffer;
642645
}
643646

644-
private static Data ToData(AmqpMessage message)
647+
internal static Data ToData(AmqpMessage message)
645648
{
646649
ArraySegment<byte>[] payload = message.GetPayload();
647650
var buffer = new BufferListStream(payload);
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using Microsoft.Azure.Amqp;
5+
using Microsoft.Azure.Amqp.Framing;
6+
using Microsoft.Azure.ServiceBus.Amqp;
7+
8+
namespace Microsoft.Azure.ServiceBus.Core
9+
{
10+
[DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")]
11+
public class Batch : IDisposable
12+
{
13+
private readonly long maximumBatchSize;
14+
private AmqpMessage firstMessage;
15+
private readonly List<Data> datas;
16+
private AmqpMessage result;
17+
private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData;
18+
19+
/// <summary>
20+
/// Construct a new batch with a maximum batch size.
21+
/// </summary>
22+
/// <param name="maximumBatchSize">Maximum batch size allowed for batch.</param>
23+
public Batch(long maximumBatchSize)
24+
{
25+
this.maximumBatchSize = maximumBatchSize;
26+
this.datas = new List<Data>();
27+
this.result = AmqpMessage.Create(datas);
28+
}
29+
30+
/// <summary>
31+
/// Add <see cref="Message"/> to the batch if the overall size of the batch with the added message is not exceeding the batch maximum.
32+
/// </summary>
33+
/// <param name="message"><see cref="Message"/> to add to the batch.</param>
34+
/// <returns></returns>
35+
public bool TryAdd(Message message)
36+
{
37+
ThrowIfDisposed();
38+
39+
var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message);
40+
41+
if (firstMessage == null)
42+
{
43+
originalMessageData = (message.MessageId, message.SessionId, message.PartitionKey, message.ViaPartitionKey);
44+
firstMessage = amqpMessage;
45+
}
46+
47+
var data = AmqpMessageConverter.ToData(amqpMessage);
48+
datas.Add(data);
49+
50+
if (Size <= maximumBatchSize)
51+
{
52+
return true;
53+
}
54+
55+
datas.Remove(data);
56+
return false;
57+
58+
}
59+
60+
private long Size => result.SerializedMessageSize;
61+
62+
/// <summary>
63+
/// Convert batch to AMQP message
64+
/// </summary>
65+
/// <returns></returns>
66+
public AmqpMessage ToAmqpMessage()
67+
{
68+
ThrowIfDisposed();
69+
70+
if (datas.Count == 1)
71+
{
72+
firstMessage.Batchable = true;
73+
return firstMessage;
74+
}
75+
76+
if (originalMessageData.messageId != null)
77+
{
78+
result.Properties.MessageId = originalMessageData.messageId;
79+
}
80+
81+
if (originalMessageData.sessionId != null)
82+
{
83+
result.Properties.GroupId = originalMessageData.sessionId;
84+
}
85+
86+
if (originalMessageData.partitionKey != null)
87+
{
88+
result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = originalMessageData.partitionKey;
89+
}
90+
91+
if (originalMessageData.viaPartitionKey != null)
92+
{
93+
result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = originalMessageData.viaPartitionKey;
94+
}
95+
96+
result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
97+
result.Batchable = true;
98+
return result;
99+
}
100+
101+
public void Dispose()
102+
{
103+
// TODO: review if there's anything else to do
104+
firstMessage?.Dispose();
105+
result?.Dispose();
106+
107+
firstMessage = null;
108+
result = null;
109+
}
110+
111+
private void ThrowIfDisposed()
112+
{
113+
if (result == null)
114+
{
115+
throw new Exception("Batch is has been disposed and cannot be re-used.");
116+
}
117+
}
118+
119+
private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count}";
120+
}
121+
}

src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ public async Task SendAsync(IList<Message> messageList)
247247
{
248248
var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false);
249249

250-
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout);
250+
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout);
251251
await sendTask.ConfigureAwait(false);
252252
}
253253
catch (Exception exception)
@@ -267,6 +267,41 @@ public async Task SendAsync(IList<Message> messageList)
267267

268268
MessagingEventSource.Log.MessageSendStop(this.ClientId);
269269
}
270+
public async Task SendAsync(Batch batch)
271+
{
272+
this.ThrowIfClosed();
273+
274+
//var count = MessageSender.ValidateMessages(messageList);
275+
// MessagingEventSource.Log.MessageSendStart(this.ClientId, count);
276+
277+
// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
278+
// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null;
279+
Task sendTask = null;
280+
281+
try
282+
{
283+
//var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false);
284+
285+
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout);
286+
await sendTask.ConfigureAwait(false);
287+
}
288+
catch (Exception exception)
289+
{
290+
// if (isDiagnosticSourceEnabled)
291+
// {
292+
// this.diagnosticSource.ReportException(exception);
293+
// }
294+
295+
MessagingEventSource.Log.MessageSendException(this.ClientId, exception);
296+
throw;
297+
}
298+
finally
299+
{
300+
// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status);
301+
}
302+
303+
// MessagingEventSource.Log.MessageSendStop(this.ClientId);
304+
}
270305

271306
/// <summary>
272307
/// Schedules a message to appear on Service Bus at a later time.
@@ -521,10 +556,10 @@ async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
521556
return processedMessageList;
522557
}
523558

524-
async Task OnSendAsync(IList<Message> messageList)
559+
async Task OnSendAsync(Func<AmqpMessage> amqpMessageProvider)
525560
{
526561
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
527-
using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList))
562+
using (var amqpMessage = amqpMessageProvider())
528563
{
529564
SendingAmqpLink amqpLink = null;
530565
try
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
5+
{
6+
using System;
7+
using System.Text;
8+
using Microsoft.Azure.ServiceBus.Core;
9+
using Xunit;
10+
11+
public class BatchTests
12+
{
13+
[Fact]
14+
public void Should_return_false_when_is_about_to_exceed_max_batch_size()
15+
{
16+
using (var batch = new Batch(1))
17+
{
18+
var wasAdded = batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello")));
19+
Assert.False(wasAdded, "Message should not have been added, but it was.");
20+
}
21+
}
22+
23+
[Fact]
24+
public void Should_throw_if_batch_disposed()
25+
{
26+
using (var batch = new Batch(1))
27+
{
28+
batch.Dispose();
29+
Assert.Throws<Exception>(() => batch.TryAdd(new Message()));
30+
}
31+
}
32+
}
33+
}

test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests
1313

1414
public class SenderReceiverTests : SenderReceiverClientTestBase
1515
{
16-
private static TimeSpan TwoSeconds = TimeSpan.FromSeconds(2);
16+
private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2);
1717

18-
public static IEnumerable<object[]> TestPermutations => new object[][]
18+
public static IEnumerable<object[]> TestPermutations => new[]
1919
{
2020
new object[] {TestConstants.NonPartitionedQueueName},
2121
new object[] {TestConstants.PartitionedQueueName}
@@ -376,7 +376,7 @@ public async Task ClientThrowsObjectDisposedExceptionWhenUserCloseConnectionAndW
376376

377377
var recivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false);
378378
Assert.True(Encoding.UTF8.GetString(recivedMessage.Body) == Encoding.UTF8.GetString(messageBody));
379-
379+
380380
var connection = sender.ServiceBusConnection;
381381
Assert.Throws<ObjectDisposedException>(() => new MessageSender(connection, TestConstants.PartitionedQueueName));
382382
}
@@ -413,7 +413,7 @@ public async Task SendMesageCloseConnectionCreateAnotherConnectionSendAgainMessa
413413

414414
messageBody = Encoding.UTF8.GetBytes("Message 2");
415415
message = new Message(messageBody);
416-
await sender.SendAsync(message);
416+
await sender.SendAsync(message);
417417

418418
recivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false);
419419
Assert.True(Encoding.UTF8.GetString(recivedMessage.Body) == Encoding.UTF8.GetString(messageBody));
@@ -459,5 +459,38 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS
459459
await receiver.CloseAsync().ConfigureAwait(false);
460460
}
461461
}
462+
463+
[Fact]
464+
[DisplayTestMethodName]
465+
public async Task Sending_batch()
466+
{
467+
var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName);
468+
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete);
469+
try
470+
{
471+
var message1 = new Message(Encoding.UTF8.GetBytes("Hello Neeraj"));
472+
var message2 = new Message(Encoding.UTF8.GetBytes("from"));
473+
var message3 = new Message(Encoding.UTF8.GetBytes("Sean Feldman"));
474+
475+
var batch = new Batch(100);
476+
Assert.True(batch.TryAdd(message1), "Couldn't add first message");
477+
Assert.True(batch.TryAdd(message2), "Couldn't add second message");
478+
Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message");
479+
await sender.SendAsync(batch);
480+
//batch.dispose()
481+
await sender.CloseAsync();
482+
483+
var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2);
484+
Assert.Equal("Hello Neeraj", Encoding.UTF8.GetString(receivedMessages[0].Body));
485+
Assert.Equal("from", Encoding.UTF8.GetString(receivedMessages[1].Body));
486+
var extraMessage = await TestUtility.PeekMessageAsync(receiver);
487+
Assert.True(extraMessage == null, "Should not have any messages other than the two, but an extra message is found");
488+
}
489+
finally
490+
{
491+
await sender.CloseAsync().ConfigureAwait(false);
492+
await receiver.CloseAsync().ConfigureAwait(false);
493+
}
494+
}
462495
}
463496
}

0 commit comments

Comments
 (0)