diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index d97d6f261..4c0861339 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -33,6 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; /** * Main interface to AMQP protocol functionality. Public API - @@ -605,10 +606,13 @@ protected void close(int closeCode, signal.initCause(cause); } + AtomicBoolean finishProcessShutdownSignalCalled = new AtomicBoolean(false); BlockingRpcContinuation k = new BlockingRpcContinuation(){ @Override public AMQCommand transformReply(AMQCommand command) { - ChannelN.this.finishProcessShutdownSignal(); + if (finishProcessShutdownSignalCalled.compareAndSet(false, true)) { + ChannelN.this.finishProcessShutdownSignal(); + } return command; }}; boolean notify = false; @@ -639,6 +643,13 @@ public AMQCommand transformReply(AMQCommand command) { if (!abort) throw ioe; } finally { + if (finishProcessShutdownSignalCalled.compareAndSet(false, true)) { + try { + ChannelN.this.finishProcessShutdownSignal(); + } catch (Exception e) { + LOGGER.info("Error while processing shutdown signal: {}", e.getMessage()); + } + } if (abort || notify) { // Now we know everything's been cleaned up and there should // be no more surprises arriving on the wire. Release the diff --git a/src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java b/src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java index e38faedd7..7af27fbfd 100644 --- a/src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java +++ b/src/test/java/com/rabbitmq/client/test/BlockedConnectionTest.java @@ -20,25 +20,16 @@ import static com.rabbitmq.client.test.TestUtils.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.fail; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import java.io.IOException; import java.time.Duration; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import com.rabbitmq.client.ConsumerShutdownSignalCallback; -import com.rabbitmq.client.DeliverCallback; -import com.rabbitmq.client.Delivery; import com.rabbitmq.client.MessageProperties; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownSignalException; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -86,6 +77,9 @@ void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception { CountDownLatch blockedLatch = new CountDownLatch(1); c.addBlockedListener(reason -> blockedLatch.countDown(), () -> {}); Channel ch = c.createChannel(); + String q = ch.queueDeclare().getQueue(); + CountDownLatch consShutdownLatch = new CountDownLatch(1); + ch.basicConsume(q, (ctag, msg) -> { }, (ctag, r) -> consShutdownLatch.countDown()); CountDownLatch chShutdownLatch = new CountDownLatch(1); ch.addShutdownListener(cause -> chShutdownLatch.countDown()); ch.confirmSelect(); @@ -98,6 +92,7 @@ void shutdownListenerShouldBeCalledWhenChannelDies() throws Exception { ch.basicPublish("", "", MessageProperties.BASIC, "".getBytes()); assertThatThrownBy(() -> ch.waitForConfirmsOrDie(confirmTimeout)) .isInstanceOf(TimeoutException.class); + assertThat(consShutdownLatch).is(completed()); assertThat(chShutdownLatch).is(completed()); } finally { if (blocked) { diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConsumerCancelNotification.java b/src/test/java/com/rabbitmq/client/test/functional/ConsumerNotifications.java similarity index 83% rename from src/test/java/com/rabbitmq/client/test/functional/ConsumerCancelNotification.java rename to src/test/java/com/rabbitmq/client/test/functional/ConsumerNotifications.java index 1bd12f17c..db984b06a 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConsumerCancelNotification.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConsumerNotifications.java @@ -28,10 +28,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -public class ConsumerCancelNotification extends BrokerTestCase { +public class ConsumerNotifications extends BrokerTestCase { private final String queue = "cancel_notification_queue"; @@ -42,7 +43,7 @@ public class ConsumerCancelNotification extends BrokerTestCase { channel.queueDeclare(queue, false, true, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override - public void handleCancel(String consumerTag) throws IOException { + public void handleCancel(String consumerTag) { try { result.put(true); } catch (InterruptedException e) { @@ -55,7 +56,31 @@ public void handleCancel(String consumerTag) throws IOException { assertTrue(result.take()); } - class AlteringConsumer extends DefaultConsumer { + @Test public void consumerCancellationHandlerUsesBlockingOperations() + throws IOException, InterruptedException { + final String altQueue = "basic.cancel.fallback"; + channel.queueDeclare(queue, false, true, false, null); + + CountDownLatch latch = new CountDownLatch(1); + final AlteringConsumer consumer = new AlteringConsumer(channel, altQueue, latch); + + channel.basicConsume(queue, consumer); + channel.queueDelete(queue); + + latch.await(2, TimeUnit.SECONDS); + } + + @Test + void handleShutdownShouldBeCalledWhenChannelIsClosed() throws Exception { + Channel ch = connection.createChannel(); + String q = ch.queueDeclare().getQueue(); + CountDownLatch latch = new CountDownLatch(1); + ch.basicConsume(q, (ctag, msg) -> {}, (ctag, r) -> latch.countDown()); + ch.close(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + private static class AlteringConsumer extends DefaultConsumer { private final String altQueue; private final CountDownLatch latch; @@ -81,18 +106,4 @@ public void handleCancel(String consumerTag) { } } } - - @Test public void consumerCancellationHandlerUsesBlockingOperations() - throws IOException, InterruptedException { - final String altQueue = "basic.cancel.fallback"; - channel.queueDeclare(queue, false, true, false, null); - - CountDownLatch latch = new CountDownLatch(1); - final AlteringConsumer consumer = new AlteringConsumer(channel, altQueue, latch); - - channel.basicConsume(queue, consumer); - channel.queueDelete(queue); - - latch.await(2, TimeUnit.SECONDS); - } } diff --git a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTestSuite.java b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTestSuite.java index 820728ce0..7f7767fbd 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTestSuite.java @@ -53,7 +53,7 @@ DefaultExchange.class, UnbindAutoDeleteExchange.class, Confirm.class, - ConsumerCancelNotification.class, + ConsumerNotifications.class, UnexpectedFrames.class, PerQueueTTL.class, PerMessageTTL.class,