From dbb160669662bc1ed27d43b6e798d151f921e596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 18 Mar 2025 18:03:51 +0100 Subject: [PATCH 1/2] Dispatch messages only when channel is open This makes PerfTest more reactive with large prefetch and long processing times: client TCP buffer gets full, the broker cannot write to the client socket, it closes the connection (after a 30-second timeout by default), the client tries to close the connection channels, but it uses the consumer dispatcher to do so, and it is busy processing the slow messages, so the channels are never properly shut down. Recovery cannot occur then. By dispatching messages only when the channel is open, the consumer dispatcher backlog gets cleared right away, channels get closed normally and recovery can kick in. --- src/main/java/com/rabbitmq/perf/Consumer.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/perf/Consumer.java b/src/main/java/com/rabbitmq/perf/Consumer.java index 2635a9e0..b7ac6baf 100644 --- a/src/main/java/com/rabbitmq/perf/Consumer.java +++ b/src/main/java/com/rabbitmq/perf/Consumer.java @@ -214,7 +214,7 @@ private void startBasicGetConsumer() { try { GetResponse response = ch.basicGet(queue, autoAck); if (response != null) { - delegate.handleMessage( + delegate.maybeHandleMessage( response.getEnvelope(), response.getProps(), response.getBody(), ch); } } catch (IOException e) { @@ -280,7 +280,13 @@ private ConsumerImpl(Channel channel) { public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { - this.handleMessage(envelope, properties, body, channel); + this.maybeHandleMessage(envelope, properties, body, channel); + } + + private void maybeHandleMessage(Envelope envelope, BasicProperties properties, byte[] body, Channel ch) throws IOException { + if (ch.isOpen()) { + handleMessage(envelope, properties, body, ch); + } } void handleMessage(Envelope envelope, BasicProperties properties, byte[] body, Channel ch) From f9ec634601fb51b2ff3cd19d02a01eea52a1e4b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 18 Mar 2025 18:24:41 +0100 Subject: [PATCH 2/2] Fix test --- src/main/java/com/rabbitmq/perf/Consumer.java | 3 ++- ...imeLimitAndPublishingIntervalRateTest.java | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/perf/Consumer.java b/src/main/java/com/rabbitmq/perf/Consumer.java index b7ac6baf..a11841f1 100644 --- a/src/main/java/com/rabbitmq/perf/Consumer.java +++ b/src/main/java/com/rabbitmq/perf/Consumer.java @@ -283,7 +283,8 @@ public void handleDelivery( this.maybeHandleMessage(envelope, properties, body, channel); } - private void maybeHandleMessage(Envelope envelope, BasicProperties properties, byte[] body, Channel ch) throws IOException { + private void maybeHandleMessage( + Envelope envelope, BasicProperties properties, byte[] body, Channel ch) throws IOException { if (ch.isOpen()) { handleMessage(envelope, properties, body, ch); } diff --git a/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java b/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java index 53634e59..8f104735 100644 --- a/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java +++ b/src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java @@ -337,6 +337,11 @@ public void consumerCount(int consumersCount, int channelsCount) throws Exceptio Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicConsume", (proxy, method, args) -> { @@ -458,6 +463,11 @@ void shouldAckOneLastTimeWhenQueueIsEmpty() throws Exception { Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicConsume", (proxy, method, args) -> { @@ -924,6 +934,11 @@ public void pollingWithBasicGet(int consumersCount, int channelsCount) throws Ex Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicGet", (proxy, method, args) -> { @@ -1014,6 +1029,11 @@ public void ackNack(String nackParameter) throws Exception { Channel channel = proxy( Channel.class, + callback( + "isOpen", + (proxy, method, args) -> { + return true; + }), callback( "basicConsume", (proxy, method, args) -> {