Skip to content

Commit 0bdd703

Browse files
committed
* Remove / comment out all use of WaitForConfirms... methods.
1 parent 1e4295f commit 0bdd703

25 files changed

+59
-224
lines changed

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
443443
/// <param name="cancellationToken">The cancellation token.</param>
444444
Task TxSelectAsync(CancellationToken cancellationToken = default);
445445

446+
#if REMOVING_WAIT_FOR_CONFIRMS
446447
/// <summary>
447448
/// Asynchronously wait until all published messages on this channel have been confirmed.
448449
/// </summary>
@@ -468,6 +469,7 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
468469
/// the channel.
469470
/// </remarks>
470471
Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default);
472+
#endif
471473

472474
/// <summary>
473475
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,11 +488,13 @@ public Task TxSelectAsync(CancellationToken cancellationToken)
488488
return InnerChannel.TxSelectAsync(cancellationToken);
489489
}
490490

491+
#if REMOVING_WAIT_FOR_CONFIRMS
491492
public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
492493
=> InnerChannel.WaitForConfirmsAsync(token);
493494

494495
public Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
495496
=> InnerChannel.WaitForConfirmsOrDieAsync(token);
497+
#endif
496498

497499
[MethodImpl(MethodImplOptions.AggressiveInlining)]
498500
private void ThrowIfDisposed()

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,6 +1754,7 @@ await ModelSendAsync(in method, k.CancellationToken)
17541754
}
17551755
}
17561756

1757+
#if REMOVING_WAIT_FOR_CONFIRMS
17571758
public async Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default)
17581759
{
17591760
if (false == _publisherConfirmationsEnabled)
@@ -1880,6 +1881,7 @@ await tokenRegistration.DisposeAsync()
18801881
#endif
18811882
}
18821883
}
1884+
#endif
18831885

18841886
// NOTE: this method is internal for its use in this test:
18851887
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -756,8 +756,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
756756
~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
757757
~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
758758
~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
759-
~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
760-
~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
761759
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
762760
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
763761
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>

projects/Test/Applications/MassPublish/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: Routi
149149
Interlocked.Increment(ref s_messagesSent);
150150
}
151151

152-
await publishChannel.WaitForConfirmsOrDieAsync();
152+
// await publishChannel.WaitForConfirmsOrDieAsync();
153153

154154
if (s_debug)
155155
{

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ static async Task PublishMessagesIndividuallyAsync()
4040
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
4141
}
4242

43-
await channel.WaitForConfirmsOrDieAsync();
43+
// await channel.WaitForConfirmsOrDieAsync();
4444

4545
sw.Stop();
4646

@@ -77,15 +77,15 @@ static async Task PublishMessagesInBatchAsync()
7777
await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
7878
publishTasks.Clear();
7979

80-
await channel.WaitForConfirmsOrDieAsync(cts.Token);
80+
// await channel.WaitForConfirmsOrDieAsync(cts.Token);
8181
outstandingMessageCount = 0;
8282
}
8383
}
8484

8585
if (outstandingMessageCount > 0)
8686
{
87-
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
88-
await channel.WaitForConfirmsOrDieAsync(cts.Token);
87+
// using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
88+
// await channel.WaitForConfirmsOrDieAsync(cts.Token);
8989
}
9090

9191
sw.Stop();

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
using System.Threading.Tasks;
3636
using RabbitMQ.Client;
3737
using RabbitMQ.Client.Framing;
38-
using RabbitMQ.Client.Impl;
3938
using Xunit;
4039
using Xunit.Abstractions;
4140

@@ -79,8 +78,7 @@ await WithTemporaryNonExclusiveQueueAsync(ch, async (_, q) =>
7978
string rk = "routing-key";
8079
await ch.QueueBindAsync(q, x, rk);
8180
await ch.BasicPublishAsync(x, rk, _messageBody);
82-
83-
Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
81+
// Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
8482
await ch.ExchangeDeclarePassiveAsync(x);
8583
});
8684
}
@@ -92,11 +90,6 @@ protected Task AssertExclusiveQueueRecoveryAsync(IChannel m, string q)
9290

9391
protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclusive, IDictionary<string, object> arguments = null)
9492
{
95-
// TODO
96-
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
97-
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
98-
await ach.ConfirmSelectAsync(publisherConfirmationTrackingEnabled: true);
99-
10093
// Note: no need to enable publisher confirmations as they are
10194
// automatically enabled for channels
10295
await ch.QueueDeclareAsync(queue: q, passive: true, durable: false, exclusive: false, autoDelete: false, arguments: null);
@@ -106,7 +99,7 @@ protected async Task AssertQueueRecoveryAsync(IChannel ch, string q, bool exclus
10699
Assert.Equal(0u, ok1.MessageCount);
107100

108101
await ch.BasicPublishAsync("", q, _messageBody);
109-
Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
102+
// Assert.True(await WaitForConfirmsWithCancellationAsync(ch));
110103

111104
RabbitMQ.Client.QueueDeclareOk ok2 = await ch.QueueDeclareAsync(queue: q, passive: false,
112105
durable: false, exclusive: exclusive, autoDelete: false, arguments: arguments);
@@ -223,7 +216,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
223216
}
224217

225218
await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody);
226-
await publishingChannel.WaitForConfirmsOrDieAsync();
219+
// await publishingChannel.WaitForConfirmsOrDieAsync();
227220
}
228221

229222
await publishingChannel.CloseAsync();
@@ -245,13 +238,15 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
245238
return tcs;
246239
}
247240

241+
#if REMOVING_WAIT_FOR_CONFIRMS
248242
protected static Task<bool> WaitForConfirmsWithCancellationAsync(IChannel channel)
249243
{
250244
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)))
251245
{
252246
return channel.WaitForConfirmsAsync(cts.Token);
253247
}
254248
}
249+
#endif
255250

256251
protected Task WaitForShutdownAsync()
257252
{
@@ -376,7 +371,7 @@ protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, s
376371
await ch.BasicPublishAsync(exchange: exchange, routingKey: routingKey,
377372
body: _encoding.GetBytes("test message"), mandatory: true);
378373

379-
await ch.WaitForConfirmsOrDieAsync();
374+
// await ch.WaitForConfirmsOrDieAsync();
380375

381376
try
382377
{

projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,6 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery()
154154
[Fact]
155155
public async Task TestBasicAckEventHandlerRecovery()
156156
{
157-
// TODO
158-
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
159-
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
160-
await ach.ConfirmSelectAsync(publisherConfirmationTrackingEnabled: false);
161-
162157
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
163158
((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) =>
164159
{

projects/Test/Integration/ConnectionRecovery/TestExchangeRecovery.cs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
using System.Threading.Tasks;
3333
using RabbitMQ.Client;
34-
using RabbitMQ.Client.Impl;
3534
using Xunit;
3635
using Xunit.Abstractions;
3736

@@ -46,11 +45,6 @@ public TestExchangeRecovery(ITestOutputHelper output) : base(output)
4645
[Fact]
4746
public async Task TestExchangeRecoveryTest()
4847
{
49-
// TODO
50-
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
51-
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
52-
await ach.ConfirmSelectAsync(publisherConfirmationTrackingEnabled: true);
53-
5448
string x = "dotnet-client.test.recovery.x1";
5549
await DeclareNonDurableExchangeAsync(_channel, x);
5650
await CloseAndWaitForRecoveryAsync();
@@ -61,11 +55,6 @@ public async Task TestExchangeRecoveryTest()
6155
[Fact]
6256
public async Task TestExchangeToExchangeBindingRecovery()
6357
{
64-
// TODO
65-
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
66-
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
67-
await ach.ConfirmSelectAsync(publisherConfirmationTrackingEnabled: true);
68-
6958
string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;
7059

7160
string ex_source = GenerateExchangeName();
@@ -82,7 +71,7 @@ public async Task TestExchangeToExchangeBindingRecovery()
8271
await CloseAndWaitForRecoveryAsync();
8372
Assert.True(_channel.IsOpen);
8473
await _channel.BasicPublishAsync(ex_source, "", body: _encoding.GetBytes("msg"), mandatory: true);
85-
await _channel.WaitForConfirmsOrDieAsync();
74+
// await _channel.WaitForConfirmsOrDieAsync();
8675
await AssertMessageCountAsync(q, 1);
8776
}
8877
finally

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
231231
{
232232
await publishChannel.BasicPublishAsync(string.Empty, queueName, body1);
233233
await publishChannel.BasicPublishAsync(string.Empty, queueName, body2);
234-
await publishChannel.WaitForConfirmsOrDieAsync();
234+
// await publishChannel.WaitForConfirmsOrDieAsync();
235235
}
236236

237237
await publishChannel.CloseAsync();
@@ -435,11 +435,6 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
435435
[Fact]
436436
public async Task TestBasicAckAsync()
437437
{
438-
// TODO
439-
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
440-
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
441-
await ach.ConfirmSelectAsync(publisherConfirmationTrackingEnabled: true);
442-
443438
await ValidateConsumerDispatchConcurrency();
444439

445440
string queueName = GenerateQueueName();
@@ -493,7 +488,7 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
493488
{
494489
byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString());
495490
await _channel.BasicPublishAsync(string.Empty, queueName, _body);
496-
await _channel.WaitForConfirmsOrDieAsync();
491+
// await _channel.WaitForConfirmsOrDieAsync();
497492
}
498493

499494
return true;
@@ -608,11 +603,6 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
608603
[Fact]
609604
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
610605
{
611-
// TODO
612-
// Hack for rabbitmq/rabbitmq-dotnet-client#1682
613-
AutorecoveringChannel ach = (AutorecoveringChannel)_channel;
614-
await ach.ConfirmSelectAsync(publisherConfirmationTrackingEnabled: true);
615-
616606
await ValidateConsumerDispatchConcurrency();
617607

618608
string exchangeName = GenerateExchangeName();
@@ -660,7 +650,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
660650
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
661651
mandatory: true,
662652
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
663-
await innerChannel.WaitForConfirmsOrDieAsync();
653+
// await innerChannel.WaitForConfirmsOrDieAsync();
664654
await innerChannel.CloseAsync();
665655
};
666656
await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1);
@@ -676,7 +666,7 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
676666
// Note: no need to enable publisher confirmations as they are
677667
// automatically enabled for channels
678668
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
679-
await _channel.WaitForConfirmsOrDieAsync();
669+
// await _channel.WaitForConfirmsOrDieAsync();
680670

681671
Assert.True(await tcs.Task);
682672
}

0 commit comments

Comments
 (0)