Skip to content

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,8 @@ await DeleteAutoDeleteExchangeAsync(binding.Source,
265265

266266
}
267267

268+
internal int RecordedBindingsCount => _recordedBindings.Count;
269+
268270
internal async ValueTask RecordBindingAsync(RecordedBinding binding,
269271
bool recordedEntitiesSemaphoreHeld)
270272
{
@@ -419,6 +421,15 @@ private void DeleteAutoDeleteQueue(string queue)
419421
if (!AnyConsumersOnQueue(queue))
420422
{
421423
_recordedQueues.Remove(queue);
424+
// remove bindings targeting this queue; also cascade to auto-delete exchanges
425+
foreach (RecordedBinding binding in _recordedBindings.ToArray())
426+
{
427+
if (binding.Destination == queue)
428+
{
429+
DoDeleteRecordedBinding(binding);
430+
DoDeleteAutoDeleteExchange(binding.Source);
431+
}
432+
}
422433
}
423434
}
424435
}

projects/Test/Integration/ConnectionRecovery/TestRecoveryWithDeletedEntities.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131

3232
using System.Threading.Tasks;
3333
using RabbitMQ.Client;
34+
using RabbitMQ.Client.Events;
3435
using RabbitMQ.Client.Exceptions;
36+
using RabbitMQ.Client.Framing;
3537
using Xunit;
3638
using Xunit.Abstractions;
3739

@@ -150,5 +152,72 @@ public async Task TestThatDeletedQueuesDontReappearOnRecovery()
150152
AssertShutdownError(e.ShutdownReason, 404);
151153
}
152154
}
155+
156+
[Fact]
157+
public async Task TestAutoDeleteQueueBindingsRemovedWhenConsumerCancelled()
158+
{
159+
// See rabbitmq/rabbitmq-dotnet-client#1905.
160+
//
161+
// When the last consumer on an auto-delete queue is cancelled, the queue
162+
// and its bindings must be removed from recorded topology so that recovery
163+
// does not try to restore them.
164+
string exchangeName = GenerateExchangeName();
165+
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
166+
167+
var queueDeclareOk = await _channel.QueueDeclareAsync("", false, false, autoDelete: true);
168+
string queueName = queueDeclareOk.QueueName;
169+
await _channel.QueueBindAsync(queueName, exchangeName, routingKey: "key");
170+
171+
var autorecoveringConn = (AutorecoveringConnection)_conn;
172+
Assert.Equal(1, autorecoveringConn.RecordedExchangesCount);
173+
Assert.Equal(1, autorecoveringConn.RecordedQueuesCount);
174+
Assert.Equal(1, autorecoveringConn.RecordedBindingsCount);
175+
176+
var consumer = new AsyncEventingBasicConsumer(_channel);
177+
string consumerTag = await _channel.BasicConsumeAsync(queueName, true, consumer);
178+
await _channel.BasicCancelAsync(consumerTag);
179+
180+
Assert.Equal(0, autorecoveringConn.RecordedExchangesCount);
181+
Assert.Equal(0, autorecoveringConn.RecordedQueuesCount);
182+
Assert.Equal(0, autorecoveringConn.RecordedBindingsCount);
183+
184+
await CloseAndWaitForRecoveryAsync();
185+
Assert.True(_channel.IsOpen);
186+
}
187+
188+
[Fact]
189+
public async Task TestAutoDeleteQueueBindingsRemovedWhenChannelClosed()
190+
{
191+
// See rabbitmq/rabbitmq-dotnet-client#1905.
192+
//
193+
// Same as above but uses channel closure as the trigger.
194+
string exchangeName = GenerateExchangeName();
195+
var autorecoveringConn = (AutorecoveringConnection)_conn;
196+
197+
IChannel ch = await _conn.CreateChannelAsync(_createChannelOptions);
198+
await using (ch.ConfigureAwait(false))
199+
{
200+
await ch.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
201+
202+
var queueDeclareOk = await ch.QueueDeclareAsync("", false, false, autoDelete: true);
203+
string queueName = queueDeclareOk.QueueName;
204+
await ch.QueueBindAsync(queueName, exchangeName, routingKey: "key");
205+
206+
Assert.Equal(1, autorecoveringConn.RecordedExchangesCount);
207+
Assert.Equal(1, autorecoveringConn.RecordedQueuesCount);
208+
Assert.Equal(1, autorecoveringConn.RecordedBindingsCount);
209+
210+
var consumer = new AsyncEventingBasicConsumer(ch);
211+
await ch.BasicConsumeAsync(queueName, true, consumer);
212+
await ch.CloseAsync();
213+
}
214+
215+
Assert.Equal(0, autorecoveringConn.RecordedExchangesCount);
216+
Assert.Equal(0, autorecoveringConn.RecordedQueuesCount);
217+
Assert.Equal(0, autorecoveringConn.RecordedBindingsCount);
218+
219+
await CloseAndWaitForRecoveryAsync();
220+
Assert.True(_channel.IsOpen);
221+
}
153222
}
154223
}

0 commit comments

Comments
 (0)