Skip to content

Commit e8d707b

Browse files
committed
Merge branch '1.x.x-stable'
2 parents 0a3e06b + 8285bac commit e8d707b

File tree

3 files changed

+27
-3
lines changed

3 files changed

+27
-3
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
public class MulticastParams {
2929
private long confirm = -1;
30+
private int confirmTimeout = 30;
3031
private int consumerCount = 1;
3132
private int producerCount = 1;
3233
private int consumerChannelCount = 1;
@@ -134,6 +135,10 @@ public void setConfirm(long confirm) {
134135
this.confirm = confirm;
135136
}
136137

138+
public void setConfirmTimeout(int confirmTimeout) {
139+
this.confirmTimeout = confirmTimeout;
140+
}
141+
137142
public void setAutoAck(boolean autoAck) {
138143
this.autoAck = autoAck;
139144
}
@@ -264,7 +269,7 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
264269
randomRoutingKey, flags, producerTxSize,
265270
producerRateLimit, producerMsgCount,
266271
timeLimit,
267-
confirm, messageBodySource, stats);
272+
confirm, confirmTimeout, messageBodySource, stats);
268273
channel.addReturnListener(producer);
269274
channel.addConfirmListener(producer);
270275
return producer;

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public static void main(String[] args) {
7474
int producerTxSize = intArg(cmd, 'm', 0);
7575
int consumerTxSize = intArg(cmd, 'n', 0);
7676
long confirm = intArg(cmd, 'c', -1);
77+
int confirmTimeout = intArg(cmd, "ct", 30);
7778
boolean autoAck = cmd.hasOption('a');
7879
int multiAckEvery = intArg(cmd, 'A', 0);
7980
int channelPrefetch = intArg(cmd, 'Q', 0);
@@ -151,6 +152,7 @@ public void run() {
151152
p.setAutoAck( autoAck);
152153
p.setAutoDelete( autoDelete);
153154
p.setConfirm( confirm);
155+
p.setConfirmTimeout( confirmTimeout);
154156
p.setConsumerCount( consumerCount);
155157
p.setConsumerChannelCount( consumerChannelCount);
156158
p.setConsumerMsgCount( consumerMsgCount);
@@ -249,6 +251,7 @@ private static Options getOptions() {
249251
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));
250252
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));
251253
options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));
254+
options.addOption(new Option("ct", "confirmTimeout", true, "waiting timeout for unconfirmed publishes before failing (in seconds)"));
252255
options.addOption(new Option("a", "autoack", false,"auto ack"));
253256
options.addOption(new Option("A", "multi-ack-every", true, "multi ack every"));
254257
options.addOption(new Option("q", "qos", true, "consumer prefetch count"));
@@ -286,6 +289,10 @@ private static int intArg(CommandLine cmd, char opt, int def) {
286289
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
287290
}
288291

292+
private static int intArg(CommandLine cmd, String opt, int def) {
293+
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
294+
}
295+
289296
private static float floatArg(CommandLine cmd, char opt, float def) {
290297
return Float.parseFloat(cmd.getOptionValue(opt, Float.toString(def)));
291298
}

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.TreeSet;
2828
import java.util.UUID;
2929
import java.util.concurrent.Semaphore;
30+
import java.util.concurrent.TimeUnit;
3031

3132
public class Producer extends ProducerConsumerBase implements Runnable, ReturnListener,
3233
ConfirmListener
@@ -46,13 +47,14 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
4647
private final MessageBodySource messageBodySource;
4748

4849
private Semaphore confirmPool;
50+
private int confirmTimeout;
4951
private final SortedSet<Long> unconfirmedSet =
5052
Collections.synchronizedSortedSet(new TreeSet<Long>());
5153

5254
public Producer(Channel channel, String exchangeName, String id, boolean randomRoutingKey,
5355
List<?> flags, int txSize,
5456
float rateLimit, int msgLimit, int timeLimit,
55-
long confirm, MessageBodySource messageBodySource, Stats stats)
57+
long confirm, int confirmTimeout, MessageBodySource messageBodySource, Stats stats)
5658
throws IOException {
5759

5860
this.channel = channel;
@@ -68,6 +70,7 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
6870
this.messageBodySource = messageBodySource;
6971
if (confirm > 0) {
7072
this.confirmPool = new Semaphore((int)confirm);
73+
this.confirmTimeout = confirmTimeout;
7174
}
7275
this.stats = stats;
7376
}
@@ -129,7 +132,16 @@ public void run() {
129132
(msgLimit == 0 || msgCount < msgLimit)) {
130133
delay(now);
131134
if (confirmPool != null) {
132-
confirmPool.acquire();
135+
if (confirmTimeout < 0) {
136+
confirmPool.acquire();
137+
} else {
138+
boolean acquired = confirmPool.tryAcquire(confirmTimeout, TimeUnit.SECONDS);
139+
if (!acquired) {
140+
// waiting for too long, broker may be gone, stopping thread
141+
throw new RuntimeException("Waiting for publisher confirms for too long");
142+
}
143+
}
144+
133145
}
134146
publish(messageBodySource.create(totalMsgCount));
135147
totalMsgCount++;

0 commit comments

Comments
 (0)