Skip to content

Commit 0a3e06b

Browse files
Merge pull request #57 from dmcintyre-pivotal/master
Support for testing sharded queues
2 parents 413bf5d + 6a7c065 commit 0a3e06b

File tree

3 files changed

+29
-1
lines changed

3 files changed

+29
-1
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@ public class MulticastParams {
4242
private float consumerRateLimit = 0;
4343
private int producerMsgCount = 0;
4444
private int consumerMsgCount = 0;
45+
private boolean consumerSlowStart = false;
4546

4647
private String exchangeName = "direct";
4748
private String exchangeType = "direct";
4849
private List<String> queueNames = new ArrayList<String>();
4950
private String routingKey = null;
5051
private boolean randomRoutingKey = false;
52+
private boolean skipBindingQueues = false;
5153

5254
private List<?> flags = new ArrayList<Object>();
5355

@@ -87,6 +89,10 @@ public void setRoutingKey(String routingKey) {
8789
public void setRandomRoutingKey(boolean randomRoutingKey) {
8890
this.randomRoutingKey = randomRoutingKey;
8991
}
92+
93+
public void setSkipBindingQueues(boolean skipBindingQueues) {
94+
this.skipBindingQueues = skipBindingQueues;
95+
}
9096

9197
public void setProducerRateLimit(float producerRateLimit) {
9298
this.producerRateLimit = producerRateLimit;
@@ -111,6 +117,10 @@ public void setConsumerCount(int consumerCount) {
111117
public void setConsumerChannelCount(int consumerChannelCount) {
112118
this.consumerChannelCount = consumerChannelCount;
113119
}
120+
121+
public void setConsumerSlowStart(boolean slowStart) {
122+
this.consumerSlowStart = slowStart;
123+
}
114124

115125
public void setProducerTxSize(int producerTxSize) {
116126
this.producerTxSize = producerTxSize;
@@ -188,6 +198,10 @@ public int getConsumerCount() {
188198
public int getConsumerChannelCount() {
189199
return consumerChannelCount;
190200
}
201+
202+
public boolean getConsumerSlowStart() {
203+
return consumerSlowStart;
204+
}
191205

192206
public int getConsumerThreadCount() {
193207
return consumerCount * consumerChannelCount;
@@ -216,6 +230,10 @@ public String getRoutingKey() {
216230
public boolean getRandomRoutingKey() {
217231
return randomRoutingKey;
218232
}
233+
234+
public boolean getSkipBindingQueues() {
235+
return skipBindingQueues;
236+
}
219237

220238
public void setBodyFiles(List<String> bodyFiles) {
221239
if (bodyFiles == null) {
@@ -294,7 +312,7 @@ public List<String> configureQueues(Connection connection, String id) throws IOE
294312
generatedQueueNames.add(qName);
295313
// skipping binding to default exchange,
296314
// as it's not possible to explicitly bind to it.
297-
if (!"".equals(exchangeName) && !"amq.default".equals(exchangeName)) {
315+
if (!"".equals(exchangeName) && !"amq.default".equals(exchangeName) && !skipBindingQueues) {
298316
channel.queueBind(qName, exchangeName, id);
299317
}
300318
}

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ public void run(boolean announceStartup)
116116

117117
for (Thread consumerThread : consumerThreads) {
118118
consumerThread.start();
119+
if(params.getConsumerSlowStart()) {
120+
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
121+
Thread.sleep(1000);
122+
}
119123
}
120124

121125
for (Thread producerThread : producerThreads) {

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public static void main(String[] args) {
6363
String queueNames = strArg(cmd, 'u', null);
6464
String routingKey = strArg(cmd, 'k', null);
6565
boolean randomRoutingKey = cmd.hasOption('K');
66+
boolean skipBindingQueues= cmd.hasOption("sb");
6667
int samplingInterval = intArg(cmd, 'i', 1);
6768
float producerRateLimit = floatArg(cmd, 'r', 0.0f);
6869
float consumerRateLimit = floatArg(cmd, 'R', 0.0f);
@@ -78,6 +79,7 @@ public static void main(String[] args) {
7879
int channelPrefetch = intArg(cmd, 'Q', 0);
7980
int consumerPrefetch = intArg(cmd, 'q', 0);
8081
int minMsgSize = intArg(cmd, 's', 0);
82+
boolean slowStart = cmd.hasOption('S');
8183
int timeLimit = intArg(cmd, 'z', 0);
8284
int producerMsgCount = intArg(cmd, 'C', 0);
8385
int consumerMsgCount = intArg(cmd, 'D', 0);
@@ -154,6 +156,7 @@ public void run() {
154156
p.setConsumerMsgCount( consumerMsgCount);
155157
p.setConsumerRateLimit( consumerRateLimit);
156158
p.setConsumerTxSize( consumerTxSize);
159+
p.setConsumerSlowStart( slowStart);
157160
p.setExchangeName( exchangeName);
158161
p.setExchangeType( exchangeType);
159162
p.setFlags( flags);
@@ -168,6 +171,7 @@ public void run() {
168171
p.setProducerTxSize( producerTxSize);
169172
p.setQueueNames( queueNames == null ? null : asList(queueNames.split(",")));
170173
p.setRoutingKey( routingKey);
174+
p.setSkipBindingQueues( skipBindingQueues);
171175
p.setRandomRoutingKey( randomRoutingKey);
172176
p.setProducerRateLimit( producerRateLimit);
173177
p.setTimeLimit( timeLimit);
@@ -233,11 +237,13 @@ private static Options getOptions() {
233237
options.addOption(new Option("u", "queue", true, "queue name"));
234238
options.addOption(new Option("k", "routing-key", true, "routing key"));
235239
options.addOption(new Option("K", "random-routing-key", false,"use random routing key per message"));
240+
options.addOption(new Option("sb", "skip-binding-queues", false,"don't bind queues to the exchange"));
236241
options.addOption(new Option("i", "interval", true, "sampling interval in seconds"));
237242
options.addOption(new Option("r", "rate", true, "producer rate limit"));
238243
options.addOption(new Option("R", "consumer-rate", true, "consumer rate limit"));
239244
options.addOption(new Option("x", "producers", true, "producer count"));
240245
options.addOption(new Option("y", "consumers", true, "consumer count"));
246+
options.addOption(new Option("S", "slow-start", false,"start consumers slowly (1 sec delay between each)"));
241247
options.addOption(new Option("X", "producer-channel-count", true, "channels per producer"));
242248
options.addOption(new Option("Y", "consumer-channel-count", true, "channels per consumer"));
243249
options.addOption(new Option("m", "ptxsize", true, "producer tx size"));

0 commit comments

Comments
 (0)