Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 2eb647a

Browse files
committed
Provide an API to create Batch initiated with supported maximum message size
1 parent 5a7adf9 commit 2eb647a

File tree

8 files changed

+116
-34
lines changed

8 files changed

+116
-34
lines changed

src/Microsoft.Azure.ServiceBus/Core/Batch.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Microsoft.Azure.ServiceBus.Core
1414
[DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")]
1515
public class Batch : IDisposable
1616
{
17-
private readonly long maximumBatchSize;
17+
internal readonly ulong maximumBatchSize;
1818
private AmqpMessage firstMessage;
1919
private readonly List<Data> datas;
2020
private AmqpMessage result;
@@ -24,7 +24,7 @@ public class Batch : IDisposable
2424
/// Construct a new batch with a maximum batch size.
2525
/// </summary>
2626
/// <param name="maximumBatchSize">Maximum batch size allowed for batch.</param>
27-
public Batch(long maximumBatchSize)
27+
public Batch(ulong maximumBatchSize)
2828
{
2929
this.maximumBatchSize = maximumBatchSize;
3030
this.datas = new List<Data>();
@@ -68,7 +68,7 @@ public bool TryAdd(Message message)
6868
/// </summary>
6969
public int Length => datas.Count;
7070

71-
internal long Size => result.SerializedMessageSize;
71+
internal ulong Size => (ulong) result.SerializedMessageSize;
7272

7373

7474
/// <summary>
@@ -128,6 +128,6 @@ private void ThrowIfDisposed()
128128
}
129129
}
130130

131-
private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count}";
131+
private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}";
132132
}
133133
}

src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@ public interface ISenderClient : IClientEntity
2525
/// </summary>
2626
Task SendAsync(IList<Message> messageList);
2727

28-
// TODO: extract method into this interface for the next major version
29-
// /// <summary>
30-
// /// Sends a <see cref="Batch"/> of messages to Service Bus.
31-
// /// </summary>
32-
// Task SendAsync(Batch batch);
28+
// TODO: extract methods into this interface for the next major version
29+
// /// <summary>
30+
// /// Sends a <see cref="Batch"/> of messages to Service Bus.
31+
// /// </summary>
32+
// Task SendAsync(Batch batch);
33+
// /// <summary>
34+
// /// Create a new <see cref="Batch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
35+
// /// </summary>
36+
// Task<Batch> CreateBatch();
3337

3438
/// <summary>
3539
/// Schedules a message to appear on Service Bus.

src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender
4343
readonly ServiceBusDiagnosticSource diagnosticSource;
4444
readonly bool isViaSender;
4545
readonly string transferDestinationPath;
46+
private ulong maxMessageSize = 0;
4647

4748
/// <summary>
4849
/// Creates a new AMQP MessageSender.
@@ -308,6 +309,40 @@ public async Task SendAsync(Batch batch)
308309
// MessagingEventSource.Log.MessageSendStop(this.ClientId);
309310
}
310311

312+
/// <summary>
313+
/// Create a new <see cref="Batch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
314+
/// </summary>
315+
public async Task<Batch> CreateBatch()
316+
{
317+
if (maxMessageSize != 0)
318+
{
319+
return new Batch(maxMessageSize);
320+
}
321+
322+
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
323+
SendingAmqpLink amqpLink = null;
324+
try
325+
{
326+
if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink))
327+
{
328+
amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
329+
}
330+
331+
if (!amqpLink.Settings.MaxMessageSize.HasValue)
332+
{
333+
throw new Exception("Broker didn't provide maximum message size. Batch requires maximum message size to operate.");
334+
}
335+
336+
maxMessageSize = amqpLink.Settings.MaxMessageSize.Value;
337+
338+
return new Batch(maxMessageSize);
339+
}
340+
catch (Exception exception)
341+
{
342+
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false);
343+
}
344+
}
345+
311346
/// <summary>
312347
/// Schedules a message to appear on Service Bus at a later time.
313348
/// </summary>

src/Microsoft.Azure.ServiceBus/QueueClient.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,17 @@ public Task SendAsync(Batch batch)
359359
return this.innerSender.SendAsync(batch);
360360
}
361361

362+
/// <summary>
363+
/// Create a new <see cref="Batch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
364+
/// </summary>
365+
public Task<Batch> CreateBatch()
366+
{
367+
this.ThrowIfClosed();
368+
369+
return this.innerSender.CreateBatch();
370+
}
371+
372+
362373
/// <summary>
363374
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
364375
/// </summary>

src/Microsoft.Azure.ServiceBus/TopicClient.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,16 @@ public Task SendAsync(Batch batch)
194194
return this.innerSender.SendAsync(batch);
195195
}
196196

197+
/// <summary>
198+
/// Create a new <see cref="Batch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
199+
/// </summary>
200+
public Task<Batch> CreateBatch()
201+
{
202+
this.ThrowIfClosed();
203+
204+
return this.innerSender.CreateBatch();
205+
}
206+
197207
/// <summary>
198208
/// Schedules a message to appear on Service Bus at a later time.
199209
/// </summary>

test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ namespace Microsoft.Azure.ServiceBus
227227
public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
228228
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
229229
public System.Threading.Tasks.Task CompleteAsync(string lockToken) { }
230+
public System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Core.Batch> CreateBatch() { }
230231
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null) { }
231232
public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { }
232233
protected override System.Threading.Tasks.Task OnClosingAsync() { }
@@ -449,6 +450,7 @@ namespace Microsoft.Azure.ServiceBus
449450
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
450451
public string TopicName { get; }
451452
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
453+
public System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Core.Batch> CreateBatch() { }
452454
protected override System.Threading.Tasks.Task OnClosingAsync() { }
453455
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }
454456
public System.Threading.Tasks.Task<long> ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { }
@@ -482,7 +484,7 @@ namespace Microsoft.Azure.ServiceBus.Core
482484
[System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")]
483485
public class Batch : System.IDisposable
484486
{
485-
public Batch(long maximumBatchSize) { }
487+
public Batch(ulong maximumBatchSize) { }
486488
public int Length { get; }
487489
public void Dispose() { }
488490
public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { }
@@ -584,6 +586,7 @@ namespace Microsoft.Azure.ServiceBus.Core
584586
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
585587
public string TransferDestinationPath { get; }
586588
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
589+
public System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Core.Batch> CreateBatch() { }
587590
protected override System.Threading.Tasks.Task OnClosingAsync() { }
588591
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }
589592
public System.Threading.Tasks.Task<long> ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { }

test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void Should_show_reflect_property_in_batch_size()
6767

6868
batch.TryAdd(message);
6969

70-
Assert.Equal(24, batch.Size);
70+
Assert.Equal((ulong)24, batch.Size);
7171
}
7272

7373
using (var batch = new Batch(100))
@@ -77,7 +77,7 @@ public void Should_show_reflect_property_in_batch_size()
7777

7878
batch.TryAdd(message);
7979

80-
Assert.Equal(45, batch.Size);
80+
Assert.Equal((ulong)45, batch.Size);
8181
}
8282
}
8383
}

test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -463,25 +463,31 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS
463463

464464
[Fact]
465465
[DisplayTestMethodName]
466-
public async Task Sending_batch_with_properties()
466+
public async Task Sending_batch()
467467
{
468468
var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName);
469469
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete);
470470
try
471471
{
472-
var message = new Message("Hello Neeraj".GetBytes());
473-
message.UserProperties["custom"] = "value";
472+
var message1 = new Message("Hello Neeraj".GetBytes());
473+
var message2 = new Message("from".GetBytes());
474+
var message3 = new Message("Sean Feldman".GetBytes());
474475

475476
var batch = new Batch(100);
476-
Assert.True(batch.TryAdd(message), "Couldn't add first message");
477+
Assert.True(batch.TryAdd(message1), "Couldn't add first message");
478+
Assert.True(batch.TryAdd(message2), "Couldn't add second message");
479+
Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message");
477480
await sender.SendAsync(batch);
478481
batch.Dispose();
479482
await sender.CloseAsync();
480483

481-
var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1);
482-
var receivedMessage = receivedMessages.FirstOrDefault();
483-
Assert.NotNull(receivedMessage);
484-
Assert.Equal("value", receivedMessage.UserProperties["custom"]);
484+
var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2);
485+
var bodies = receivedMessages.Select(m => m.Body.GetString());
486+
var bodiesArray = bodies as string[] ?? bodies.ToArray();
487+
Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from"));
488+
489+
var extraMessage = await TestUtility.PeekMessageAsync(receiver);
490+
Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'");
485491
}
486492
finally
487493
{
@@ -492,37 +498,50 @@ public async Task Sending_batch_with_properties()
492498

493499
[Fact]
494500
[DisplayTestMethodName]
495-
public async Task Sending_batch()
501+
public async Task Sending_batch_with_properties()
496502
{
497503
var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName);
498504
var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete);
499505
try
500506
{
501-
var message1 = new Message("Hello Neeraj".GetBytes());
502-
var message2 = new Message("from".GetBytes());
503-
var message3 = new Message("Sean Feldman".GetBytes());
507+
var message = new Message("Hello Neeraj".GetBytes());
508+
message.UserProperties["custom"] = "value";
504509

505510
var batch = new Batch(100);
506-
Assert.True(batch.TryAdd(message1), "Couldn't add first message");
507-
Assert.True(batch.TryAdd(message2), "Couldn't add second message");
508-
Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message");
511+
Assert.True(batch.TryAdd(message), "Couldn't add message");
509512
await sender.SendAsync(batch);
510513
batch.Dispose();
511514
await sender.CloseAsync();
512515

513-
var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2);
514-
var bodies = receivedMessages.Select(m => m.Body.GetString());
515-
var bodiesArray = bodies as string[] ?? bodies.ToArray();
516-
Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from"));
517-
518-
var extraMessage = await TestUtility.PeekMessageAsync(receiver);
519-
Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'");
516+
var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1);
517+
var receivedMessage = receivedMessages.FirstOrDefault();
518+
Assert.NotNull(receivedMessage);
519+
Assert.Equal("value", receivedMessage.UserProperties["custom"]);
520520
}
521521
finally
522522
{
523523
await sender.CloseAsync().ConfigureAwait(false);
524524
await receiver.CloseAsync().ConfigureAwait(false);
525525
}
526526
}
527+
528+
[Fact]
529+
[DisplayTestMethodName]
530+
public async Task Batch_should_have_maximum_size_set()
531+
{
532+
var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName);
533+
try
534+
{
535+
using (var batch = await sender.CreateBatch())
536+
{
537+
Assert.True(batch.maximumBatchSize == 256 * 1024 || batch.maximumBatchSize == 1024 * 1024,
538+
$"Maximum batch size was expected to be 256KB or 1MB, but it wasn't. Reported size: {batch.maximumBatchSize}");
539+
}
540+
}
541+
finally
542+
{
543+
await sender.CloseAsync().ConfigureAwait(false);
544+
}
545+
}
527546
}
528547
}

0 commit comments

Comments
 (0)