Skip to content
11 changes: 11 additions & 0 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ public sealed class CreateChannelOptions
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;

/// <summary>
/// If publisher confirmation tracking is enabled, this represents the number of allowed
/// outstanding publisher confirmations before publishing is blocked.
///
/// Defaults to <c>128</c>
///
/// Set to <c>null</c>, to allow an unlimited number of outstanding confirmations.
///
/// </summary>
public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
Expand Down
18 changes: 14 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private ushort? _maxOutstandingPublisherConfirmations = null;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;

Expand All @@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout
set => InnerChannel.ContinuationTimeout = value;
}

public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
// TODO just pass create channel options
public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency,
bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -164,8 +171,11 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
_publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled,
_consumerDispatchConcurrency, cancellationToken)
_publisherConfirmationsEnabled,
_publisherConfirmationTrackingEnabled,
_maxOutstandingPublisherConfirmations,
_consumerDispatchConcurrency,
cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);

Expand Down
22 changes: 18 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,21 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs

public IProtocol Protocol => Endpoint.Protocol;

// TODO pass channel creation options?
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
ushort? maxOutstandingPublisherConfirmations = null,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
return (RecoveryAwareChannel)await result.OpenAsync(
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
maxOutstandingPublisherConfirmations,
cancellationToken)
.ConfigureAwait(false);
}

Expand Down Expand Up @@ -266,11 +271,20 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations,
cdc,
cancellationToken)
.ConfigureAwait(false);

var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
// TODO just pass create channel options
var autorecoveringChannel = new AutorecoveringChannel(this,
recoveryAwareChannel,
cdc,
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return autorecoveringChannel;
Expand Down
18 changes: 16 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
Expand All @@ -41,6 +42,8 @@ namespace RabbitMQ.Client.Impl
{
internal partial class Channel : IChannel, IRecoverable
{
private readonly AsyncManualResetEvent _flowControlBlock = new(true);

public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
Expand All @@ -53,7 +56,7 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
await MaybeStartPublisherConfirmationTracking(cancellationToken)
.ConfigureAwait(false);

await EnforceFlowControlAsync(cancellationToken)
await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
Expand Down Expand Up @@ -108,7 +111,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
await MaybeStartPublisherConfirmationTracking(cancellationToken)
.ConfigureAwait(false);

await EnforceFlowControlAsync(cancellationToken)
await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
Expand Down Expand Up @@ -220,5 +223,16 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
{
if (_flowControlBlock.IsSet)
{
return default;
}

return _flowControlBlock.WaitAsync(cancellationToken);
}
}
}
38 changes: 37 additions & 1 deletion projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ internal partial class Channel : IChannel, IRecoverable
{
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private ushort? _maxOutstandingPublisherConfirmations = null;
private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
Expand Down Expand Up @@ -115,10 +117,20 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
}
}

private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations)
{
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;

if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
{
_maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
(int)_maxOutstandingPublisherConfirmations,
(int)_maxOutstandingPublisherConfirmations);
}
}

private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
Expand Down Expand Up @@ -270,6 +282,18 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
{
if (_publisherConfirmationsEnabled)
{
if (_publisherConfirmationTrackingEnabled)
{
if (_maxOutstandingPublisherConfirmations is not null)
{
int percentOfMax = _confirmsTaskCompletionSources.Count / (int)_maxOutstandingPublisherConfirmations;
if (percentOfMax > 0.5)
{
await Task.Delay(1000 * percentOfMax).ConfigureAwait(false);
}
}
}

await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);

Expand All @@ -280,6 +304,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
{
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;

if (_maxOutstandingConfirmationsSemaphore is not null)
{
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
}

_nextPublishSeqNo++;
Expand Down Expand Up @@ -327,6 +357,12 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
.ConfigureAwait(false);
}

if (_publisherConfirmationTrackingEnabled &&
_maxOutstandingConfirmationsSemaphore is not null)
{
_maxOutstandingConfirmationsSemaphore.Release();
}
}
}
}
Expand Down
29 changes: 11 additions & 18 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable
// AMQP only allows one RPC operation to be active at a time.
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);

private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
Expand Down Expand Up @@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
CancellationToken cancellationToken = default)
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
maxOutstandingPublisherConfirmations);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -450,17 +452,6 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
return Session.TransmitAsync(in method, in header, body, cancellationToken);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken)
{
if (_flowControlBlock.IsSet)
{
return default;
}

return _flowControlBlock.WaitAsync(cancellationToken);
}

internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
{
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);
Expand Down Expand Up @@ -540,7 +531,8 @@ protected virtual void Dispose(bool disposing)

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore?.Dispose();
_confirmSemaphore.Dispose();
_maxOutstandingConfirmationsSemaphore?.Dispose();
}
}

Expand All @@ -561,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore()

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore?.Dispose();
_confirmSemaphore.Dispose();
_maxOutstandingConfirmationsSemaphore?.Dispose();
}

public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d

// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
IChannel ch = await channel.OpenAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations,
cancellationToken)
.ConfigureAwait(false);
return ch;
}
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
Expand Down
27 changes: 16 additions & 11 deletions projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,18 @@
using System.Threading.Tasks;
using RabbitMQ.Client;

const ushort MAX_OUTSTANDING_CONFIRMS = 256;

const int MESSAGE_COUNT = 50_000;
bool debug = false;

var channelOpts = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true,
MaxOutstandingPublisherConfirmations = MAX_OUTSTANDING_CONFIRMS
};

#pragma warning disable CS8321 // Local function is declared but never used

await PublishMessagesIndividuallyAsync();
Expand All @@ -53,12 +62,12 @@ static Task<IConnection> CreateConnectionAsync()
return factory.CreateConnectionAsync();
}

static async Task PublishMessagesIndividuallyAsync()
async Task PublishMessagesIndividuallyAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");

await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand All @@ -85,18 +94,18 @@ static async Task PublishMessagesIndividuallyAsync()
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages individually in {sw.ElapsedMilliseconds:N0} ms");
}

static async Task PublishMessagesInBatchAsync()
async Task PublishMessagesInBatchAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");

await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

int batchSize = 1000;
int batchSize = MAX_OUTSTANDING_CONFIRMS;
int outstandingMessageCount = 0;

var sw = new Stopwatch();
Expand Down Expand Up @@ -154,12 +163,8 @@ async Task HandlePublishConfirmsAsynchronously()

await using IConnection connection = await CreateConnectionAsync();

var channelOptions = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = false
};
await using IChannel channel = await connection.CreateChannelAsync(channelOptions);
channelOpts.PublisherConfirmationTrackingEnabled = false;
await using IChannel channel = await connection.CreateChannelAsync(channelOpts);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down
Loading