Skip to content

Commit 2ae0c79

Browse files
committed
Use default exchange by default
The default exchange is now the default if possible (e.g. when the routing key isn't specified and there's only one queue to publish too). For those cases, the routing key is the queue name and there's no need to create an exchange (a "direct"-named exchange was created previously). Fixes #45
1 parent a529660 commit 2ae0c79

File tree

4 files changed

+52
-43
lines changed

4 files changed

+52
-43
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
3535

3636
private ConsumerImpl q;
3737
private final Channel channel;
38-
private final String id;
38+
private final String routingKey;
3939
private final List<String> queueNames;
4040
private final int txSize;
4141
private final boolean autoAck;
@@ -47,14 +47,14 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4747
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
4848
private final ConsumerLatency consumerLatency;
4949

50-
public Consumer(Channel channel, String id,
50+
public Consumer(Channel channel, String routingKey,
5151
List<String> queueNames, int txSize, boolean autoAck,
5252
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, int timeLimit,
5353
int consumerLatencyInMicroSeconds) {
5454

5555
this.channel = channel;
56-
this.id = id;
57-
this.queueNames = queueNames;
56+
this.routingKey = routingKey;
57+
this.queueNames = Collections.unmodifiableList(queueNames);
5858
this.rateLimit = rateLimit;
5959
this.txSize = txSize;
6060
this.autoAck = autoAck;
@@ -129,7 +129,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
129129

130130
now = System.currentTimeMillis();
131131

132-
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
132+
stats.handleRecv(routingKey.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
133133
if (rateLimit > 0.0f) {
134134
delay(now);
135135
}
@@ -158,6 +158,10 @@ public void handleCancel(String consumerTag) throws IOException {
158158
}
159159
}
160160

161+
public List<String> getQueueNames() {
162+
return this.queueNames;
163+
}
164+
161165
private interface ConsumerLatency {
162166

163167
void simulateLatency();

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,19 @@ public class MulticastParams {
4343
private int producerMsgCount = 0;
4444
private int consumerMsgCount = 0;
4545

46-
private String exchangeName = "direct";
46+
private String exchangeName = "";
4747
private String exchangeType = "direct";
48-
private List<String> queueNames = new ArrayList<String>();
48+
private List<String> queueNames = new ArrayList<>();
4949
private String routingKey = null;
5050
private boolean randomRoutingKey = false;
5151

52-
private List<?> flags = new ArrayList<Object>();
52+
private List<?> flags = new ArrayList<>();
5353

5454
private int multiAckEvery = 0;
5555
private boolean autoAck = true;
5656
private boolean autoDelete = false;
5757

58-
private List<String> bodyFiles = new ArrayList<String>();
58+
private List<String> bodyFiles = new ArrayList<>();
5959
private String bodyContentType = null;
6060

6161
private boolean predeclared;
@@ -74,9 +74,9 @@ public void setExchangeName(String exchangeName) {
7474

7575
public void setQueueNames(List<String> queueNames) {
7676
if(queueNames == null) {
77-
this.queueNames = new ArrayList<String>();
77+
this.queueNames = new ArrayList<>();
7878
} else {
79-
this.queueNames = new ArrayList<String>(queueNames);
79+
this.queueNames = new ArrayList<>(queueNames);
8080
}
8181
}
8282

@@ -219,21 +219,21 @@ public boolean getRandomRoutingKey() {
219219

220220
public void setBodyFiles(List<String> bodyFiles) {
221221
if (bodyFiles == null) {
222-
this.bodyFiles = new ArrayList<String>();
222+
this.bodyFiles = new ArrayList<>();
223223
} else {
224-
this.bodyFiles = new ArrayList<String>(bodyFiles);
224+
this.bodyFiles = new ArrayList<>(bodyFiles);
225225
}
226226
}
227227

228228
public void setBodyContentType(String bodyContentType) {
229229
this.bodyContentType = bodyContentType;
230230
}
231231

232-
public Producer createProducer(Connection connection, Stats stats, String id) throws IOException {
232+
public Producer createProducer(Connection connection, Stats stats, String routingKey) throws IOException {
233233
Channel channel = connection.createChannel();
234234
if (producerTxSize > 0) channel.txSelect();
235235
if (confirm >= 0) channel.confirmSelect();
236-
if (!predeclared || !exchangeExists(connection, exchangeName)) {
236+
if (!predeclared && !exchangeExists(connection, exchangeName)) {
237237
channel.exchangeDeclare(exchangeName, exchangeType);
238238
}
239239
MessageBodySource messageBodySource = null;
@@ -242,7 +242,7 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
242242
} else {
243243
messageBodySource = new TimeSequenceMessageBodySource(minMsgSize);
244244
}
245-
final Producer producer = new Producer(channel, exchangeName, id,
245+
final Producer producer = new Producer(channel, exchangeName, routingKey,
246246
randomRoutingKey, flags, producerTxSize,
247247
producerRateLimit, producerMsgCount,
248248
timeLimit,
@@ -252,26 +252,24 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
252252
return producer;
253253
}
254254

255-
public Consumer createConsumer(Connection connection, Stats stats, String id) throws IOException {
255+
public Consumer createConsumer(Connection connection, Stats stats, String routingKey) throws IOException {
256256
Channel channel = connection.createChannel();
257257
if (consumerTxSize > 0) channel.txSelect();
258-
List<String> generatedQueueNames = configureQueues(connection, id);
258+
List<String> generatedQueueNames = configureQueues(connection, routingKey);
259259
if (consumerPrefetch > 0) channel.basicQos(consumerPrefetch);
260260
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
261-
return new Consumer(channel, id, generatedQueueNames,
261+
return new Consumer(channel, routingKey, generatedQueueNames,
262262
consumerTxSize, autoAck, multiAckEvery,
263263
stats, consumerRateLimit, consumerMsgCount, timeLimit, consumerLatencyInMicroseconds);
264264
}
265265

266266
public boolean shouldConfigureQueues() {
267-
// don't declare any queues when --predeclared is passed,
268-
// otherwise unwanted server-named queues without consumers will pile up. MK.
269267
return consumerCount == 0 && !(queueNames.size() == 0);
270268
}
271269

272-
public List<String> configureQueues(Connection connection, String id) throws IOException {
270+
public List<String> configureQueues(Connection connection, String routingKey) throws IOException {
273271
Channel channel = connection.createChannel();
274-
if (!predeclared || !exchangeExists(connection, exchangeName)) {
272+
if (!predeclared && !exchangeExists(connection, exchangeName)) {
275273
channel.exchangeDeclare(exchangeName, exchangeType);
276274
}
277275
// To ensure we get at-least 1 default queue:
@@ -295,7 +293,7 @@ public List<String> configureQueues(Connection connection, String id) throws IOE
295293
// skipping binding to default exchange,
296294
// as it's not possible to explicitly bind to it.
297295
if (!"".equals(exchangeName) && !"amq.default".equals(exchangeName)) {
298-
channel.queueBind(qName, exchangeName, id);
296+
channel.queueBind(qName, exchangeName, routingKey);
299297
}
300298
}
301299
channel.abort();
@@ -308,20 +306,12 @@ private static boolean exchangeExists(Connection connection, final String exchan
308306
// NB: default exchange always exists
309307
return true;
310308
} else {
311-
return exists(connection, new Checker() {
312-
public void check(Channel ch) throws IOException {
313-
ch.exchangeDeclarePassive(exchangeName);
314-
}
315-
});
309+
return exists(connection, ch -> ch.exchangeDeclarePassive(exchangeName));
316310
}
317311
}
318312

319313
private static boolean queueExists(Connection connection, final String queueName) throws IOException {
320-
return queueName != null && exists(connection, new Checker() {
321-
public void check(Channel ch) throws IOException {
322-
ch.queueDeclarePassive(queueName);
323-
}
324-
});
314+
return queueName != null && exists(connection, ch -> ch.queueDeclarePassive(queueName));
325315
}
326316

327317
private interface Checker {

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import java.net.URISyntaxException;
2323
import java.security.KeyManagementException;
2424
import java.security.NoSuchAlgorithmException;
25+
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Random;
28+
import java.util.Set;
2729
import java.util.UUID;
2830
import java.util.concurrent.TimeoutException;
2931

3032
public class MulticastSet {
31-
private final String id;
33+
private final String routingKey;
3234
private final Stats stats;
3335
private final ConnectionFactory factory;
3436
private final MulticastParams params;
@@ -40,9 +42,9 @@ public class MulticastSet {
4042
public MulticastSet(Stats stats, ConnectionFactory factory,
4143
MulticastParams params, List<String> uris) {
4244
if (params.getRoutingKey() == null) {
43-
this.id = UUID.randomUUID().toString();
45+
this.routingKey = UUID.randomUUID().toString();
4446
} else {
45-
this.id = params.getRoutingKey();
47+
this.routingKey = params.getRoutingKey();
4648
}
4749
this.stats = stats;
4850
this.factory = factory;
@@ -54,9 +56,9 @@ public MulticastSet(Stats stats, ConnectionFactory factory,
5456
public MulticastSet(Stats stats, ConnectionFactory factory,
5557
MulticastParams params, String testID, List<String> uris) {
5658
if (params.getRoutingKey() == null) {
57-
this.id = UUID.randomUUID().toString();
59+
this.routingKey = UUID.randomUUID().toString();
5860
} else {
59-
this.id = params.getRoutingKey();
61+
this.routingKey = params.getRoutingKey();
6062
}
6163
this.stats = stats;
6264
this.factory = factory;
@@ -71,6 +73,7 @@ public void run() throws IOException, InterruptedException, TimeoutException, No
7173

7274
public void run(boolean announceStartup)
7375
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
76+
Set<String> queueNames = new HashSet<>();
7477
Thread[] consumerThreads = new Thread[params.getConsumerThreadCount()];
7578
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
7679
for (int i = 0; i < consumerConnections.length; i++) {
@@ -84,18 +87,30 @@ public void run(boolean announceStartup)
8487
if (announceStartup) {
8588
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
8689
}
87-
Thread t = new Thread(params.createConsumer(conn, stats, id));
90+
Consumer consumer = params.createConsumer(conn, stats, routingKey);
91+
queueNames.addAll(consumer.getQueueNames());
92+
Thread t = new Thread(consumer);
8893
consumerThreads[(i * params.getConsumerChannelCount()) + j] = t;
8994
}
9095
}
9196

9297
if (params.shouldConfigureQueues()) {
9398
setUri();
9499
Connection conn = factory.newConnection();
95-
params.configureQueues(conn, id);
100+
List<String> configuredQueues = params.configureQueues(conn, routingKey);
101+
queueNames.addAll(configuredQueues);
96102
conn.close();
97103
}
98104

105+
String producersRoutingKey;
106+
// if one queue, and no routing key, the routing key
107+
// must be the queue name and we should use the default exchange
108+
if (queueNames.size() == 1 && params.getRoutingKey() == null) {
109+
producersRoutingKey = queueNames.iterator().next();
110+
} else {
111+
producersRoutingKey = this.routingKey;
112+
}
113+
99114
Thread[] producerThreads = new Thread[params.getProducerThreadCount()];
100115
Connection[] producerConnections = new Connection[params.getProducerCount()];
101116
for (int i = 0; i < producerConnections.length; i++) {
@@ -109,7 +124,7 @@ public void run(boolean announceStartup)
109124
if (announceStartup) {
110125
System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
111126
}
112-
Thread t = new Thread(params.createProducer(conn, stats, id));
127+
Thread t = new Thread(params.createProducer(conn, stats, producersRoutingKey));
113128
producerThreads[(i * params.getProducerChannelCount()) + j] = t;
114129
}
115130
}

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public static void main(String[] args) {
5959
getInstance().getTime());
6060
testID = strArg(cmd, 'd', "test-"+testID);
6161
String exchangeType = strArg(cmd, 't', "direct");
62-
String exchangeName = getExchangeName(cmd, exchangeType);
62+
String exchangeName = getExchangeName(cmd, "");
6363
String queueNames = strArg(cmd, 'u', null);
6464
String routingKey = strArg(cmd, 'k', null);
6565
boolean randomRoutingKey = cmd.hasOption('K');

0 commit comments

Comments
 (0)