diff --git a/src/main/java/com/rabbitmq/perf/Consumer.java b/src/main/java/com/rabbitmq/perf/Consumer.java index 2635a9e0..a11841f1 100644 --- a/src/main/java/com/rabbitmq/perf/Consumer.java +++ b/src/main/java/com/rabbitmq/perf/Consumer.java @@ -214,7 +214,7 @@ private void startBasicGetConsumer() { try { GetResponse response = ch.basicGet(queue, autoAck); if (response != null) { - delegate.handleMessage( + delegate.maybeHandleMessage( response.getEnvelope(), response.getProps(), response.getBody(), ch); } } catch (IOException e) { @@ -280,7 +280,14 @@ private ConsumerImpl(Channel channel) { public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { - this.handleMessage(envelope, properties, body, channel); + this.maybeHandleMessage(envelope, properties, body, channel); + } + + private void maybeHandleMessage( + Envelope envelope, BasicProperties properties, byte[] body, Channel ch) throws IOException { + if (ch.isOpen()) { + handleMessage(envelope, properties, body, ch); + } } void handleMessage(Envelope envelope, BasicProperties properties, byte[] body, Channel ch) diff --git a/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java b/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java index 53634e59..8f104735 100644 --- a/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java +++ b/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java @@ -337,6 +337,11 @@ public void consumerCount(int consumersCount, int channelsCount) throws Exceptio Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicConsume", (proxy, method, args) -> { @@ -458,6 +463,11 @@ void shouldAckOneLastTimeWhenQueueIsEmpty() throws Exception { Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicConsume", (proxy, method, args) -> { @@ -924,6 +934,11 @@ public void pollingWithBasicGet(int consumersCount, int channelsCount) throws Ex Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicGet", (proxy, method, args) -> { @@ -1014,6 +1029,11 @@ public void ackNack(String nackParameter) throws Exception { Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicConsume", (proxy, method, args) -> {