From 25002c75d5a66e37ef9bd8d70433fd00ce257a14 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 01:05:41 -0600 Subject: [PATCH 01/15] Initial (simple) batching implementation --- .../Amqp/AmqpMessageConverter.cs | 9 +- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 121 ++++++++++++++++++ .../Core/MessageSender.cs | 41 +++++- .../Primitives/BatchTests.cs | 33 +++++ .../SenderReceiverTests.cs | 41 +++++- 5 files changed, 235 insertions(+), 10 deletions(-) create mode 100644 src/Microsoft.Azure.ServiceBus/Core/Batch.cs create mode 100644 test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 27e32d6d..81afa1de 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -1,6 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics; +using System.Linq; + namespace Microsoft.Azure.ServiceBus.Amqp { using System; @@ -23,9 +26,9 @@ static class AmqpMessageConverter const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number"; const string LockedUntilName = "x-opt-locked-until"; const string PublisherName = "x-opt-publisher"; - const string PartitionKeyName = "x-opt-partition-key"; const string PartitionIdName = "x-opt-partition-id"; - const string ViaPartitionKeyName = "x-opt-via-partition-key"; + internal const string PartitionKeyName = "x-opt-partition-key"; + internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string DeadLetterSourceName = "x-opt-deadletter-source"; const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; const string UriName = AmqpConstants.Vendor + ":uri"; @@ -641,7 +644,7 @@ static ArraySegment StreamToBytes(Stream stream) return buffer; } - private static Data ToData(AmqpMessage message) + internal static Data ToData(AmqpMessage message) { ArraySegment[] payload = message.GetPayload(); var buffer = new BufferListStream(payload); diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs new file mode 100644 index 00000000..c13817fb --- /dev/null +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -0,0 +1,121 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using Microsoft.Azure.Amqp; +using Microsoft.Azure.Amqp.Framing; +using Microsoft.Azure.ServiceBus.Amqp; + +namespace Microsoft.Azure.ServiceBus.Core +{ + [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] + public class Batch : IDisposable + { + private readonly long maximumBatchSize; + private AmqpMessage firstMessage; + private readonly List datas; + private AmqpMessage result; + private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData; + + /// + /// Construct a new batch with a maximum batch size. + /// + /// Maximum batch size allowed for batch. + public Batch(long maximumBatchSize) + { + this.maximumBatchSize = maximumBatchSize; + this.datas = new List(); + this.result = AmqpMessage.Create(datas); + } + + /// + /// Add to the batch if the overall size of the batch with the added message is not exceeding the batch maximum. + /// + /// to add to the batch. + /// + public bool TryAdd(Message message) + { + ThrowIfDisposed(); + + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); + + if (firstMessage == null) + { + originalMessageData = (message.MessageId, message.SessionId, message.PartitionKey, message.ViaPartitionKey); + firstMessage = amqpMessage; + } + + var data = AmqpMessageConverter.ToData(amqpMessage); + datas.Add(data); + + if (Size <= maximumBatchSize) + { + return true; + } + + datas.Remove(data); + return false; + + } + + private long Size => result.SerializedMessageSize; + + /// + /// Convert batch to AMQP message + /// + /// + public AmqpMessage ToAmqpMessage() + { + ThrowIfDisposed(); + + if (datas.Count == 1) + { + firstMessage.Batchable = true; + return firstMessage; + } + + if (originalMessageData.messageId != null) + { + result.Properties.MessageId = originalMessageData.messageId; + } + + if (originalMessageData.sessionId != null) + { + result.Properties.GroupId = originalMessageData.sessionId; + } + + if (originalMessageData.partitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = originalMessageData.partitionKey; + } + + if (originalMessageData.viaPartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = originalMessageData.viaPartitionKey; + } + + result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; + result.Batchable = true; + return result; + } + + public void Dispose() + { + // TODO: review if there's anything else to do + firstMessage?.Dispose(); + result?.Dispose(); + + firstMessage = null; + result = null; + } + + private void ThrowIfDisposed() + { + if (result == null) + { + throw new Exception("Batch is has been disposed and cannot be re-used."); + } + } + + private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count}"; + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 72613d65..eaebfc5e 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -247,7 +247,7 @@ public async Task SendAsync(IList messageList) { var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout); + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) @@ -267,6 +267,41 @@ public async Task SendAsync(IList messageList) MessagingEventSource.Log.MessageSendStop(this.ClientId); } + public async Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + //var count = MessageSender.ValidateMessages(messageList); +// MessagingEventSource.Log.MessageSendStart(this.ClientId, count); + +// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); +// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; + Task sendTask = null; + + try + { + //var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); + + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout); + await sendTask.ConfigureAwait(false); + } + catch (Exception exception) + { +// if (isDiagnosticSourceEnabled) +// { +// this.diagnosticSource.ReportException(exception); +// } + + MessagingEventSource.Log.MessageSendException(this.ClientId, exception); + throw; + } + finally + { +// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); + } + +// MessagingEventSource.Log.MessageSendStop(this.ClientId); + } /// /// Schedules a message to appear on Service Bus at a later time. @@ -521,10 +556,10 @@ async Task> ProcessMessages(IList messageList) return processedMessageList; } - async Task OnSendAsync(IList messageList) + async Task OnSendAsync(Func amqpMessageProvider) { var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); - using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList)) + using (var amqpMessage = amqpMessageProvider()) { SendingAmqpLink amqpLink = null; try diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs new file mode 100644 index 00000000..f77e9b54 --- /dev/null +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives +{ + using System; + using System.Text; + using Microsoft.Azure.ServiceBus.Core; + using Xunit; + + public class BatchTests + { + [Fact] + public void Should_return_false_when_is_about_to_exceed_max_batch_size() + { + using (var batch = new Batch(1)) + { + var wasAdded = batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); + Assert.False(wasAdded, "Message should not have been added, but it was."); + } + } + + [Fact] + public void Should_throw_if_batch_disposed() + { + using (var batch = new Batch(1)) + { + batch.Dispose(); + Assert.Throws(() => batch.TryAdd(new Message())); + } + } + } +} \ No newline at end of file diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 97f90db0..880ef5af 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -13,9 +13,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests public class SenderReceiverTests : SenderReceiverClientTestBase { - private static TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); + private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); - public static IEnumerable TestPermutations => new object[][] + public static IEnumerable TestPermutations => new[] { new object[] {TestConstants.NonPartitionedQueueName}, new object[] {TestConstants.PartitionedQueueName} @@ -376,7 +376,7 @@ public async Task ClientThrowsObjectDisposedExceptionWhenUserCloseConnectionAndW var recivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false); Assert.True(Encoding.UTF8.GetString(recivedMessage.Body) == Encoding.UTF8.GetString(messageBody)); - + var connection = sender.ServiceBusConnection; Assert.Throws(() => new MessageSender(connection, TestConstants.PartitionedQueueName)); } @@ -413,7 +413,7 @@ public async Task SendMesageCloseConnectionCreateAnotherConnectionSendAgainMessa messageBody = Encoding.UTF8.GetBytes("Message 2"); message = new Message(messageBody); - await sender.SendAsync(message); + await sender.SendAsync(message); recivedMessage = await receiver.ReceiveAsync().ConfigureAwait(false); Assert.True(Encoding.UTF8.GetString(recivedMessage.Body) == Encoding.UTF8.GetString(messageBody)); @@ -459,5 +459,38 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS await receiver.CloseAsync().ConfigureAwait(false); } } + + [Fact] + [DisplayTestMethodName] + public async Task Sending_batch() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + try + { + var message1 = new Message(Encoding.UTF8.GetBytes("Hello Neeraj")); + var message2 = new Message(Encoding.UTF8.GetBytes("from")); + var message3 = new Message(Encoding.UTF8.GetBytes("Sean Feldman")); + + var batch = new Batch(100); + Assert.True(batch.TryAdd(message1), "Couldn't add first message"); + Assert.True(batch.TryAdd(message2), "Couldn't add second message"); + Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); + await sender.SendAsync(batch); + //batch.dispose() + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); + Assert.Equal("Hello Neeraj", Encoding.UTF8.GetString(receivedMessages[0].Body)); + Assert.Equal("from", Encoding.UTF8.GetString(receivedMessages[1].Body)); + var extraMessage = await TestUtility.PeekMessageAsync(receiver); + Assert.True(extraMessage == null, "Should not have any messages other than the two, but an extra message is found"); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } } } \ No newline at end of file From 1f8dd51a1f284b017ac9ade082d62c5df726ab24 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 01:38:30 -0600 Subject: [PATCH 02/15] Updating public API --- .../API/ApiApprovals.ApproveAzureServiceBus.approved.txt | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 58825105..28b9f9b8 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -477,6 +477,14 @@ namespace Microsoft.Azure.ServiceBus namespace Microsoft.Azure.ServiceBus.Core { + [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] + public class Batch : System.IDisposable + { + public Batch(long maximumBatchSize) { } + public void Dispose() { } + public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } + public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + } public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { long LastPeekedSequenceNumber { get; } @@ -578,6 +586,7 @@ namespace Microsoft.Azure.ServiceBus.Core public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public abstract class ServiceBusPlugin From 418444bfdf4577bee386b7db91f7123428aa345b Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 22:53:02 -0600 Subject: [PATCH 03/15] Modify test to remove reliance on order of messages --- .../SenderReceiverTests.cs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 880ef5af..6e7e2947 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Linq; + namespace Microsoft.Azure.ServiceBus.UnitTests { using System; @@ -477,12 +479,14 @@ public async Task Sending_batch() Assert.True(batch.TryAdd(message2), "Couldn't add second message"); Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); await sender.SendAsync(batch); - //batch.dispose() + batch.Dispose(); await sender.CloseAsync(); var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); - Assert.Equal("Hello Neeraj", Encoding.UTF8.GetString(receivedMessages[0].Body)); - Assert.Equal("from", Encoding.UTF8.GetString(receivedMessages[1].Body)); + var bodies = receivedMessages.Select(m => Encoding.UTF8.GetString(m.Body)); + Assert.Collection(bodies, item => Assert.Contains("Hello Neeraj", item), + item => Assert.Contains("from", item)); + var extraMessage = await TestUtility.PeekMessageAsync(receiver); Assert.True(extraMessage == null, "Should not have any messages other than the two, but an extra message is found"); } From 5d203f2c59395a16d57aee6c97c9a252e53ae26c Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 23:23:10 -0600 Subject: [PATCH 04/15] Adding batched send to QueueClient and TopicClient --- .../Core/ISenderClient.cs | 6 ++++++ .../Core/MessageSender.cs | 4 ++++ src/Microsoft.Azure.ServiceBus/QueueClient.cs | 14 ++++++++++++-- src/Microsoft.Azure.ServiceBus/TopicClient.cs | 12 +++++++++++- ...piApprovals.ApproveAzureServiceBus.approved.txt | 2 ++ 5 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index aed01c9f..c66a5e94 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -25,6 +25,12 @@ public interface ISenderClient : IClientEntity /// Task SendAsync(IList messageList); +// TODO: extract method into this interface for the next major version +// /// +// /// Sends a of messages to Service Bus. +// /// +// Task SendAsync(Batch batch); + /// /// Schedules a message to appear on Service Bus. /// diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index eaebfc5e..7ea9126c 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -267,6 +267,10 @@ public async Task SendAsync(IList messageList) MessagingEventSource.Log.MessageSendStop(this.ClientId); } + + /// + /// Sends a of messages to Service Bus. + /// public async Task SendAsync(Batch batch) { this.ThrowIfClosed(); diff --git a/src/Microsoft.Azure.ServiceBus/QueueClient.cs b/src/Microsoft.Azure.ServiceBus/QueueClient.cs index 145f5851..79b57124 100644 --- a/src/Microsoft.Azure.ServiceBus/QueueClient.cs +++ b/src/Microsoft.Azure.ServiceBus/QueueClient.cs @@ -89,7 +89,7 @@ public QueueClient(string connectionString, string entityPath, ReceiveMode recei { throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString); } - + this.OwnsConnection = true; } @@ -328,7 +328,7 @@ internal SessionPumpHost SessionPumpHost return this.sessionPumpHost; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -349,6 +349,16 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(batch); + } + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index fe9ff6be..6a1d4b18 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -164,7 +164,7 @@ internal MessageSender InnerSender return this.innerSender; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -184,6 +184,16 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(batch); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 28b9f9b8..3adcebf3 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -238,6 +238,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException @@ -453,6 +454,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public enum TransportType From 379594756233d05c17f9761f512f26b569cb4670 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 00:56:26 -0600 Subject: [PATCH 05/15] Remove order implied by Assert.Collection() Adding extra info to identify why the test is failing on full framework --- .../SenderReceiverTests.cs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 6e7e2947..c287873e 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -1,11 +1,10 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Linq; - namespace Microsoft.Azure.ServiceBus.UnitTests { using System; + using System.Linq; using System.Text; using System.Collections.Generic; using System.Threading; @@ -470,9 +469,9 @@ public async Task Sending_batch() var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); try { - var message1 = new Message(Encoding.UTF8.GetBytes("Hello Neeraj")); - var message2 = new Message(Encoding.UTF8.GetBytes("from")); - var message3 = new Message(Encoding.UTF8.GetBytes("Sean Feldman")); + var message1 = new Message("Hello Neeraj".GetBytes()); + var message2 = new Message("from".GetBytes()); + var message3 = new Message("Sean Feldman".GetBytes()); var batch = new Batch(100); Assert.True(batch.TryAdd(message1), "Couldn't add first message"); @@ -483,12 +482,12 @@ public async Task Sending_batch() await sender.CloseAsync(); var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); - var bodies = receivedMessages.Select(m => Encoding.UTF8.GetString(m.Body)); - Assert.Collection(bodies, item => Assert.Contains("Hello Neeraj", item), - item => Assert.Contains("from", item)); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from")); var extraMessage = await TestUtility.PeekMessageAsync(receiver); - Assert.True(extraMessage == null, "Should not have any messages other than the two, but an extra message is found"); + Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'"); } finally { From f18ec48d8bdb173cf4808ee07c4634203c3bad13 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 00:07:02 -0600 Subject: [PATCH 06/15] Validate received messages cannot be added to Batch implemented --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 3 ++ .../Core/MessageSender.cs | 20 +++++-------- .../MessageDiagnosticsExtensions.cs | 30 ++++++++++++------- .../Primitives/BatchTests.cs | 12 ++++++++ 4 files changed, 42 insertions(+), 23 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index c13817fb..137febc5 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -4,6 +4,7 @@ using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; +using Microsoft.Azure.ServiceBus.Diagnostics; namespace Microsoft.Azure.ServiceBus.Core { @@ -36,6 +37,8 @@ public bool TryAdd(Message message) { ThrowIfDisposed(); + message.VerifyMessageIsNotPreviouslyReceived(); + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); if (firstMessage == null) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 7ea9126c..f8ea3fd8 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Microsoft.Azure.ServiceBus.Diagnostics; + namespace Microsoft.Azure.ServiceBus.Core { using System; @@ -16,6 +18,7 @@ namespace Microsoft.Azure.ServiceBus.Core using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Primitives; + /// /// The MessageSender can be used to send messages to Queues or Topics. /// @@ -236,7 +239,7 @@ public async Task SendAsync(IList messageList) { this.ThrowIfClosed(); - var count = MessageSender.ValidateMessages(messageList); + var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList); MessagingEventSource.Log.MessageSendStart(this.ClientId, count); bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); @@ -275,7 +278,6 @@ public async Task SendAsync(Batch batch) { this.ThrowIfClosed(); - //var count = MessageSender.ValidateMessages(messageList); // MessagingEventSource.Log.MessageSendStart(this.ClientId, count); // var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); @@ -335,7 +337,7 @@ public async Task ScheduleMessageAsync(Message message, DateTimeOffset sch } message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime; - MessageSender.ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc); long result = 0; @@ -484,7 +486,7 @@ protected override async Task OnClosingAsync() await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); } - static int ValidateMessages(IList messageList) + static int VerifyMessagesAreNotPreviouslyReceived(IList messageList) { var count = 0; if (messageList == null) @@ -495,20 +497,12 @@ static int ValidateMessages(IList messageList) foreach (var message in messageList) { count++; - ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); } return count; } - static void ValidateMessage(Message message) - { - if (message.SystemProperties.IsLockTokenSet) - { - throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received."); - } - } - static void CloseSession(SendingAmqpLink link) { // Note we close the session (which includes the link). diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs index a7547c26..e2d6cead 100644 --- a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.Diagnostics using System; using System.Collections.Generic; using System.Diagnostics; + using Microsoft.Azure.ServiceBus.Primitives; public static class MessageExtensions { @@ -15,20 +16,20 @@ public static class MessageExtensions /// New with tracing context /// /// - /// Tracing context is used to correlate telemetry between producer and consumer and + /// Tracing context is used to correlate telemetry between producer and consumer and /// represented by 'Diagnostic-Id' and 'Correlation-Context' properties in . - /// + /// /// .NET SDK automatically injects context when sending message to the ServiceBus (if diagnostics is enabled by tracing system). - /// + /// /// /// 'Diagnostic-Id' uniquely identifies operation that enqueued message /// /// /// 'Correlation-Context' is comma separated list of sting key value pairs represeting optional context for the operation. /// - /// + /// /// If there is no tracing context in the message, this method returns without parent. - /// + /// /// Returned needs to be started before it can be used (see example below) /// /// @@ -39,7 +40,7 @@ public static class MessageExtensions /// var activity = message.ExtractActivity(); /// activity.Start(); /// Logger.LogInformation($"Message received, Id = {Activity.Current.Id}") - /// try + /// try /// { /// // process message /// } @@ -47,7 +48,7 @@ public static class MessageExtensions /// { /// Logger.LogError($"Exception {ex}, Id = {Activity.Current.Id}") /// } - /// finally + /// finally /// { /// activity.Stop(); /// // Activity is stopped, we no longer have it in Activity.Current, let's user activity now @@ -55,10 +56,10 @@ public static class MessageExtensions /// } /// } /// - /// - /// Note that every log is stamped with .Id, that could be used within + /// + /// Note that every log is stamped with .Id, that could be used within /// any nested method call (sync or async) - is an ambient context that flows with async method calls. - /// + /// /// public static Activity ExtractActivity(this Message message, string activityName = null) @@ -149,5 +150,14 @@ internal static bool TryExtractContext(this Message message, out IList(() => batch.TryAdd(new Message())); } } + + [Fact] + public void Should_throw_when_trying_add_received_message_to_batch() + { + using (var batch = new Batch(100)) + { + var message = new Message("test".GetBytes()); + message.SystemProperties.LockTokenGuid = Guid.NewGuid(); + + Assert.Throws(() => batch.TryAdd(message)); + } + } } } \ No newline at end of file From aa58610ed43498078cef2d8e4cf45d48671ad7ec Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 00:26:23 -0600 Subject: [PATCH 07/15] Cleanup --- .../Amqp/AmqpMessageConverter.cs | 3 --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 17 ++++++++++------- .../Core/MessageSender.cs | 3 +-- .../Primitives/BatchTests.cs | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 81afa1de..8793d1ae 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -1,9 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Diagnostics; -using System.Linq; - namespace Microsoft.Azure.ServiceBus.Amqp { using System; diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 137febc5..205817fd 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -1,13 +1,16 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using Microsoft.Azure.Amqp; -using Microsoft.Azure.Amqp.Framing; -using Microsoft.Azure.ServiceBus.Amqp; -using Microsoft.Azure.ServiceBus.Diagnostics; +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. namespace Microsoft.Azure.ServiceBus.Core { + using System; + using System.Collections.Generic; + using System.Diagnostics; + using Microsoft.Azure.Amqp; + using Microsoft.Azure.Amqp.Framing; + using Microsoft.Azure.ServiceBus.Amqp; + using Microsoft.Azure.ServiceBus.Diagnostics; + [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] public class Batch : IDisposable { diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index f8ea3fd8..a98a1501 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using Microsoft.Azure.ServiceBus.Diagnostics; - namespace Microsoft.Azure.ServiceBus.Core { using System; @@ -17,6 +15,7 @@ namespace Microsoft.Azure.ServiceBus.Core using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Primitives; + using Microsoft.Azure.ServiceBus.Diagnostics; /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 65a3db28..a17baf9d 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -31,7 +31,7 @@ public void Should_throw_if_batch_disposed() } [Fact] - public void Should_throw_when_trying_add_received_message_to_batch() + public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() { using (var batch = new Batch(100)) { From 0cef302cf033bf43586bc2074ffff4478f00af7b Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 16:26:14 -0600 Subject: [PATCH 08/15] Enable MessagingEventSource logging to comply with the rest of SendAsync methods --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 8 +++++++- .../Core/MessageSender.cs | 2 +- ...Approvals.ApproveAzureServiceBus.approved.txt | 1 + .../Primitives/BatchTests.cs | 16 ++++++++++++++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 205817fd..bfdf6582 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -63,10 +63,16 @@ public bool TryAdd(Message message) } + /// + /// Number of messages in batch. + /// + public int Length => datas.Count; + private long Size => result.SerializedMessageSize; + /// - /// Convert batch to AMQP message + /// Convert batch to AMQP message. /// /// public AmqpMessage ToAmqpMessage() diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index a98a1501..66a3334a 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -277,7 +277,7 @@ public async Task SendAsync(Batch batch) { this.ThrowIfClosed(); -// MessagingEventSource.Log.MessageSendStart(this.ClientId, count); + MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); // var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 3adcebf3..59317378 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -483,6 +483,7 @@ namespace Microsoft.Azure.ServiceBus.Core public class Batch : System.IDisposable { public Batch(long maximumBatchSize) { } + public int Length { get; } public void Dispose() { } public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index a17baf9d..3763baf0 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -41,5 +41,21 @@ public void Should_throw_when_trying_to_add_an_already_received_message_to_batch Assert.Throws(() => batch.TryAdd(message)); } } + + [Theory] + [InlineData(1)] + [InlineData(3)] + public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) + { + using (var batch = new Batch(100)) + { + for (var i = 0; i < numberOfMessages; i++) + { + batch.TryAdd(new Message()); + } + + Assert.Equal(numberOfMessages, batch.Length); + } + } } } \ No newline at end of file From ccdb74d5c967f25e71aeab8e5bfd1885ff0d074c Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Wed, 1 Aug 2018 22:34:55 -0600 Subject: [PATCH 09/15] Verify custom properties sent using a Batch are coming through --- .../SenderReceiverTests.cs | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index c287873e..d5f17273 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -461,6 +461,35 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS } } + [Fact] + [DisplayTestMethodName] + public async Task Sending_batch_with_properties() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + try + { + var message = new Message("Hello Neeraj".GetBytes()); + message.UserProperties["custom"] = "value"; + + var batch = new Batch(100); + Assert.True(batch.TryAdd(message), "Couldn't add first message"); + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); + var receivedMessage = receivedMessages.FirstOrDefault(); + Assert.NotNull(receivedMessage); + Assert.Equal("value", receivedMessage.UserProperties["custom"]); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + [Fact] [DisplayTestMethodName] public async Task Sending_batch() From 6673a090bb923a0a11d0b819e6f6b15fe311aa27 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Wed, 1 Aug 2018 22:44:26 -0600 Subject: [PATCH 10/15] Verify custom properties affect Batch size (serialized bytes) --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 2 +- .../Primitives/BatchTests.cs | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index bfdf6582..97c41558 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -68,7 +68,7 @@ public bool TryAdd(Message message) /// public int Length => datas.Count; - private long Size => result.SerializedMessageSize; + internal long Size => result.SerializedMessageSize; /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 3763baf0..034512e7 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -57,5 +57,28 @@ public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) Assert.Equal(numberOfMessages, batch.Length); } } + + [Fact] + public void Should_show_reflect_property_in_batch_size() + { + using (var batch = new Batch(100)) + { + var message = new Message(); + + batch.TryAdd(message); + + Assert.Equal(24, batch.Size); + } + + using (var batch = new Batch(100)) + { + var message = new Message(); + message.UserProperties["custom"] = "value"; + + batch.TryAdd(message); + + Assert.Equal(45, batch.Size); + } + } } } \ No newline at end of file From f0ed313d18c1d76679e87597611630848e858bb5 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 2 Aug 2018 00:31:43 -0600 Subject: [PATCH 11/15] Provide an API to create Batch initiated with supported maximum message size --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 8 +-- .../Core/ISenderClient.cs | 14 +++-- .../Core/MessageSender.cs | 35 +++++++++++ src/Microsoft.Azure.ServiceBus/QueueClient.cs | 11 ++++ src/Microsoft.Azure.ServiceBus/TopicClient.cs | 10 +++ ...rovals.ApproveAzureServiceBus.approved.txt | 5 +- .../Primitives/BatchTests.cs | 4 +- .../SenderReceiverTests.cs | 63 ++++++++++++------- 8 files changed, 116 insertions(+), 34 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 97c41558..296b7f91 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -14,7 +14,7 @@ namespace Microsoft.Azure.ServiceBus.Core [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] public class Batch : IDisposable { - private readonly long maximumBatchSize; + internal readonly ulong maximumBatchSize; private AmqpMessage firstMessage; private readonly List datas; private AmqpMessage result; @@ -24,7 +24,7 @@ public class Batch : IDisposable /// Construct a new batch with a maximum batch size. /// /// Maximum batch size allowed for batch. - public Batch(long maximumBatchSize) + public Batch(ulong maximumBatchSize) { this.maximumBatchSize = maximumBatchSize; this.datas = new List(); @@ -68,7 +68,7 @@ public bool TryAdd(Message message) /// public int Length => datas.Count; - internal long Size => result.SerializedMessageSize; + internal ulong Size => (ulong) result.SerializedMessageSize; /// @@ -128,6 +128,6 @@ private void ThrowIfDisposed() } } - private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count}"; + private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}"; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index c66a5e94..69aa5ab2 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -25,11 +25,15 @@ public interface ISenderClient : IClientEntity /// Task SendAsync(IList messageList); -// TODO: extract method into this interface for the next major version -// /// -// /// Sends a of messages to Service Bus. -// /// -// Task SendAsync(Batch batch); + // TODO: extract methods into this interface for the next major version + // /// + // /// Sends a of messages to Service Bus. + // /// + // Task SendAsync(Batch batch); + // /// + // /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + // /// + // Task CreateBatch(); /// /// Schedules a message to appear on Service Bus. diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 66a3334a..c609a733 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -43,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender readonly ServiceBusDiagnosticSource diagnosticSource; readonly bool isViaSender; readonly string transferDestinationPath; + private ulong maxMessageSize = 0; /// /// Creates a new AMQP MessageSender. @@ -308,6 +309,40 @@ public async Task SendAsync(Batch batch) // MessagingEventSource.Log.MessageSendStop(this.ClientId); } + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public async Task CreateBatch() + { + if (maxMessageSize != 0) + { + return new Batch(maxMessageSize); + } + + var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); + SendingAmqpLink amqpLink = null; + try + { + if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink)) + { + amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false); + } + + if (!amqpLink.Settings.MaxMessageSize.HasValue) + { + throw new Exception("Broker didn't provide maximum message size. Batch requires maximum message size to operate."); + } + + maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; + + return new Batch(maxMessageSize); + } + catch (Exception exception) + { + throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false); + } + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/src/Microsoft.Azure.ServiceBus/QueueClient.cs b/src/Microsoft.Azure.ServiceBus/QueueClient.cs index 79b57124..dc9696bd 100644 --- a/src/Microsoft.Azure.ServiceBus/QueueClient.cs +++ b/src/Microsoft.Azure.ServiceBus/QueueClient.cs @@ -359,6 +359,17 @@ public Task SendAsync(Batch batch) return this.innerSender.SendAsync(batch); } + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index 6a1d4b18..c833add1 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -194,6 +194,16 @@ public Task SendAsync(Batch batch) return this.innerSender.SendAsync(batch); } + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 59317378..96d46d21 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -227,6 +227,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } public System.Threading.Tasks.Task CompleteAsync(string lockToken) { } + public System.Threading.Tasks.Task CreateBatch() { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { } protected override System.Threading.Tasks.Task OnClosingAsync() { } @@ -449,6 +450,7 @@ namespace Microsoft.Azure.ServiceBus public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TopicName { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } @@ -482,7 +484,7 @@ namespace Microsoft.Azure.ServiceBus.Core [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] public class Batch : System.IDisposable { - public Batch(long maximumBatchSize) { } + public Batch(ulong maximumBatchSize) { } public int Length { get; } public void Dispose() { } public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } @@ -584,6 +586,7 @@ namespace Microsoft.Azure.ServiceBus.Core public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TransferDestinationPath { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 034512e7..9ff76ff2 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -67,7 +67,7 @@ public void Should_show_reflect_property_in_batch_size() batch.TryAdd(message); - Assert.Equal(24, batch.Size); + Assert.Equal((ulong)24, batch.Size); } using (var batch = new Batch(100)) @@ -77,7 +77,7 @@ public void Should_show_reflect_property_in_batch_size() batch.TryAdd(message); - Assert.Equal(45, batch.Size); + Assert.Equal((ulong)45, batch.Size); } } } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index d5f17273..8ac036c7 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -463,25 +463,31 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS [Fact] [DisplayTestMethodName] - public async Task Sending_batch_with_properties() + public async Task Sending_batch() { var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); try { - var message = new Message("Hello Neeraj".GetBytes()); - message.UserProperties["custom"] = "value"; + var message1 = new Message("Hello Neeraj".GetBytes()); + var message2 = new Message("from".GetBytes()); + var message3 = new Message("Sean Feldman".GetBytes()); var batch = new Batch(100); - Assert.True(batch.TryAdd(message), "Couldn't add first message"); + Assert.True(batch.TryAdd(message1), "Couldn't add first message"); + Assert.True(batch.TryAdd(message2), "Couldn't add second message"); + Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); await sender.SendAsync(batch); batch.Dispose(); await sender.CloseAsync(); - var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); - var receivedMessage = receivedMessages.FirstOrDefault(); - Assert.NotNull(receivedMessage); - Assert.Equal("value", receivedMessage.UserProperties["custom"]); + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from")); + + var extraMessage = await TestUtility.PeekMessageAsync(receiver); + Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'"); } finally { @@ -492,31 +498,25 @@ public async Task Sending_batch_with_properties() [Fact] [DisplayTestMethodName] - public async Task Sending_batch() + public async Task Sending_batch_with_properties() { var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); try { - var message1 = new Message("Hello Neeraj".GetBytes()); - var message2 = new Message("from".GetBytes()); - var message3 = new Message("Sean Feldman".GetBytes()); + var message = new Message("Hello Neeraj".GetBytes()); + message.UserProperties["custom"] = "value"; var batch = new Batch(100); - Assert.True(batch.TryAdd(message1), "Couldn't add first message"); - Assert.True(batch.TryAdd(message2), "Couldn't add second message"); - Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); + Assert.True(batch.TryAdd(message), "Couldn't add message"); await sender.SendAsync(batch); batch.Dispose(); await sender.CloseAsync(); - var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); - var bodies = receivedMessages.Select(m => m.Body.GetString()); - var bodiesArray = bodies as string[] ?? bodies.ToArray(); - Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from")); - - var extraMessage = await TestUtility.PeekMessageAsync(receiver); - Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'"); + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); + var receivedMessage = receivedMessages.FirstOrDefault(); + Assert.NotNull(receivedMessage); + Assert.Equal("value", receivedMessage.UserProperties["custom"]); } finally { @@ -524,5 +524,24 @@ public async Task Sending_batch() await receiver.CloseAsync().ConfigureAwait(false); } } + + [Fact] + [DisplayTestMethodName] + public async Task Batch_should_have_maximum_size_set() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + try + { + using (var batch = await sender.CreateBatch()) + { + Assert.True(batch.maximumBatchSize == 256 * 1024 || batch.maximumBatchSize == 1024 * 1024, + $"Maximum batch size was expected to be 256KB or 1MB, but it wasn't. Reported size: {batch.maximumBatchSize}"); + } + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + } + } } } \ No newline at end of file From 8d6a01142a2bee26701ba31ee81848434b4e802a Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 2 Aug 2018 01:23:53 -0600 Subject: [PATCH 12/15] Pass messages in Batch through outgoing plugins --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 21 +++++-- .../Core/MessageSender.cs | 4 +- ...rovals.ApproveAzureServiceBus.approved.txt | 4 +- .../Primitives/BatchTests.cs | 34 +++++++----- .../SenderReceiverTests.cs | 55 +++++++++++++++++-- 5 files changed, 88 insertions(+), 30 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 296b7f91..a8835603 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Threading.Tasks; + namespace Microsoft.Azure.ServiceBus.Core { using System; @@ -15,18 +17,25 @@ namespace Microsoft.Azure.ServiceBus.Core public class Batch : IDisposable { internal readonly ulong maximumBatchSize; + private readonly Func> pluginsCallback; private AmqpMessage firstMessage; private readonly List datas; private AmqpMessage result; private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData; /// - /// Construct a new batch with a maximum batch size. + /// Construct a new batch with a maximum batch size and outgoing plugins callback. + /// + /// To construct a batch at run-time, use , , or . + /// Use this constructor for testing and custom implementations. + /// /// /// Maximum batch size allowed for batch. - public Batch(ulong maximumBatchSize) + /// Plugins callback to invoke on outgoing messages regisered with batch. + public Batch(ulong maximumBatchSize, Func> pluginsCallback) { this.maximumBatchSize = maximumBatchSize; + this.pluginsCallback = pluginsCallback; this.datas = new List(); this.result = AmqpMessage.Create(datas); } @@ -36,17 +45,19 @@ public Batch(ulong maximumBatchSize) /// /// to add to the batch. /// - public bool TryAdd(Message message) + public async Task TryAdd(Message message) { ThrowIfDisposed(); message.VerifyMessageIsNotPreviouslyReceived(); - var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); + var processedMessage = await pluginsCallback(message); + + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage); if (firstMessage == null) { - originalMessageData = (message.MessageId, message.SessionId, message.PartitionKey, message.ViaPartitionKey); + originalMessageData = (processedMessage.MessageId, processedMessage.SessionId, processedMessage.PartitionKey, processedMessage.ViaPartitionKey); firstMessage = amqpMessage; } diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index c609a733..8a0dc9c4 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -316,7 +316,7 @@ public async Task CreateBatch() { if (maxMessageSize != 0) { - return new Batch(maxMessageSize); + return new Batch(maxMessageSize, ProcessMessage); } var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); @@ -335,7 +335,7 @@ public async Task CreateBatch() maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; - return new Batch(maxMessageSize); + return new Batch(maxMessageSize, ProcessMessage); } catch (Exception exception) { diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 96d46d21..9ff1fe6c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -484,11 +484,11 @@ namespace Microsoft.Azure.ServiceBus.Core [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] public class Batch : System.IDisposable { - public Batch(ulong maximumBatchSize) { } + public Batch(ulong maximumBatchSize, System.Func> pluginsCallback) { } public int Length { get; } public void Dispose() { } public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } - public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + public System.Threading.Tasks.Task TryAdd(Microsoft.Azure.ServiceBus.Message message) { } } public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 9ff76ff2..5ff743d9 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Threading.Tasks; + namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives { using System; @@ -10,12 +12,14 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives public class BatchTests { + private Func> fakePluginsCallback = Task.FromResult; + [Fact] - public void Should_return_false_when_is_about_to_exceed_max_batch_size() + public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() { - using (var batch = new Batch(1)) + using (var batch = new Batch(1, fakePluginsCallback)) { - var wasAdded = batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); + var wasAdded = await batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); Assert.False(wasAdded, "Message should not have been added, but it was."); } } @@ -23,35 +27,35 @@ public void Should_return_false_when_is_about_to_exceed_max_batch_size() [Fact] public void Should_throw_if_batch_disposed() { - using (var batch = new Batch(1)) + using (var batch = new Batch(1, fakePluginsCallback)) { batch.Dispose(); - Assert.Throws(() => batch.TryAdd(new Message())); + Assert.ThrowsAsync(() => batch.TryAdd(new Message())); } } [Fact] public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() { - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { var message = new Message("test".GetBytes()); message.SystemProperties.LockTokenGuid = Guid.NewGuid(); - Assert.Throws(() => batch.TryAdd(message)); + Assert.ThrowsAsync(() => batch.TryAdd(message)); } } [Theory] [InlineData(1)] [InlineData(3)] - public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) + public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessages) { - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { for (var i = 0; i < numberOfMessages; i++) { - batch.TryAdd(new Message()); + await batch.TryAdd(new Message()); } Assert.Equal(numberOfMessages, batch.Length); @@ -59,23 +63,23 @@ public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) } [Fact] - public void Should_show_reflect_property_in_batch_size() + public async Task Should_reflect_property_in_batch_size() { - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { var message = new Message(); - batch.TryAdd(message); + await batch.TryAdd(message); Assert.Equal((ulong)24, batch.Size); } - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { var message = new Message(); message.UserProperties["custom"] = "value"; - batch.TryAdd(message); + await batch.TryAdd(message); Assert.Equal((ulong)45, batch.Size); } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 8ac036c7..7942d72c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -473,10 +473,10 @@ public async Task Sending_batch() var message2 = new Message("from".GetBytes()); var message3 = new Message("Sean Feldman".GetBytes()); - var batch = new Batch(100); - Assert.True(batch.TryAdd(message1), "Couldn't add first message"); - Assert.True(batch.TryAdd(message2), "Couldn't add second message"); - Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); + var batch = new Batch(100, Task.FromResult); + Assert.True(await batch.TryAdd(message1), "Couldn't add first message"); + Assert.True(await batch.TryAdd(message2), "Couldn't add second message"); + Assert.False(await batch.TryAdd(message3), "Shouldn't be able to add third message"); await sender.SendAsync(batch); batch.Dispose(); await sender.CloseAsync(); @@ -507,8 +507,8 @@ public async Task Sending_batch_with_properties() var message = new Message("Hello Neeraj".GetBytes()); message.UserProperties["custom"] = "value"; - var batch = new Batch(100); - Assert.True(batch.TryAdd(message), "Couldn't add message"); + var batch = new Batch(100, Task.FromResult); + Assert.True(await batch.TryAdd(message), "Couldn't add message"); await sender.SendAsync(batch); batch.Dispose(); await sender.CloseAsync(); @@ -543,5 +543,48 @@ public async Task Batch_should_have_maximum_size_set() await sender.CloseAsync().ConfigureAwait(false); } } + + [Fact] + [DisplayTestMethodName] + public async Task Batch_should_go_through_outgoing_plugins() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + + sender.RegisterPlugin(new RemoveVowelsPlugin()); + try + { + var batch = await sender.CreateBatch(); + await batch.TryAdd(new Message("Hello Neeraj".GetBytes())); + await batch.TryAdd(new Message("from".GetBytes())); + await batch.TryAdd(new Message("Sean Feldman".GetBytes())); + + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 3); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hll Nrj") && bodiesArray.Contains("frm") && bodiesArray.Contains("Sn Fldmn")); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + + class RemoveVowelsPlugin : ServiceBusPlugin + { + public override string Name { get; } = nameof(RemoveVowelsPlugin); + + public override Task BeforeMessageSend(Message message) + { + message.Body = new string(message.Body.GetString().Where( x => "aeiouy".Contains(x) == false).ToArray()).GetBytes(); + return Task.FromResult(message); + } + } + } } \ No newline at end of file From 8f57db7cd153f44f09cd1cbc67aa9628db4385b3 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 23 Aug 2018 22:49:52 -0600 Subject: [PATCH 13/15] Report exception via diagnostics --- .../Core/MessageSender.cs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 8a0dc9c4..79739f72 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -280,33 +280,31 @@ public async Task SendAsync(Batch batch) MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); -// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); -// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; - Task sendTask = null; + var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); + // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; + Task sendTask; try { - //var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) { -// if (isDiagnosticSourceEnabled) -// { -// this.diagnosticSource.ReportException(exception); -// } + if (isDiagnosticSourceEnabled) + { + this.diagnosticSource.ReportException(exception); + } MessagingEventSource.Log.MessageSendException(this.ClientId, exception); throw; } - finally - { -// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); - } + // finally + // { + // this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); + // } -// MessagingEventSource.Log.MessageSendStop(this.ClientId); + // MessagingEventSource.Log.MessageSendStop(this.ClientId); } /// From e8d0c9ec4071ce7d532c6a0c2cf036031522faaf Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 23 Aug 2018 22:52:50 -0600 Subject: [PATCH 14/15] Add tracking TODO --- src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 79739f72..3b54a270 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -281,6 +281,8 @@ public async Task SendAsync(Batch batch) MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); + // TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works. + // See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details. // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; Task sendTask; From c1d69c4c386cb8b7bf3b15d679da18a369590a09 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Mon, 20 Aug 2018 23:35:22 -0600 Subject: [PATCH 15/15] Add diagnostics --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 8 +++-- .../Core/MessageSender.cs | 6 ++-- .../QueueClientDiagnosticsTests.cs | 34 +++++++++++++++++++ .../TestUtility.cs | 21 +++++++++++- 4 files changed, 63 insertions(+), 6 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index a8835603..cdf06d5e 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -38,6 +38,7 @@ public Batch(ulong maximumBatchSize, Func> pluginsCallbac this.pluginsCallback = pluginsCallback; this.datas = new List(); this.result = AmqpMessage.Create(datas); + OriginalMessageList = new List(); } /// @@ -66,6 +67,7 @@ public async Task TryAdd(Message message) if (Size <= maximumBatchSize) { + OriginalMessageList.Add(message); return true; } @@ -127,8 +129,8 @@ public void Dispose() firstMessage?.Dispose(); result?.Dispose(); - firstMessage = null; - result = null; + datas.Clear(); + OriginalMessageList.Clear(); } private void ThrowIfDisposed() @@ -139,6 +141,8 @@ private void ThrowIfDisposed() } } + internal List OriginalMessageList { get; } + private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}"; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 3b54a270..a0ca7a7b 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -283,7 +283,7 @@ public async Task SendAsync(Batch batch) var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); // TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works. // See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details. - // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; + var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(batch.OriginalMessageList) : null; Task sendTask; try @@ -303,10 +303,10 @@ public async Task SendAsync(Batch batch) } // finally // { - // this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); + this.diagnosticSource.SendStop(activity, batch.OriginalMessageList, sendTask?.Status); // } - // MessagingEventSource.Log.MessageSendStop(this.ClientId); + MessagingEventSource.Log.MessageSendStop(this.ClientId); } /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs index 363203b1..e7106704 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Diagnostics/QueueClientDiagnosticsTests.cs @@ -309,6 +309,40 @@ async Task BatchSendReceiveFireEvents() Assert.True(this.events.IsEmpty); } + [Fact] + [DisplayTestMethodName] + async Task BatchSendWithBatchReceiveFireEvents() + { + this.queueClient = new QueueClient(TestUtility.NamespaceConnectionString, TestConstants.NonPartitionedQueueName, + ReceiveMode.ReceiveAndDelete); + + this.listener.Enable( (name, queuName, arg) => name.Contains("Send") || name.Contains("Receive") ); + await TestUtility.SendMessagesUsingBatchAsync(this.queueClient.InnerSender, 5); + var messages = await TestUtility.ReceiveMessagesAsync(this.queueClient.InnerReceiver, 5); + + Assert.True(this.events.TryDequeue(out var sendStart1)); + AssertSendStart(sendStart1.eventName, sendStart1.payload, sendStart1.activity, null, 5); + + Assert.True(this.events.TryDequeue(out var sendStop1)); + AssertSendStop(sendStop1.eventName, sendStop1.payload, sendStop1.activity, sendStop1.activity, 5); + + int receivedStopCount = 0; + string relatedTo = ""; + while (this.events.TryDequeue(out var receiveStart)) + { + var startCount = AssertReceiveStart(receiveStart.eventName, receiveStart.payload, receiveStart.activity, -1); + + Assert.True(this.events.TryDequeue(out var receiveStop)); + receivedStopCount += AssertReceiveStop(receiveStop.eventName, receiveStop.payload, receiveStop.activity, receiveStart.activity, null, startCount, -1); + relatedTo += receiveStop.activity.Tags.Single(t => t.Key == "RelatedTo").Value; + } + + Assert.Equal(5, receivedStopCount); + Assert.Contains(sendStart1.activity.Id, relatedTo); + + Assert.True(this.events.IsEmpty); + } + [Fact] [DisplayTestMethodName] async Task PeekFireEvents() diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs index acaca126..b4d5331c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs @@ -66,6 +66,25 @@ internal static async Task SendMessagesAsync(IMessageSender messageSender, int m await messageSender.SendAsync(messagesToSend); Log($"Sent {messageCount} messages"); } + internal static async Task SendMessagesUsingBatchAsync(IMessageSender messageSender, int messageCount) + { + if (messageCount == 0) + { + await Task.FromResult(false); + } + + var sender = (MessageSender)messageSender; + var batch = await sender.CreateBatch(); + for (var i = 0; i < messageCount; i++) + { + var message = new Message(Encoding.UTF8.GetBytes("test" + i)); + message.Label = "test" + i; + await batch.TryAdd(message); + } + + await sender.SendAsync(batch); + Log($"Sent {messageCount} messages"); + } internal static async Task> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount) { @@ -98,7 +117,7 @@ internal static async Task> ReceiveDeferredMessagesAsync(IMessage var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber); if (msg != null) { - messagesToReturn.Add(msg); + messagesToReturn.Add(msg); } }