diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 27e32d6d..8793d1ae 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -23,9 +23,9 @@ static class AmqpMessageConverter const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number"; const string LockedUntilName = "x-opt-locked-until"; const string PublisherName = "x-opt-publisher"; - const string PartitionKeyName = "x-opt-partition-key"; const string PartitionIdName = "x-opt-partition-id"; - const string ViaPartitionKeyName = "x-opt-via-partition-key"; + internal const string PartitionKeyName = "x-opt-partition-key"; + internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string DeadLetterSourceName = "x-opt-deadletter-source"; const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; const string UriName = AmqpConstants.Vendor + ":uri"; @@ -641,7 +641,7 @@ static ArraySegment StreamToBytes(Stream stream) return buffer; } - private static Data ToData(AmqpMessage message) + internal static Data ToData(AmqpMessage message) { ArraySegment[] payload = message.GetPayload(); var buffer = new BufferListStream(payload); diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs new file mode 100644 index 00000000..cdf06d5e --- /dev/null +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -0,0 +1,148 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading.Tasks; + +namespace Microsoft.Azure.ServiceBus.Core +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using Microsoft.Azure.Amqp; + using Microsoft.Azure.Amqp.Framing; + using Microsoft.Azure.ServiceBus.Amqp; + using Microsoft.Azure.ServiceBus.Diagnostics; + + [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] + public class Batch : IDisposable + { + internal readonly ulong maximumBatchSize; + private readonly Func> pluginsCallback; + private AmqpMessage firstMessage; + private readonly List datas; + private AmqpMessage result; + private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData; + + /// + /// Construct a new batch with a maximum batch size and outgoing plugins callback. + /// + /// To construct a batch at run-time, use , , or . + /// Use this constructor for testing and custom implementations. + /// + /// + /// Maximum batch size allowed for batch. + /// Plugins callback to invoke on outgoing messages regisered with batch. + public Batch(ulong maximumBatchSize, Func> pluginsCallback) + { + this.maximumBatchSize = maximumBatchSize; + this.pluginsCallback = pluginsCallback; + this.datas = new List(); + this.result = AmqpMessage.Create(datas); + OriginalMessageList = new List(); + } + + /// + /// Add to the batch if the overall size of the batch with the added message is not exceeding the batch maximum. + /// + /// to add to the batch. + /// + public async Task TryAdd(Message message) + { + ThrowIfDisposed(); + + message.VerifyMessageIsNotPreviouslyReceived(); + + var processedMessage = await pluginsCallback(message); + + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage); + + if (firstMessage == null) + { + originalMessageData = (processedMessage.MessageId, processedMessage.SessionId, processedMessage.PartitionKey, processedMessage.ViaPartitionKey); + firstMessage = amqpMessage; + } + + var data = AmqpMessageConverter.ToData(amqpMessage); + datas.Add(data); + + if (Size <= maximumBatchSize) + { + OriginalMessageList.Add(message); + return true; + } + + datas.Remove(data); + return false; + + } + + /// + /// Number of messages in batch. + /// + public int Length => datas.Count; + + internal ulong Size => (ulong) result.SerializedMessageSize; + + + /// + /// Convert batch to AMQP message. + /// + /// + public AmqpMessage ToAmqpMessage() + { + ThrowIfDisposed(); + + if (datas.Count == 1) + { + firstMessage.Batchable = true; + return firstMessage; + } + + if (originalMessageData.messageId != null) + { + result.Properties.MessageId = originalMessageData.messageId; + } + + if (originalMessageData.sessionId != null) + { + result.Properties.GroupId = originalMessageData.sessionId; + } + + if (originalMessageData.partitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = originalMessageData.partitionKey; + } + + if (originalMessageData.viaPartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = originalMessageData.viaPartitionKey; + } + + result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; + result.Batchable = true; + return result; + } + + public void Dispose() + { + // TODO: review if there's anything else to do + firstMessage?.Dispose(); + result?.Dispose(); + + datas.Clear(); + OriginalMessageList.Clear(); + } + + private void ThrowIfDisposed() + { + if (result == null) + { + throw new Exception("Batch is has been disposed and cannot be re-used."); + } + } + + internal List OriginalMessageList { get; } + + private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}"; + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index aed01c9f..69aa5ab2 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -25,6 +25,16 @@ public interface ISenderClient : IClientEntity /// Task SendAsync(IList messageList); + // TODO: extract methods into this interface for the next major version + // /// + // /// Sends a of messages to Service Bus. + // /// + // Task SendAsync(Batch batch); + // /// + // /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + // /// + // Task CreateBatch(); + /// /// Schedules a message to appear on Service Bus. /// diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 72613d65..a0ca7a7b 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -15,6 +15,8 @@ namespace Microsoft.Azure.ServiceBus.Core using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Primitives; + using Microsoft.Azure.ServiceBus.Diagnostics; + /// /// The MessageSender can be used to send messages to Queues or Topics. @@ -41,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender readonly ServiceBusDiagnosticSource diagnosticSource; readonly bool isViaSender; readonly string transferDestinationPath; + private ulong maxMessageSize = 0; /// /// Creates a new AMQP MessageSender. @@ -236,7 +239,7 @@ public async Task SendAsync(IList messageList) { this.ThrowIfClosed(); - var count = MessageSender.ValidateMessages(messageList); + var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList); MessagingEventSource.Log.MessageSendStart(this.ClientId, count); bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); @@ -247,7 +250,7 @@ public async Task SendAsync(IList messageList) { var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout); + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) @@ -268,6 +271,78 @@ public async Task SendAsync(IList messageList) MessagingEventSource.Log.MessageSendStop(this.ClientId); } + /// + /// Sends a of messages to Service Bus. + /// + public async Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); + + var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); + // TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works. + // See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details. + var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(batch.OriginalMessageList) : null; + Task sendTask; + + try + { + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout); + await sendTask.ConfigureAwait(false); + } + catch (Exception exception) + { + if (isDiagnosticSourceEnabled) + { + this.diagnosticSource.ReportException(exception); + } + + MessagingEventSource.Log.MessageSendException(this.ClientId, exception); + throw; + } + // finally + // { + this.diagnosticSource.SendStop(activity, batch.OriginalMessageList, sendTask?.Status); + // } + + MessagingEventSource.Log.MessageSendStop(this.ClientId); + } + + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public async Task CreateBatch() + { + if (maxMessageSize != 0) + { + return new Batch(maxMessageSize, ProcessMessage); + } + + var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); + SendingAmqpLink amqpLink = null; + try + { + if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink)) + { + amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false); + } + + if (!amqpLink.Settings.MaxMessageSize.HasValue) + { + throw new Exception("Broker didn't provide maximum message size. Batch requires maximum message size to operate."); + } + + maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; + + return new Batch(maxMessageSize, ProcessMessage); + } + catch (Exception exception) + { + throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false); + } + } + /// /// Schedules a message to appear on Service Bus at a later time. /// @@ -296,7 +371,7 @@ public async Task ScheduleMessageAsync(Message message, DateTimeOffset sch } message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime; - MessageSender.ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc); long result = 0; @@ -445,7 +520,7 @@ protected override async Task OnClosingAsync() await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); } - static int ValidateMessages(IList messageList) + static int VerifyMessagesAreNotPreviouslyReceived(IList messageList) { var count = 0; if (messageList == null) @@ -456,20 +531,12 @@ static int ValidateMessages(IList messageList) foreach (var message in messageList) { count++; - ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); } return count; } - static void ValidateMessage(Message message) - { - if (message.SystemProperties.IsLockTokenSet) - { - throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received."); - } - } - static void CloseSession(SendingAmqpLink link) { // Note we close the session (which includes the link). @@ -521,10 +588,10 @@ async Task> ProcessMessages(IList messageList) return processedMessageList; } - async Task OnSendAsync(IList messageList) + async Task OnSendAsync(Func amqpMessageProvider) { var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); - using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList)) + using (var amqpMessage = amqpMessageProvider()) { SendingAmqpLink amqpLink = null; try diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs index a7547c26..e2d6cead 100644 --- a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.Diagnostics using System; using System.Collections.Generic; using System.Diagnostics; + using Microsoft.Azure.ServiceBus.Primitives; public static class MessageExtensions { @@ -15,20 +16,20 @@ public static class MessageExtensions /// New with tracing context /// /// - /// Tracing context is used to correlate telemetry between producer and consumer and + /// Tracing context is used to correlate telemetry between producer and consumer and /// represented by 'Diagnostic-Id' and 'Correlation-Context' properties in . - /// + /// /// .NET SDK automatically injects context when sending message to the ServiceBus (if diagnostics is enabled by tracing system). - /// + /// /// /// 'Diagnostic-Id' uniquely identifies operation that enqueued message /// /// /// 'Correlation-Context' is comma separated list of sting key value pairs represeting optional context for the operation. /// - /// + /// /// If there is no tracing context in the message, this method returns without parent. - /// + /// /// Returned needs to be started before it can be used (see example below) /// /// @@ -39,7 +40,7 @@ public static class MessageExtensions /// var activity = message.ExtractActivity(); /// activity.Start(); /// Logger.LogInformation($"Message received, Id = {Activity.Current.Id}") - /// try + /// try /// { /// // process message /// } @@ -47,7 +48,7 @@ public static class MessageExtensions /// { /// Logger.LogError($"Exception {ex}, Id = {Activity.Current.Id}") /// } - /// finally + /// finally /// { /// activity.Stop(); /// // Activity is stopped, we no longer have it in Activity.Current, let's user activity now @@ -55,10 +56,10 @@ public static class MessageExtensions /// } /// } /// - /// - /// Note that every log is stamped with .Id, that could be used within + /// + /// Note that every log is stamped with .Id, that could be used within /// any nested method call (sync or async) - is an ambient context that flows with async method calls. - /// + /// /// public static Activity ExtractActivity(this Message message, string activityName = null) @@ -149,5 +150,14 @@ internal static bool TryExtractContext(this Message message, out IList @@ -349,6 +349,27 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(batch); + } + + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index fe9ff6be..c833add1 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -164,7 +164,7 @@ internal MessageSender InnerSender return this.innerSender; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -184,6 +184,26 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(batch); + } + + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 58825105..9ff1fe6c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -227,6 +227,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } public System.Threading.Tasks.Task CompleteAsync(string lockToken) { } + public System.Threading.Tasks.Task CreateBatch() { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { } protected override System.Threading.Tasks.Task OnClosingAsync() { } @@ -238,6 +239,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException @@ -448,11 +450,13 @@ namespace Microsoft.Azure.ServiceBus public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TopicName { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public enum TransportType @@ -477,6 +481,15 @@ namespace Microsoft.Azure.ServiceBus namespace Microsoft.Azure.ServiceBus.Core { + [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] + public class Batch : System.IDisposable + { + public Batch(ulong maximumBatchSize, System.Func> pluginsCallback) { } + public int Length { get; } + public void Dispose() { } + public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } + public System.Threading.Tasks.Task TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + } public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { long LastPeekedSequenceNumber { get; } @@ -573,11 +586,13 @@ namespace Microsoft.Azure.ServiceBus.Core public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TransferDestinationPath { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public abstract class ServiceBusPlugin diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs index 363203b1..e7106704 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs @@ -309,6 +309,40 @@ async Task BatchSendReceiveFireEvents() Assert.True(this.events.IsEmpty); } + [Fact] + [DisplayTestMethodName] + async Task BatchSendWithBatchReceiveFireEvents() + { + this.queueClient = new QueueClient(TestUtility.NamespaceConnectionString, TestConstants.NonPartitionedQueueName, + ReceiveMode.ReceiveAndDelete); + + this.listener.Enable( (name, queuName, arg) => name.Contains("Send") || name.Contains("Receive") ); + await TestUtility.SendMessagesUsingBatchAsync(this.queueClient.InnerSender, 5); + var messages = await TestUtility.ReceiveMessagesAsync(this.queueClient.InnerReceiver, 5); + + Assert.True(this.events.TryDequeue(out var sendStart1)); + AssertSendStart(sendStart1.eventName, sendStart1.payload, sendStart1.activity, null, 5); + + Assert.True(this.events.TryDequeue(out var sendStop1)); + AssertSendStop(sendStop1.eventName, sendStop1.payload, sendStop1.activity, sendStop1.activity, 5); + + int receivedStopCount = 0; + string relatedTo = ""; + while (this.events.TryDequeue(out var receiveStart)) + { + var startCount = AssertReceiveStart(receiveStart.eventName, receiveStart.payload, receiveStart.activity, -1); + + Assert.True(this.events.TryDequeue(out var receiveStop)); + receivedStopCount += AssertReceiveStop(receiveStop.eventName, receiveStop.payload, receiveStop.activity, receiveStart.activity, null, startCount, -1); + relatedTo += receiveStop.activity.Tags.Single(t => t.Key == "RelatedTo").Value; + } + + Assert.Equal(5, receivedStopCount); + Assert.Contains(sendStart1.activity.Id, relatedTo); + + Assert.True(this.events.IsEmpty); + } + [Fact] [DisplayTestMethodName] async Task PeekFireEvents() diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs new file mode 100644 index 00000000..5ff743d9 --- /dev/null +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading.Tasks; + +namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives +{ + using System; + using System.Text; + using Microsoft.Azure.ServiceBus.Core; + using Xunit; + + public class BatchTests + { + private Func> fakePluginsCallback = Task.FromResult; + + [Fact] + public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() + { + using (var batch = new Batch(1, fakePluginsCallback)) + { + var wasAdded = await batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); + Assert.False(wasAdded, "Message should not have been added, but it was."); + } + } + + [Fact] + public void Should_throw_if_batch_disposed() + { + using (var batch = new Batch(1, fakePluginsCallback)) + { + batch.Dispose(); + Assert.ThrowsAsync(() => batch.TryAdd(new Message())); + } + } + + [Fact] + public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() + { + using (var batch = new Batch(100, fakePluginsCallback)) + { + var message = new Message("test".GetBytes()); + message.SystemProperties.LockTokenGuid = Guid.NewGuid(); + + Assert.ThrowsAsync(() => batch.TryAdd(message)); + } + } + + [Theory] + [InlineData(1)] + [InlineData(3)] + public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessages) + { + using (var batch = new Batch(100, fakePluginsCallback)) + { + for (var i = 0; i < numberOfMessages; i++) + { + await batch.TryAdd(new Message()); + } + + Assert.Equal(numberOfMessages, batch.Length); + } + } + + [Fact] + public async Task Should_reflect_property_in_batch_size() + { + using (var batch = new Batch(100, fakePluginsCallback)) + { + var message = new Message(); + + await batch.TryAdd(message); + + Assert.Equal((ulong)24, batch.Size); + } + + using (var batch = new Batch(100, fakePluginsCallback)) + { + var message = new Message(); + message.UserProperties["custom"] = "value"; + + await batch.TryAdd(message); + + Assert.Equal((ulong)45, batch.Size); + } + } + } +} \ No newline at end of file diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 97f90db0..7942d72c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests { using System; + using System.Linq; using System.Text; using System.Collections.Generic; using System.Threading; @@ -13,9 +14,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests public class SenderReceiverTests : SenderReceiverClientTestBase { - private static TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); + private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); - public static IEnumerable TestPermutations => new object[][] + public static IEnumerable TestPermutations => new[] { new object[] {TestConstants.NonPartitionedQueueName}, new object[] {TestConstants.PartitionedQueueName} @@ -376,7 +377,7 @@ public async Task ClientThrowsObjectDisposedExceptionWhenUserCloseConnectionAndW var recivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false); Assert.True(Encoding.UTF8.GetString(recivedMessage.Body) == Encoding.UTF8.GetString(messageBody)); - + var connection = sender.ServiceBusConnection; Assert.Throws(() => new MessageSender(connection, TestConstants.PartitionedQueueName)); } @@ -413,7 +414,7 @@ public async Task SendMesageCloseConnectionCreateAnotherConnectionSendAgainMessa messageBody = Encoding.UTF8.GetBytes("Message 2"); message = new Message(messageBody); - await sender.SendAsync(message); + await sender.SendAsync(message); recivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false); Assert.True(Encoding.UTF8.GetString(recivedMessage.Body) == Encoding.UTF8.GetString(messageBody)); @@ -459,5 +460,131 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS await receiver.CloseAsync().ConfigureAwait(false); } } + + [Fact] + [DisplayTestMethodName] + public async Task Sending_batch() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + try + { + var message1 = new Message("Hello Neeraj".GetBytes()); + var message2 = new Message("from".GetBytes()); + var message3 = new Message("Sean Feldman".GetBytes()); + + var batch = new Batch(100, Task.FromResult); + Assert.True(await batch.TryAdd(message1), "Couldn't add first message"); + Assert.True(await batch.TryAdd(message2), "Couldn't add second message"); + Assert.False(await batch.TryAdd(message3), "Shouldn't be able to add third message"); + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from")); + + var extraMessage = await TestUtility.PeekMessageAsync(receiver); + Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'"); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + + [Fact] + [DisplayTestMethodName] + public async Task Sending_batch_with_properties() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + try + { + var message = new Message("Hello Neeraj".GetBytes()); + message.UserProperties["custom"] = "value"; + + var batch = new Batch(100, Task.FromResult); + Assert.True(await batch.TryAdd(message), "Couldn't add message"); + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); + var receivedMessage = receivedMessages.FirstOrDefault(); + Assert.NotNull(receivedMessage); + Assert.Equal("value", receivedMessage.UserProperties["custom"]); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + + [Fact] + [DisplayTestMethodName] + public async Task Batch_should_have_maximum_size_set() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + try + { + using (var batch = await sender.CreateBatch()) + { + Assert.True(batch.maximumBatchSize == 256 * 1024 || batch.maximumBatchSize == 1024 * 1024, + $"Maximum batch size was expected to be 256KB or 1MB, but it wasn't. Reported size: {batch.maximumBatchSize}"); + } + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + } + } + + [Fact] + [DisplayTestMethodName] + public async Task Batch_should_go_through_outgoing_plugins() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + + sender.RegisterPlugin(new RemoveVowelsPlugin()); + try + { + var batch = await sender.CreateBatch(); + await batch.TryAdd(new Message("Hello Neeraj".GetBytes())); + await batch.TryAdd(new Message("from".GetBytes())); + await batch.TryAdd(new Message("Sean Feldman".GetBytes())); + + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 3); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hll Nrj") && bodiesArray.Contains("frm") && bodiesArray.Contains("Sn Fldmn")); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + + class RemoveVowelsPlugin : ServiceBusPlugin + { + public override string Name { get; } = nameof(RemoveVowelsPlugin); + + public override Task BeforeMessageSend(Message message) + { + message.Body = new string(message.Body.GetString().Where( x => "aeiouy".Contains(x) == false).ToArray()).GetBytes(); + return Task.FromResult(message); + } + } + } } \ No newline at end of file diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs index acaca126..b4d5331c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs @@ -66,6 +66,25 @@ internal static async Task SendMessagesAsync(IMessageSender messageSender, int m await messageSender.SendAsync(messagesToSend); Log($"Sent {messageCount} messages"); } + internal static async Task SendMessagesUsingBatchAsync(IMessageSender messageSender, int messageCount) + { + if (messageCount == 0) + { + await Task.FromResult(false); + } + + var sender = (MessageSender)messageSender; + var batch = await sender.CreateBatch(); + for (var i = 0; i < messageCount; i++) + { + var message = new Message(Encoding.UTF8.GetBytes("test" + i)); + message.Label = "test" + i; + await batch.TryAdd(message); + } + + await sender.SendAsync(batch); + Log($"Sent {messageCount} messages"); + } internal static async Task> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount) { @@ -98,7 +117,7 @@ internal static async Task> ReceiveDeferredMessagesAsync(IMessage var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber); if (msg != null) { - messagesToReturn.Add(msg); + messagesToReturn.Add(msg); } }