Skip to content

Commit 8285bac

Browse files
committed
Add timeout when waiting for publisher confirms
The --confirm options is for the outstanding publisher confirms, Producer doesn't publish when the limit is reached, waiting on a Semaphore. There was no timeout, so the producer waits for ever even if the connection is dead. This could block the JVM process when the broker would die abruptly. The --confirmTimeout option now allows to specify a timeout in seconds. The default is 30 seconds, a negative value means no timeout (i.e. same behaviour as before). Fixes #53
1 parent eb7cb6e commit 8285bac

File tree

3 files changed

+27
-3
lines changed

3 files changed

+27
-3
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
public class MulticastParams {
2929
private long confirm = -1;
30+
private int confirmTimeout = 30;
3031
private int consumerCount = 1;
3132
private int producerCount = 1;
3233
private int consumerChannelCount = 1;
@@ -124,6 +125,10 @@ public void setConfirm(long confirm) {
124125
this.confirm = confirm;
125126
}
126127

128+
public void setConfirmTimeout(int confirmTimeout) {
129+
this.confirmTimeout = confirmTimeout;
130+
}
131+
127132
public void setAutoAck(boolean autoAck) {
128133
this.autoAck = autoAck;
129134
}
@@ -246,7 +251,7 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
246251
randomRoutingKey, flags, producerTxSize,
247252
producerRateLimit, producerMsgCount,
248253
timeLimit,
249-
confirm, messageBodySource, stats);
254+
confirm, confirmTimeout, messageBodySource, stats);
250255
channel.addReturnListener(producer);
251256
channel.addConfirmListener(producer);
252257
return producer;

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public static void main(String[] args) {
7272
int producerTxSize = intArg(cmd, 'm', 0);
7373
int consumerTxSize = intArg(cmd, 'n', 0);
7474
long confirm = intArg(cmd, 'c', -1);
75+
int confirmTimeout = intArg(cmd, "ct", 30);
7576
boolean autoAck = cmd.hasOption('a');
7677
int multiAckEvery = intArg(cmd, 'A', 0);
7778
int channelPrefetch = intArg(cmd, 'Q', 0);
@@ -127,6 +128,7 @@ public static void main(String[] args) {
127128
p.setAutoAck( autoAck);
128129
p.setAutoDelete( autoDelete);
129130
p.setConfirm( confirm);
131+
p.setConfirmTimeout( confirmTimeout);
130132
p.setConsumerCount( consumerCount);
131133
p.setConsumerChannelCount( consumerChannelCount);
132134
p.setConsumerMsgCount( consumerMsgCount);
@@ -221,6 +223,7 @@ private static Options getOptions() {
221223
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
222224
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
223225
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
226+
options.addOption(new Option("ct", "confirmTimeout", true, "waiting timeout for unconfirmed publishes before failing (in seconds)"));
224227
options.addOption(new Option("a", "autoack", false,"auto ack"));
225228
options.addOption(new Option("A", "multiAckEvery", true, "multi ack every"));
226229
options.addOption(new Option("q", "qos", true, "consumer prefetch count"));
@@ -256,6 +259,10 @@ private static int intArg(CommandLine cmd, char opt, int def) {
256259
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
257260
}
258261

262+
private static int intArg(CommandLine cmd, String opt, int def) {
263+
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
264+
}
265+
259266
private static float floatArg(CommandLine cmd, char opt, float def) {
260267
return Float.parseFloat(cmd.getOptionValue(opt, Float.toString(def)));
261268
}

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.TreeSet;
2828
import java.util.UUID;
2929
import java.util.concurrent.Semaphore;
30+
import java.util.concurrent.TimeUnit;
3031

3132
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
3233
ConfirmListener
@@ -46,13 +47,14 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
4647
private final MessageBodySource messageBodySource;
4748

4849
private Semaphore confirmPool;
50+
private int confirmTimeout;
4951
private final SortedSet<Long> unconfirmedSet =
5052
Collections.synchronizedSortedSet(new TreeSet<Long>());
5153

5254
public Producer(Channel channel, String exchangeName, String id, boolean randomRoutingKey,
5355
List<?> flags, int txSize,
5456
float rateLimit, int msgLimit, int timeLimit,
55-
long confirm, MessageBodySource messageBodySource, Stats stats)
57+
long confirm, int confirmTimeout, MessageBodySource messageBodySource, Stats stats)
5658
throws IOException {
5759

5860
this.channel = channel;
@@ -68,6 +70,7 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
6870
this.messageBodySource = messageBodySource;
6971
if (confirm > 0) {
7072
this.confirmPool = new Semaphore((int)confirm);
73+
this.confirmTimeout = confirmTimeout;
7174
}
7275
this.stats = stats;
7376
}
@@ -129,7 +132,16 @@ public void run() {
129132
(msgLimit == 0 || msgCount < msgLimit)) {
130133
delay(now);
131134
if (confirmPool != null) {
132-
confirmPool.acquire();
135+
if (confirmTimeout < 0) {
136+
confirmPool.acquire();
137+
} else {
138+
boolean acquired = confirmPool.tryAcquire(confirmTimeout, TimeUnit.SECONDS);
139+
if (!acquired) {
140+
// waiting for too long, broker may be gone, stopping thread
141+
throw new RuntimeException("Waiting for publisher confirms for too long");
142+
}
143+
}
144+
133145
}
134146
publish(messageBodySource.create(totalMsgCount));
135147
totalMsgCount++;

0 commit comments

Comments
 (0)