Skip to content

Using -R (--consumer-rate) leads to broken pipes #879

@mkuratczyk

Description

@mkuratczyk

Describe the bug

Version: perf-test from main as of 22.09.2025

When using -R/--consumer-rate, at least with low values, leads to broken pipe error after 30s. The connection is re-established and the same error happens again after 30s.

Reproduction steps

  1. perf-test -ad false -f persistent -u cq -c 1 -R 2 -C 100000 -D 100000
  2. Wait 30s
java.net.SocketException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:65)
	at java.base/sun.nio.ch.NioSocketImpl.tryWrite(NioSocketImpl.java:402)
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:418)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:448)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:821)
	at java.base/java.net.Socket$SocketOutputStream.implWrite(Socket.java:1086)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1076)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:123)
	at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:203)
	at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:142)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:217)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:665)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:156)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:503)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:470)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:460)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:97)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:439)
	at com.rabbitmq.perf.Consumer.lambda$static$0(Consumer.java:48)
	at com.rabbitmq.perf.Consumer$ConsumerImpl.lambda$ackIfNecessary$0(Consumer.java:344)
	at com.rabbitmq.perf.AgentBase.dealWithWriteOperation(AgentBase.java:90)
	at com.rabbitmq.perf.Consumer$ConsumerImpl.ackIfNecessary(Consumer.java:341)
	at com.rabbitmq.perf.Consumer$ConsumerImpl.handleMessage(Consumer.java:311)
	at com.rabbitmq.perf.Consumer$ConsumerImpl.maybeHandleMessage(Consumer.java:292)
	at com.rabbitmq.perf.Consumer$ConsumerImpl.handleDelivery(Consumer.java:286)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
	at java.base/java.lang.Thread.run(Thread.java:1474)

On the server side:

[error] <0.726.0> Error on AMQP connection <0.726.0> (127.0.0.1:65255 -> 127.0.0.1:5672 - perf-test-consumer-0, vhost: '/', user: 'guest', state: running), channel 1:
[error] <0.726.0>  {shutdown,{writer,send_failed,timeout}}
[warning] <0.726.0> Non-AMQP exit reason '{shutdown,{writer,send_failed,timeout}}'
[error] <0.726.0> Error on AMQP connection <0.726.0> (127.0.0.1:65255 -> 127.0.0.1:5672 - perf-test-consumer-0, vhost: '/', user: 'guest', state: running), channel 1:error while terminating:
[error] <0.726.0> shutdown
[error] <0.726.0> closing AMQP connection <0.726.0> (127.0.0.1:65255 -> 127.0.0.1:5672 - perf-test-consumer-0, duration: '31s'):
[error] <0.726.0> {inet_error,enotconn}

Expected behavior

stable connection

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions