Skip to content
10 changes: 10 additions & 0 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ public sealed class CreateChannelOptions
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;

/// <summary>
/// The number of allowed outstanding publisher confirmations before publishing is blocked.
///
/// Defaults to <c>128</c>
///
/// Set to <c>null</c>, to allow an unlimited number of outstanding confirmations.
///
/// </summary>
public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
Expand Down
18 changes: 14 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private ushort? _maxOutstandingPublisherConfirmations = null;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;

Expand All @@ -71,14 +72,20 @@ public TimeSpan ContinuationTimeout
set => InnerChannel.ContinuationTimeout = value;
}

public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
// TODO just pass create channel options
public AutorecoveringChannel(AutorecoveringConnection conn,
RecoveryAwareChannel innerChannel,
ushort consumerDispatchConcurrency,
bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -164,8 +171,11 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
_publisherConfirmationsEnabled, _publisherConfirmationTrackingEnabled,
_consumerDispatchConcurrency, cancellationToken)
_publisherConfirmationsEnabled,
_publisherConfirmationTrackingEnabled,
_maxOutstandingPublisherConfirmations,
_consumerDispatchConcurrency,
cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);

Expand Down
22 changes: 18 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,21 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs

public IProtocol Protocol => Endpoint.Protocol;

// TODO pass channel creation options?
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
ushort? maxOutstandingPublisherConfirmations = null,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session, consumerDispatchConcurrency);
return (RecoveryAwareChannel)await result.OpenAsync(
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
maxOutstandingPublisherConfirmations,
cancellationToken)
.ConfigureAwait(false);
}

Expand Down Expand Up @@ -266,11 +271,20 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations,
cdc,
cancellationToken)
.ConfigureAwait(false);

var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
// TODO just pass create channel options
var autorecoveringChannel = new AutorecoveringChannel(this,
recoveryAwareChannel,
cdc,
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return autorecoveringChannel;
Expand Down
24 changes: 22 additions & 2 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;
Expand All @@ -41,6 +42,8 @@ namespace RabbitMQ.Client.Impl
{
internal partial class Channel : IChannel, IRecoverable
{
private readonly AsyncManualResetEvent _flowControlBlock = new(true);

public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
Expand All @@ -53,7 +56,10 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
await MaybeStartPublisherConfirmationTracking(cancellationToken)
.ConfigureAwait(false);

await EnforceFlowControlAsync(cancellationToken)
await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);

await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken)
.ConfigureAwait(false);

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
Expand Down Expand Up @@ -108,7 +114,10 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
await MaybeStartPublisherConfirmationTracking(cancellationToken)
.ConfigureAwait(false);

await EnforceFlowControlAsync(cancellationToken)
await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);

await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken)
.ConfigureAwait(false);

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
Expand Down Expand Up @@ -220,5 +229,16 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
{
if (_flowControlBlock.IsSet)
{
return default;
}

return _flowControlBlock.WaitAsync(cancellationToken);
}
}
}
59 changes: 58 additions & 1 deletion projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ internal partial class Channel : IChannel, IRecoverable
{
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private ushort? _maxOutstandingPublisherConfirmations = null;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
private readonly AsyncManualResetEvent _maxOutstandingPublisherConfirmsReached = new(true);

private class PublisherConfirmationInfo
{
Expand Down Expand Up @@ -115,10 +117,13 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
}
}

private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations)
{
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
}

private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
Expand All @@ -136,6 +141,7 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
if (_publisherConfirmationTrackingEnabled)
{
_confirmsTaskCompletionSources.Clear();
MaybeUnblockPublishers();
}
_nextPublishSeqNo = 1;
}
Expand Down Expand Up @@ -181,13 +187,15 @@ private void HandleAck(ulong deliveryTag, bool multiple)
{
pair.Value.SetResult(true);
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
MaybeUnblockPublishers();
}
}
}
else
{
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
MaybeUnblockPublishers();
tcs.SetResult(true);
}
}
Expand All @@ -207,13 +215,15 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
{
pair.Value.SetException(new PublishException(pair.Key, isReturn));
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
MaybeUnblockPublishers();
}
}
}
else
{
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
MaybeUnblockPublishers();
tcs.SetException(new PublishException(deliveryTag, isReturn));
}
}
Expand Down Expand Up @@ -256,6 +266,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
}

_confirmsTaskCompletionSources.Clear();
MaybeUnblockPublishers();
}
}
finally
Expand All @@ -280,6 +291,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
{
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
MaybeBlockPublishers();
}

_nextPublishSeqNo++;
Expand All @@ -306,6 +318,7 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
if (_publisherConfirmationTrackingEnabled)
{
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
MaybeUnblockPublishers();
}

exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex);
Expand All @@ -329,5 +342,49 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ValueTask MaybeEnforceOutstandingPublisherConfirmationsAsync(CancellationToken cancellationToken)
{
if (_publisherConfirmationTrackingEnabled)
{
if (_maxOutstandingPublisherConfirmsReached.IsSet)
{
return default;
}
else
{
return _maxOutstandingPublisherConfirmsReached.WaitAsync(cancellationToken);
}
}

return default;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void MaybeBlockPublishers()
{
if (_publisherConfirmationTrackingEnabled)
{
if (_maxOutstandingPublisherConfirmations is not null
&& _confirmsTaskCompletionSources.Count >= _maxOutstandingPublisherConfirmations)
{
_maxOutstandingPublisherConfirmsReached.Reset();
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void MaybeUnblockPublishers()
{
if (_publisherConfirmationTrackingEnabled)
{
if (_maxOutstandingPublisherConfirmations is not null
&& _confirmsTaskCompletionSources.Count < _maxOutstandingPublisherConfirmations)
{
_maxOutstandingPublisherConfirmsReached.Set();
}
}
}
}
}
23 changes: 7 additions & 16 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ internal partial class Channel : IChannel, IRecoverable
// AMQP only allows one RPC operation to be active at a time.
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly AsyncManualResetEvent _flowControlBlock = new AsyncManualResetEvent(true);

private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
Expand Down Expand Up @@ -361,11 +360,14 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
CancellationToken cancellationToken = default)
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
maxOutstandingPublisherConfirmations);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -450,17 +452,6 @@ protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THead
return Session.TransmitAsync(in method, in header, body, cancellationToken);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask EnforceFlowControlAsync(CancellationToken cancellationToken)
{
if (_flowControlBlock.IsSet)
{
return default;
}

return _flowControlBlock.WaitAsync(cancellationToken);
}

internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
{
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d

// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
IChannel ch = await channel.OpenAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations,
cancellationToken)
.ConfigureAwait(false);
return ch;
}
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.MaxOutstandingPublisherConfirmations.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
Expand Down
Loading