diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 48bb7b130..0e3e82d95 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -1095,8 +1095,12 @@ void recordExchange(String exchange, RecordedExchange x) { void deleteRecordedExchange(String exchange) { this.recordedExchanges.remove(exchange); - Set xs = this.removeBindingsWithDestination(exchange); - for (RecordedBinding b : xs) { + Set xs1 = this.removeBindingsWithDestination(exchange); + for (RecordedBinding b : xs1) { + this.maybeDeleteRecordedAutoDeleteExchange(b.getSource()); + } + Set xs2 = this.removeBindingsWithSource(exchange); + for (RecordedBinding b : xs2) { this.maybeDeleteRecordedAutoDeleteExchange(b.getSource()); } } @@ -1160,11 +1164,19 @@ boolean hasMoreConsumersOnQueue(Collection consumers, String q } Set removeBindingsWithDestination(String s) { + return this.removeBindingsWithCondition(b -> b.getDestination().equals(s)); + } + + Set removeBindingsWithSource(String s) { + return this.removeBindingsWithCondition(b -> b.getSource().equals(s)); + } + + private Set removeBindingsWithCondition(Predicate condition) { final Set result = new LinkedHashSet<>(); synchronized (this.recordedBindings) { for (Iterator it = this.recordedBindings.iterator(); it.hasNext(); ) { RecordedBinding b = it.next(); - if(b.getDestination().equals(s)) { + if (condition.test(b)) { it.remove(); result.add(b); } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 4257c5a7b..5145929eb 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -865,6 +865,21 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie } } + @Test public void thatBindingFromDeletedExchangeIsDeleted() throws IOException, InterruptedException { + String q = generateQueueName(); + channel.queueDeclare(q, false, false, false, null); + try { + String x = generateExchangeName(); + channel.exchangeDeclare(x, "fanout"); + channel.queueBind(q, x, ""); + assertRecordedBinding(connection, 1); + channel.exchangeDelete(x); + assertRecordedBinding(connection, 0); + } finally { + channel.queueDelete(q); + } + } + private void assertConsumerCount(int exp, String q) throws IOException { assertThat(channel.queueDeclarePassive(q).getConsumerCount()).isEqualTo(exp); } @@ -1017,4 +1032,8 @@ private static void assertRecordedQueues(Connection conn, int size) { private static void assertRecordedExchanges(Connection conn, int size) { assertThat(((AutorecoveringConnection)conn).getRecordedExchanges()).hasSize(size); } + + private static void assertRecordedBinding(Connection conn, int size) { + assertThat(((AutorecoveringConnection)conn).getRecordedBindings()).hasSize(size); + } }