Skip to content

Commit 175e4f5

Browse files
committed
Add consumer latency option
To simulate some processing on the consumer side. Fixes #40
1 parent 211f2af commit 175e4f5

File tree

3 files changed

+73
-3
lines changed

3 files changed

+73
-3
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,12 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4545
private final long timeLimit;
4646
private final CountDownLatch latch = new CountDownLatch(1);
4747
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
48+
private final ConsumerLatency consumerLatency;
4849

4950
public Consumer(Channel channel, String id,
5051
List<String> queueNames, int txSize, boolean autoAck,
51-
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, int timeLimit) {
52+
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, int timeLimit,
53+
int consumerLatencyInMicroSeconds) {
5254

5355
this.channel = channel;
5456
this.id = id;
@@ -60,6 +62,13 @@ public Consumer(Channel channel, String id,
6062
this.stats = stats;
6163
this.msgLimit = msgLimit;
6264
this.timeLimit = 1000L * timeLimit;
65+
if (consumerLatencyInMicroSeconds <= 0) {
66+
this.consumerLatency = new NoWaitConsumerLatency();
67+
} else if (consumerLatencyInMicroSeconds >= 1000) {
68+
this.consumerLatency = new ThreadSleepConsumerLatency(consumerLatencyInMicroSeconds / 1000);
69+
} else {
70+
this.consumerLatency = new BusyWaitConsumerLatency(consumerLatencyInMicroSeconds * 1000);
71+
}
6372
}
6473

6574
public void run() {
@@ -121,7 +130,10 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
121130
now = System.currentTimeMillis();
122131

123132
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
124-
delay(now);
133+
if (rateLimit > 0.0f) {
134+
delay(now);
135+
}
136+
consumerLatency.simulateLatency();
125137
}
126138
if (msgLimit != 0 && msgCount >= msgLimit) { // NB: not quite the inverse of above
127139
latch.countDown();
@@ -145,4 +157,53 @@ public void handleCancel(String consumerTag) throws IOException {
145157
}
146158
}
147159
}
160+
161+
private interface ConsumerLatency {
162+
163+
void simulateLatency();
164+
165+
}
166+
167+
private static class NoWaitConsumerLatency implements ConsumerLatency {
168+
169+
@Override
170+
public void simulateLatency() {
171+
// NO OP
172+
}
173+
174+
}
175+
176+
private static class ThreadSleepConsumerLatency implements ConsumerLatency {
177+
178+
private final int waitTime;
179+
180+
private ThreadSleepConsumerLatency(int waitTime) {
181+
this.waitTime = waitTime;
182+
}
183+
184+
@Override
185+
public void simulateLatency() {
186+
try {
187+
Thread.sleep(waitTime);
188+
} catch (InterruptedException e) {
189+
throw new RuntimeException("Exception while simulating latency", e);
190+
}
191+
}
192+
}
193+
194+
// from https://stackoverflow.com/a/11499351
195+
private static class BusyWaitConsumerLatency implements ConsumerLatency {
196+
197+
private final long delay;
198+
199+
private BusyWaitConsumerLatency(long delay) {
200+
this.delay = delay;
201+
}
202+
203+
@Override
204+
public void simulateLatency() {
205+
long start = System.nanoTime();
206+
while(System.nanoTime() - start < delay);
207+
}
208+
}
148209
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class MulticastParams {
6262

6363
private Map<String, Object> queueArguments;
6464

65+
private int consumerLatencyInMicroseconds;
66+
6567
public void setExchangeType(String exchangeType) {
6668
this.exchangeType = exchangeType;
6769
}
@@ -175,6 +177,10 @@ public void setQueueArguments(Map<String, Object> queueArguments) {
175177
this.queueArguments = queueArguments;
176178
}
177179

180+
public void setConsumerLatencyInMicroseconds(int consumerLatencyInMicroseconds) {
181+
this.consumerLatencyInMicroseconds = consumerLatencyInMicroseconds;
182+
}
183+
178184
public int getConsumerCount() {
179185
return consumerCount;
180186
}
@@ -254,7 +260,7 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
254260
if (channelPrefetch > 0) channel.basicQos(channelPrefetch, true);
255261
return new Consumer(channel, id, generatedQueueNames,
256262
consumerTxSize, autoAck, multiAckEvery,
257-
stats, consumerRateLimit, consumerMsgCount, timeLimit);
263+
stats, consumerRateLimit, consumerMsgCount, timeLimit, consumerLatencyInMicroseconds);
258264
}
259265

260266
public boolean shouldConfigureQueues() {

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public static void main(String[] args) {
8888
boolean predeclared = cmd.hasOption('p');
8989
boolean autoDelete = boolArg(cmd, "ad", true);
9090
String queueArgs = strArg(cmd, "qa", null);
91+
int consumerLatencyInMicroseconds = intArg(cmd, 'L', 0);
9192

9293
String uri = strArg(cmd, 'h', "amqp://localhost");
9394
String urisParameter = strArg(cmd, 'H', null);
@@ -151,6 +152,7 @@ public static void main(String[] args) {
151152
p.setBodyFiles( bodyFiles == null ? null : asList(bodyFiles.split(",")));
152153
p.setBodyContentType( bodyContentType);
153154
p.setQueueArguments(queueArguments(queueArgs));
155+
p.setConsumerLatencyInMicroseconds(consumerLatencyInMicroseconds);
154156

155157
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris);
156158
set.run(true);
@@ -231,6 +233,7 @@ private static Options getOptions() {
231233
options.addOption(new Option("T", "bodyContenType", true, "body content-type"));
232234
options.addOption(new Option("ad", "autoDelete", true, "should the queue be auto-deleted, default is true"));
233235
options.addOption(new Option("qa", "queueArgs", true, "queue arguments as key/pair values, separated by commas"));
236+
options.addOption(new Option("L", "consumerLatency", true, "consumer latency in microseconds"));
234237
options.addOption(new Option("useDefaultSslContext", "useDefaultSslContext", false,"use JVM default SSL context"));
235238
return options;
236239
}

0 commit comments

Comments
 (0)