Skip to content

Commit 8199bf7

Browse files
committed
Use queue name if necessary as routing key in consumers
Otherwise stats latency is never updated and received messages metrics aren't not displayed. References #45
1 parent 2ae0c79 commit 8199bf7

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
3535

3636
private ConsumerImpl q;
3737
private final Channel channel;
38-
private final String routingKey;
38+
private String routingKey;
3939
private final List<String> queueNames;
4040
private final int txSize;
4141
private final boolean autoAck;
@@ -94,6 +94,10 @@ public void run() {
9494
}
9595
}
9696

97+
public void setRoutingKey(String routingKey) {
98+
this.routingKey = routingKey;
99+
}
100+
97101
private class ConsumerImpl extends DefaultConsumer {
98102
long now;
99103
int totalMsgCount = 0;
@@ -128,7 +132,6 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
128132
}
129133

130134
now = System.currentTimeMillis();
131-
132135
stats.handleRecv(routingKey.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
133136
if (rateLimit > 0.0f) {
134137
delay(now);

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.net.URISyntaxException;
2323
import java.security.KeyManagementException;
2424
import java.security.NoSuchAlgorithmException;
25+
import java.util.ArrayList;
26+
import java.util.Collection;
2527
import java.util.HashSet;
2628
import java.util.List;
2729
import java.util.Random;
@@ -74,6 +76,7 @@ public void run() throws IOException, InterruptedException, TimeoutException, No
7476
public void run(boolean announceStartup)
7577
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
7678
Set<String> queueNames = new HashSet<>();
79+
Collection<Consumer> consumers = new ArrayList<>();
7780
Thread[] consumerThreads = new Thread[params.getConsumerThreadCount()];
7881
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
7982
for (int i = 0; i < consumerConnections.length; i++) {
@@ -88,6 +91,7 @@ public void run(boolean announceStartup)
8891
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
8992
}
9093
Consumer consumer = params.createConsumer(conn, stats, routingKey);
94+
consumers.add(consumer);
9195
queueNames.addAll(consumer.getQueueNames());
9296
Thread t = new Thread(consumer);
9397
consumerThreads[(i * params.getConsumerChannelCount()) + j] = t;
@@ -111,6 +115,12 @@ public void run(boolean announceStartup)
111115
producersRoutingKey = this.routingKey;
112116
}
113117

118+
// consumers need the publishing routing key to match it against
119+
// against the received message and update the stats
120+
for (Consumer consumer : consumers) {
121+
consumer.setRoutingKey(producersRoutingKey);
122+
}
123+
114124
Thread[] producerThreads = new Thread[params.getProducerThreadCount()];
115125
Connection[] producerConnections = new Connection[params.getProducerCount()];
116126
for (int i = 0; i < producerConnections.length; i++) {

0 commit comments

Comments
 (0)