Skip to content

Commit c10fb19

Browse files
committed
Track publisher confirmations automatically
Fixes #1682 * Remove `ConfirmSelectAsync` from `IChannel` * Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
1 parent be1d1e3 commit c10fb19

29 files changed

+234
-100
lines changed

projects/RabbitMQ.Client/Constants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static class Constants
8787
/// <summary>
8888
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
8989
/// to set this value for every channel created on a connection,
90-
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
90+
/// and <see cref="IConnection.CreateChannelAsync(bool, bool, ushort?, System.Threading.CancellationToken)" />
9191
/// for setting this value for a particular channel.
9292
/// </summary>
9393
public const ushort DefaultConsumerDispatchConcurrency = 1;

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
265265
Task CloseAsync(ShutdownEventArgs reason, bool abort,
266266
CancellationToken cancellationToken = default);
267267

268-
/// <summary>
269-
/// Asynchronously enable publisher confirmations.
270-
/// </summary>
271-
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
272-
/// <param name="cancellationToken">CancellationToken for this operation.</param>
273-
Task ConfirmSelectAsync(bool trackConfirmations = true,
274-
CancellationToken cancellationToken = default);
275-
276268
/// <summary>Asynchronously declare an exchange.</summary>
277269
/// <param name="exchange">The name of the exchange.</param>
278270
/// <param name="type">The type of the exchange.</param>

projects/RabbitMQ.Client/IConnection.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,12 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
240240
/// <summary>
241241
/// Asynchronously create and return a fresh channel, session, and channel.
242242
/// </summary>
243+
/// <param name="publisherConfirmations">
244+
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
245+
/// </param>
246+
/// <param name="publisherConfirmationTracking">
247+
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
248+
/// </param>
243249
/// <param name="consumerDispatchConcurrency">
244250
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
245251
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
@@ -251,7 +257,8 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
251257
/// In addition to that consumers need to be thread/concurrency safe.
252258
/// </param>
253259
/// <param name="cancellationToken">Cancellation token</param>
254-
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
260+
Task<IChannel> CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false,
261+
ushort? consumerDispatchConcurrency = null,
255262
CancellationToken cancellationToken = default);
256263
}
257264
}

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,16 +251,26 @@ await CloseInnerConnectionAsync()
251251
}
252252
}
253253

254-
public async Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
255-
CancellationToken cancellationToken = default)
254+
public async Task<IChannel> CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false,
255+
ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default)
256256
{
257257
EnsureIsOpen();
258+
258259
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
260+
259261
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
260262
.ConfigureAwait(false);
263+
261264
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
265+
if (publisherConfirmations)
266+
{
267+
await channel.ConfirmSelectAsync(trackConfirmations: publisherConfirmationTracking,
268+
cancellationToken: cancellationToken).ConfigureAwait(false);
269+
}
270+
262271
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
263272
.ConfigureAwait(false);
273+
264274
return channel;
265275
}
266276

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,13 +264,30 @@ await CloseAsync(ea, true,
264264
}
265265
}
266266

267-
public Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
268-
CancellationToken cancellationToken = default)
267+
public async Task<IChannel> CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false,
268+
ushort? consumerDispatchConcurrency = null, CancellationToken cancellationToken = default)
269269
{
270270
EnsureIsOpen();
271271
ISession session = CreateSession();
272272
var channel = new Channel(_config, session, consumerDispatchConcurrency);
273-
return channel.OpenAsync(cancellationToken);
273+
IChannel ch = await channel.OpenAsync(cancellationToken)
274+
.ConfigureAwait(false);
275+
if (publisherConfirmations)
276+
{
277+
// TODO yes this is ugly but will be fixed as part of rabbitmq/rabbitmq-dotnet-client#1682
278+
if (ch is not AutorecoveringChannel ac)
279+
{
280+
ChannelBase chb = (ChannelBase)ch;
281+
await chb.ConfirmSelectAsync(publisherConfirmationTracking, cancellationToken)
282+
.ConfigureAwait(false);
283+
}
284+
else
285+
{
286+
await ac.ConfirmSelectAsync(publisherConfirmationTracking, cancellationToken)
287+
.ConfigureAwait(false);
288+
}
289+
}
290+
return ch;
274291
}
275292

276293
internal ISession CreateSession()

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -825,10 +825,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
825825
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
826826
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
827827
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
828-
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
829828
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
830829
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
831-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
832830
RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs!>!
833831
RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs!>!
834832
RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs!>!
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
RabbitMQ.Client.IConnection.CreateChannelAsync(bool publisherConfirmations = false, bool publisherConfirmationTracking = false, ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/Test/Applications/MassPublish/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,10 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
137137

138138
publishTasks.Add(Task.Run(async () =>
139139
{
140-
using IChannel publishChannel = await publishConnection.CreateChannelAsync();
140+
using IChannel publishChannel = await publishConnection.CreateChannelAsync(publisherConfirmations: true,
141+
publisherConfirmationTracking: true);
141142
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
142143

143-
await publishChannel.ConfirmSelectAsync();
144144

145145
for (int i = 0; i < ItemsPerBatch; i++)
146146
{

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ static async Task PublishMessagesIndividuallyAsync()
2424
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");
2525

2626
await using IConnection connection = await CreateConnectionAsync();
27-
await using IChannel channel = await connection.CreateChannelAsync();
27+
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmations: true,
28+
publisherConfirmationTracking: true);
2829

2930
// declare a server-named queue
3031
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
3132
string queueName = queueDeclareResult.QueueName;
32-
await channel.ConfirmSelectAsync();
3333

3434
var sw = new Stopwatch();
3535
sw.Start();
@@ -57,7 +57,6 @@ static async Task PublishMessagesInBatchAsync()
5757
// declare a server-named queue
5858
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
5959
string queueName = queueDeclareResult.QueueName;
60-
await channel.ConfirmSelectAsync();
6160

6261
int batchSize = 100;
6362
int outstandingMessageCount = 0;
@@ -98,16 +97,15 @@ async Task HandlePublishConfirmsAsynchronously()
9897
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");
9998

10099
await using IConnection connection = await CreateConnectionAsync();
101-
await using IChannel channel = await connection.CreateChannelAsync();
100+
101+
// NOTE: setting trackConfirmations to false because this program
102+
// is tracking them itself.
103+
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationTracking: false);
102104

103105
// declare a server-named queue
104106
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
105107
string queueName = queueDeclareResult.QueueName;
106108

107-
// NOTE: setting trackConfirmations to false because this program
108-
// is tracking them itself.
109-
await channel.ConfirmSelectAsync(trackConfirmations: false);
110-
111109
bool publishingCompleted = false;
112110
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
113111
var outstandingConfirms = new LinkedList<ulong>();

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using System.Threading.Tasks;
3636
using RabbitMQ.Client;
3737
using RabbitMQ.Client.Framing;
38+
using RabbitMQ.Client.Impl;
3839
using Xunit;
3940
using Xunit.Abstractions;
4041

@@ -71,17 +72,16 @@ protected async Task AssertConsumerCountAsync(IChannel ch, string q, uint count)
7172
Assert.Equal(count, ok.ConsumerCount);
7273
}
7374

74-
protected async Task AssertExchangeRecoveryAsync(IChannel m, string x)
75+
protected async Task AssertExchangeRecoveryAsync(IChannel ch, string x)
7576
{
76-
await m.ConfirmSelectAsync();
77-
await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) =>
77+
await WithTemporaryNonExclusiveQueueAsync(ch, async (_, q) =>
7878
{
7979
string rk = "routing-key";
80-
await m.QueueBindAsync(q, x, rk);
81-
await m.BasicPublishAsync(x, rk, _messageBody);
80+
await ch.QueueBindAsync(q, x, rk);
81+
await ch.BasicPublishAsync(x, rk, _messageBody);
8282

83-
Assert.True(await WaitForConfirmsWithCancellationAsync(m));
84-
await m.ExchangeDeclarePassiveAsync(x);
83+
Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
84+
await ch.ExchangeDeclarePassiveAsync(x);
8585
});
8686
}
8787

@@ -92,7 +92,13 @@ protected Task AssertExclusiveQueueRecoveryAsync(IChannel m, string q)
9292

9393
protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclusive, IDictionary<string, object> arguments = null)
9494
{
95-
await ch.ConfirmSelectAsync();
95+
// TODO
96+
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
97+
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
98+
await ach.ConfirmSelectAsync(trackConfirmations: true);
99+
100+
// Note: no need to enable publisher confirmations as they are
101+
// automatically enabled for channels
96102
await ch.QueueDeclareAsync(queue: q, passive: true, durable: false, exclusive: false, autoDelete: false, arguments: null);
97103

98104
RabbitMQ.Client.QueueDeclareOk ok1 = await ch.QueueDeclareAsync(queue: q, passive: false,
@@ -204,9 +210,10 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
204210
{
205211
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
206212
{
207-
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync())
213+
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true))
208214
{
209-
await publishingChannel.ConfirmSelectAsync();
215+
// Note: no need to enable publisher confirmations as they are
216+
// automatically enabled for channels
210217

211218
for (ushort i = 0; i < TotalMessageCount; i++)
212219
{
@@ -358,10 +365,8 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,
358365

359366
protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
360367
{
361-
using (IChannel ch = await conn.CreateChannelAsync())
368+
using (IChannel ch = await conn.CreateChannelAsync(publisherConfirmations: true, publisherConfirmationTracking: true))
362369
{
363-
await ch.ConfirmSelectAsync();
364-
365370
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
366371

367372
var consumer = new AckingBasicConsumer(ch, 1, tcs);

0 commit comments

Comments
 (0)