Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ static class AmqpMessageConverter
const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
const string LockedUntilName = "x-opt-locked-until";
const string PublisherName = "x-opt-publisher";
const string PartitionKeyName = "x-opt-partition-key";
const string PartitionIdName = "x-opt-partition-id";
const string ViaPartitionKeyName = "x-opt-via-partition-key";
internal const string PartitionKeyName = "x-opt-partition-key";
internal const string ViaPartitionKeyName = "x-opt-via-partition-key";
const string DeadLetterSourceName = "x-opt-deadletter-source";
const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
const string UriName = AmqpConstants.Vendor + ":uri";
Expand Down Expand Up @@ -641,7 +641,7 @@ static ArraySegment<byte> StreamToBytes(Stream stream)
return buffer;
}

private static Data ToData(AmqpMessage message)
internal static Data ToData(AmqpMessage message)
{
ArraySegment<byte>[] payload = message.GetPayload();
var buffer = new BufferListStream(payload);
Expand Down
148 changes: 148 additions & 0 deletions src/Microsoft.Azure.ServiceBus/Core/Batch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Threading.Tasks;

namespace Microsoft.Azure.ServiceBus.Core
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.ServiceBus.Amqp;
using Microsoft.Azure.ServiceBus.Diagnostics;

[DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")]
public class Batch : IDisposable
{
internal readonly ulong maximumBatchSize;
private readonly Func<Message, Task<Message>> pluginsCallback;
private AmqpMessage firstMessage;
private readonly List<Data> datas;
private AmqpMessage result;
private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData;

/// <summary>
/// Construct a new batch with a maximum batch size and outgoing plugins callback.
/// <remarks>
/// To construct a batch at run-time, use <see cref="MessageSender"/>, <see cref="QueueClient"/>, or <see cref="TopicClient"/>.
/// Use this constructor for testing and custom implementations.
/// </remarks>
/// </summary>
/// <param name="maximumBatchSize">Maximum batch size allowed for batch.</param>
/// <param name="pluginsCallback">Plugins callback to invoke on outgoing messages regisered with batch.</param>
public Batch(ulong maximumBatchSize, Func<Message, Task<Message>> pluginsCallback)
{
this.maximumBatchSize = maximumBatchSize;
this.pluginsCallback = pluginsCallback;
this.datas = new List<Data>();
this.result = AmqpMessage.Create(datas);
OriginalMessageList = new List<Message>();
}

/// <summary>
/// Add <see cref="Message"/> to the batch if the overall size of the batch with the added message is not exceeding the batch maximum.
/// </summary>
/// <param name="message"><see cref="Message"/> to add to the batch.</param>
/// <returns></returns>
public async Task<bool> TryAdd(Message message)
{
ThrowIfDisposed();

message.VerifyMessageIsNotPreviouslyReceived();

var processedMessage = await pluginsCallback(message);

var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage);

if (firstMessage == null)
{
originalMessageData = (processedMessage.MessageId, processedMessage.SessionId, processedMessage.PartitionKey, processedMessage.ViaPartitionKey);
firstMessage = amqpMessage;
}

var data = AmqpMessageConverter.ToData(amqpMessage);
datas.Add(data);

if (Size <= maximumBatchSize)
{
OriginalMessageList.Add(message);
return true;
}

datas.Remove(data);
return false;

}

/// <summary>
/// Number of messages in batch.
/// </summary>
public int Length => datas.Count;

internal ulong Size => (ulong) result.SerializedMessageSize;


/// <summary>
/// Convert batch to AMQP message.
/// </summary>
/// <returns></returns>
public AmqpMessage ToAmqpMessage()
{
ThrowIfDisposed();

if (datas.Count == 1)
{
firstMessage.Batchable = true;
return firstMessage;
}

if (originalMessageData.messageId != null)
{
result.Properties.MessageId = originalMessageData.messageId;
}

if (originalMessageData.sessionId != null)
{
result.Properties.GroupId = originalMessageData.sessionId;
}

if (originalMessageData.partitionKey != null)
{
result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = originalMessageData.partitionKey;
}

if (originalMessageData.viaPartitionKey != null)
{
result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = originalMessageData.viaPartitionKey;
}

result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
result.Batchable = true;
return result;
}

public void Dispose()
{
// TODO: review if there's anything else to do
firstMessage?.Dispose();
result?.Dispose();

datas.Clear();
OriginalMessageList.Clear();
}

private void ThrowIfDisposed()
{
if (result == null)
{
throw new Exception("Batch is has been disposed and cannot be re-used.");
}
}

internal List<Message> OriginalMessageList { get; }

private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}";
}
}
10 changes: 10 additions & 0 deletions src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ public interface ISenderClient : IClientEntity
/// </summary>
Task SendAsync(IList<Message> messageList);

// TODO: extract methods into this interface for the next major version
// /// <summary>
// /// Sends a <see cref="Batch"/> of messages to Service Bus.
// /// </summary>
// Task SendAsync(Batch batch);
// /// <summary>
// /// Create a new <see cref="Batch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
// /// </summary>
// Task<Batch> CreateBatch();

/// <summary>
/// Schedules a message to appear on Service Bus.
/// </summary>
Expand Down
97 changes: 82 additions & 15 deletions src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace Microsoft.Azure.ServiceBus.Core
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.ServiceBus.Amqp;
using Microsoft.Azure.ServiceBus.Primitives;
using Microsoft.Azure.ServiceBus.Diagnostics;


/// <summary>
/// The MessageSender can be used to send messages to Queues or Topics.
Expand All @@ -41,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender
readonly ServiceBusDiagnosticSource diagnosticSource;
readonly bool isViaSender;
readonly string transferDestinationPath;
private ulong maxMessageSize = 0;

/// <summary>
/// Creates a new AMQP MessageSender.
Expand Down Expand Up @@ -236,7 +239,7 @@ public async Task SendAsync(IList<Message> messageList)
{
this.ThrowIfClosed();

var count = MessageSender.ValidateMessages(messageList);
var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList);
MessagingEventSource.Log.MessageSendStart(this.ClientId, count);

bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
Expand All @@ -247,7 +250,7 @@ public async Task SendAsync(IList<Message> messageList)
{
var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false);

sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout);
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout);
await sendTask.ConfigureAwait(false);
}
catch (Exception exception)
Expand All @@ -268,6 +271,78 @@ public async Task SendAsync(IList<Message> messageList)
MessagingEventSource.Log.MessageSendStop(this.ClientId);
}

/// <summary>
/// Sends a <see cref="Batch"/> of messages to Service Bus.
/// </summary>
public async Task SendAsync(Batch batch)
{
this.ThrowIfClosed();

MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length);

var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
// TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works.
// See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details.
var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(batch.OriginalMessageList) : null;
Task sendTask;

try
{
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout);
await sendTask.ConfigureAwait(false);
}
catch (Exception exception)
{
if (isDiagnosticSourceEnabled)
{
this.diagnosticSource.ReportException(exception);
}

MessagingEventSource.Log.MessageSendException(this.ClientId, exception);
throw;
}
// finally
// {
this.diagnosticSource.SendStop(activity, batch.OriginalMessageList, sendTask?.Status);
// }

MessagingEventSource.Log.MessageSendStop(this.ClientId);
}

/// <summary>
/// Create a new <see cref="Batch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
/// </summary>
public async Task<Batch> CreateBatch()
{
if (maxMessageSize != 0)
{
return new Batch(maxMessageSize, ProcessMessage);
}

var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
SendingAmqpLink amqpLink = null;
try
{
if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink))
{
amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}

if (!amqpLink.Settings.MaxMessageSize.HasValue)
{
throw new Exception("Broker didn't provide maximum message size. Batch requires maximum message size to operate.");
}

maxMessageSize = amqpLink.Settings.MaxMessageSize.Value;

return new Batch(maxMessageSize, ProcessMessage);
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false);
}
}

/// <summary>
/// Schedules a message to appear on Service Bus at a later time.
/// </summary>
Expand Down Expand Up @@ -296,7 +371,7 @@ public async Task<long> ScheduleMessageAsync(Message message, DateTimeOffset sch
}

message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime;
MessageSender.ValidateMessage(message);
message.VerifyMessageIsNotPreviouslyReceived();
MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc);
long result = 0;

Expand Down Expand Up @@ -445,7 +520,7 @@ protected override async Task OnClosingAsync()
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
}

static int ValidateMessages(IList<Message> messageList)
static int VerifyMessagesAreNotPreviouslyReceived(IList<Message> messageList)
{
var count = 0;
if (messageList == null)
Expand All @@ -456,20 +531,12 @@ static int ValidateMessages(IList<Message> messageList)
foreach (var message in messageList)
{
count++;
ValidateMessage(message);
message.VerifyMessageIsNotPreviouslyReceived();
}

return count;
}

static void ValidateMessage(Message message)
{
if (message.SystemProperties.IsLockTokenSet)
{
throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received.");
}
}

static void CloseSession(SendingAmqpLink link)
{
// Note we close the session (which includes the link).
Expand Down Expand Up @@ -521,10 +588,10 @@ async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
return processedMessageList;
}

async Task OnSendAsync(IList<Message> messageList)
async Task OnSendAsync(Func<AmqpMessage> amqpMessageProvider)
{
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList))
using (var amqpMessage = amqpMessageProvider())
{
SendingAmqpLink amqpLink = null;
try
Expand Down
Loading