Skip to content

Commit fc02da2

Browse files
Revert "Use default exchange by default"
This reverts commit 2ae0c79. Closes #54. References #45.
1 parent 5694b4c commit fc02da2

File tree

4 files changed

+43
-52
lines changed

4 files changed

+43
-52
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
3333

3434
private ConsumerImpl q;
3535
private final Channel channel;
36-
private final String routingKey;
36+
private final String id;
3737
private final List<String> queueNames;
3838
private final int txSize;
3939
private final boolean autoAck;
@@ -45,14 +45,14 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4545
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
4646
private final ConsumerLatency consumerLatency;
4747

48-
public Consumer(Channel channel, String routingKey,
48+
public Consumer(Channel channel, String id,
4949
List<String> queueNames, int txSize, boolean autoAck,
5050
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, int timeLimit,
5151
int consumerLatencyInMicroSeconds) {
5252

5353
this.channel = channel;
54-
this.routingKey = routingKey;
55-
this.queueNames = Collections.unmodifiableList(queueNames);
54+
this.id = id;
55+
this.queueNames = queueNames;
5656
this.rateLimit = rateLimit;
5757
this.txSize = txSize;
5858
this.autoAck = autoAck;
@@ -132,7 +132,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
132132

133133
now = System.currentTimeMillis();
134134

135-
stats.handleRecv(routingKey.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
135+
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
136136
if (rateLimit > 0.0f) {
137137
delay(now);
138138
}
@@ -161,10 +161,6 @@ public void handleCancel(String consumerTag) throws IOException {
161161
}
162162
}
163163

164-
public List<String> getQueueNames() {
165-
return this.queueNames;
166-
}
167-
168164
private interface ConsumerLatency {
169165

170166
void simulateLatency();

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,19 @@ public class MulticastParams {
4343
private int producerMsgCount = 0;
4444
private int consumerMsgCount = 0;
4545

46-
private String exchangeName = "";
46+
private String exchangeName = "direct";
4747
private String exchangeType = "direct";
48-
private List<String> queueNames = new ArrayList<>();
48+
private List<String> queueNames = new ArrayList<String>();
4949
private String routingKey = null;
5050
private boolean randomRoutingKey = false;
5151

52-
private List<?> flags = new ArrayList<>();
52+
private List<?> flags = new ArrayList<Object>();
5353

5454
private int multiAckEvery = 0;
5555
private boolean autoAck = true;
5656
private boolean autoDelete = false;
5757

58-
private List<String> bodyFiles = new ArrayList<>();
58+
private List<String> bodyFiles = new ArrayList<String>();
5959
private String bodyContentType = null;
6060

6161
private boolean predeclared;
@@ -74,9 +74,9 @@ public void setExchangeName(String exchangeName) {
7474

7575
public void setQueueNames(List<String> queueNames) {
7676
if(queueNames == null) {
77-
this.queueNames = new ArrayList<>();
77+
this.queueNames = new ArrayList<String>();
7878
} else {
79-
this.queueNames = new ArrayList<>(queueNames);
79+
this.queueNames = new ArrayList<String>(queueNames);
8080
}
8181
}
8282

@@ -219,21 +219,21 @@ public boolean getRandomRoutingKey() {
219219

220220
public void setBodyFiles(List<String> bodyFiles) {
221221
if (bodyFiles == null) {
222-
this.bodyFiles = new ArrayList<>();
222+
this.bodyFiles = new ArrayList<String>();
223223
} else {
224-
this.bodyFiles = new ArrayList<>(bodyFiles);
224+
this.bodyFiles = new ArrayList<String>(bodyFiles);
225225
}
226226
}
227227

228228
public void setBodyContentType(String bodyContentType) {
229229
this.bodyContentType = bodyContentType;
230230
}
231231

232-
public Producer createProducer(Connection connection, Stats stats, String routingKey) throws IOException {
232+
public Producer createProducer(Connection connection, Stats stats, String id) throws IOException {
233233
Channel channel = connection.createChannel();
234234
if (producerTxSize > 0) channel.txSelect();
235235
if (confirm >= 0) channel.confirmSelect();
236-
if (!predeclared && !exchangeExists(connection, exchangeName)) {
236+
if (!predeclared || !exchangeExists(connection, exchangeName)) {
237237
channel.exchangeDeclare(exchangeName, exchangeType);
238238
}
239239
MessageBodySource messageBodySource = null;
@@ -242,7 +242,7 @@ public Producer createProducer(Connection connection, Stats stats, String routin
242242
} else {
243243
messageBodySource = new TimeSequenceMessageBodySource(minMsgSize);
244244
}
245-
final Producer producer = new Producer(channel, exchangeName, routingKey,
245+
final Producer producer = new Producer(channel, exchangeName, id,
246246
randomRoutingKey, flags, producerTxSize,
247247
producerRateLimit, producerMsgCount,
248248
timeLimit,
@@ -252,24 +252,26 @@ public Producer createProducer(Connection connection, Stats stats, String routin
252252
return producer;
253253
}
254254

255-
public Consumer createConsumer(Connection connection, Stats stats, String routingKey) throws IOException {
255+
public Consumer createConsumer(Connection connection, Stats stats, String id) throws IOException {
256256
Channel channel = connection.createChannel();
257257
if (consumerTxSize > 0) channel.txSelect();
258-
List<String> generatedQueueNames = configureQueues(connection, routingKey);
258+
List<String> generatedQueueNames = configureQueues(connection, id);
259259
if (consumerPrefetch > 0) channel.basicQos(consumerPrefetch);
260260
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
261-
return new Consumer(channel, routingKey, generatedQueueNames,
261+
return new Consumer(channel, id, generatedQueueNames,
262262
consumerTxSize, autoAck, multiAckEvery,
263263
stats, consumerRateLimit, consumerMsgCount, timeLimit, consumerLatencyInMicroseconds);
264264
}
265265

266266
public boolean shouldConfigureQueues() {
267+
// don't declare any queues when --predeclared is passed,
268+
// otherwise unwanted server-named queues without consumers will pile up. MK.
267269
return consumerCount == 0 && !(queueNames.size() == 0);
268270
}
269271

270-
public List<String> configureQueues(Connection connection, String routingKey) throws IOException {
272+
public List<String> configureQueues(Connection connection, String id) throws IOException {
271273
Channel channel = connection.createChannel();
272-
if (!predeclared && !exchangeExists(connection, exchangeName)) {
274+
if (!predeclared || !exchangeExists(connection, exchangeName)) {
273275
channel.exchangeDeclare(exchangeName, exchangeType);
274276
}
275277
// To ensure we get at-least 1 default queue:
@@ -293,7 +295,7 @@ public List<String> configureQueues(Connection connection, String routingKey) th
293295
// skipping binding to default exchange,
294296
// as it's not possible to explicitly bind to it.
295297
if (!"".equals(exchangeName) && !"amq.default".equals(exchangeName)) {
296-
channel.queueBind(qName, exchangeName, routingKey);
298+
channel.queueBind(qName, exchangeName, id);
297299
}
298300
}
299301
channel.abort();
@@ -306,12 +308,20 @@ private static boolean exchangeExists(Connection connection, final String exchan
306308
// NB: default exchange always exists
307309
return true;
308310
} else {
309-
return exists(connection, ch -> ch.exchangeDeclarePassive(exchangeName));
311+
return exists(connection, new Checker() {
312+
public void check(Channel ch) throws IOException {
313+
ch.exchangeDeclarePassive(exchangeName);
314+
}
315+
});
310316
}
311317
}
312318

313319
private static boolean queueExists(Connection connection, final String queueName) throws IOException {
314-
return queueName != null && exists(connection, ch -> ch.queueDeclarePassive(queueName));
320+
return queueName != null && exists(connection, new Checker() {
321+
public void check(Channel ch) throws IOException {
322+
ch.queueDeclarePassive(queueName);
323+
}
324+
});
315325
}
316326

317327
private interface Checker {

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@
2222
import java.net.URISyntaxException;
2323
import java.security.KeyManagementException;
2424
import java.security.NoSuchAlgorithmException;
25-
import java.util.HashSet;
2625
import java.util.List;
2726
import java.util.Random;
28-
import java.util.Set;
2927
import java.util.UUID;
3028
import java.util.concurrent.TimeoutException;
3129

3230
public class MulticastSet {
33-
private final String routingKey;
31+
private final String id;
3432
private final Stats stats;
3533
private final ConnectionFactory factory;
3634
private final MulticastParams params;
@@ -42,9 +40,9 @@ public class MulticastSet {
4240
public MulticastSet(Stats stats, ConnectionFactory factory,
4341
MulticastParams params, List<String> uris) {
4442
if (params.getRoutingKey() == null) {
45-
this.routingKey = UUID.randomUUID().toString();
43+
this.id = UUID.randomUUID().toString();
4644
} else {
47-
this.routingKey = params.getRoutingKey();
45+
this.id = params.getRoutingKey();
4846
}
4947
this.stats = stats;
5048
this.factory = factory;
@@ -56,9 +54,9 @@ public MulticastSet(Stats stats, ConnectionFactory factory,
5654
public MulticastSet(Stats stats, ConnectionFactory factory,
5755
MulticastParams params, String testID, List<String> uris) {
5856
if (params.getRoutingKey() == null) {
59-
this.routingKey = UUID.randomUUID().toString();
57+
this.id = UUID.randomUUID().toString();
6058
} else {
61-
this.routingKey = params.getRoutingKey();
59+
this.id = params.getRoutingKey();
6260
}
6361
this.stats = stats;
6462
this.factory = factory;
@@ -73,7 +71,6 @@ public void run() throws IOException, InterruptedException, TimeoutException, No
7371

7472
public void run(boolean announceStartup)
7573
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
76-
Set<String> queueNames = new HashSet<>();
7774
Thread[] consumerThreads = new Thread[params.getConsumerThreadCount()];
7875
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
7976
for (int i = 0; i < consumerConnections.length; i++) {
@@ -87,30 +84,18 @@ public void run(boolean announceStartup)
8784
if (announceStartup) {
8885
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
8986
}
90-
Consumer consumer = params.createConsumer(conn, stats, routingKey);
91-
queueNames.addAll(consumer.getQueueNames());
92-
Thread t = new Thread(consumer);
87+
Thread t = new Thread(params.createConsumer(conn, stats, id));
9388
consumerThreads[(i * params.getConsumerChannelCount()) + j] = t;
9489
}
9590
}
9691

9792
if (params.shouldConfigureQueues()) {
9893
setUri();
9994
Connection conn = factory.newConnection();
100-
List<String> configuredQueues = params.configureQueues(conn, routingKey);
101-
queueNames.addAll(configuredQueues);
95+
params.configureQueues(conn, id);
10296
conn.close();
10397
}
10498

105-
String producersRoutingKey;
106-
// if one queue, and no routing key, the routing key
107-
// must be the queue name and we should use the default exchange
108-
if (queueNames.size() == 1 && params.getRoutingKey() == null) {
109-
producersRoutingKey = queueNames.iterator().next();
110-
} else {
111-
producersRoutingKey = this.routingKey;
112-
}
113-
11499
Thread[] producerThreads = new Thread[params.getProducerThreadCount()];
115100
Connection[] producerConnections = new Connection[params.getProducerCount()];
116101
for (int i = 0; i < producerConnections.length; i++) {
@@ -124,7 +109,7 @@ public void run(boolean announceStartup)
124109
if (announceStartup) {
125110
System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
126111
}
127-
Thread t = new Thread(params.createProducer(conn, stats, producersRoutingKey));
112+
Thread t = new Thread(params.createProducer(conn, stats, id));
128113
producerThreads[(i * params.getProducerChannelCount()) + j] = t;
129114
}
130115
}

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public static void main(String[] args) {
5959
getInstance().getTime());
6060
testID = strArg(cmd, 'd', "test-"+testID);
6161
String exchangeType = strArg(cmd, 't', "direct");
62-
String exchangeName = getExchangeName(cmd, "");
62+
String exchangeName = getExchangeName(cmd, exchangeType);
6363
String queueNames = strArg(cmd, 'u', null);
6464
String routingKey = strArg(cmd, 'k', null);
6565
boolean randomRoutingKey = cmd.hasOption('K');

0 commit comments

Comments
 (0)