Skip to content

Commit b9033fe

Browse files
authored
Merge pull request #244 from rabbitmq/variable-consumer-latency
Added variable consumer latency
2 parents 8eddc29 + 8784c74 commit b9033fe

File tree

6 files changed

+142
-38
lines changed

6 files changed

+142
-38
lines changed

src/docs/asciidoc/usage-advanced.adoc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,19 @@ and `DURATION` is in seconds. In the previous example, the size of published mes
120120
will be 1 kB for 30 seconds, then 10 kB for 20 seconds, then 5 kB for 45 seconds,
121121
then back to 1 kB for 30 seconds, and so on.
122122

123+
== Setting and varying consumer latency
124+
125+
You can simulate processing time per message with either a fixed or a variable latency value in microseconds.
126+
127+
The `--consumer-latency` (`-L`) option sets a fixed consumer latency in microseconds. In the example
128+
below a 1 ms latency is set:
129+
130+
bin/runjava com.rabbitmq.perf.PerfTest --consumer-latency 1000
131+
132+
The `--variable-latency` (`-vl`) option sets a variable consumer latency. In the example below it is
133+
set to 1 ms for 60 seconds then 1 second for 30 seconds:
134+
135+
bin/runjava com.rabbitmq.perf.PerfTest --variable-latency 1000:60 --variable-latency 1000000:30
123136

124137
== Working With Many Queues
125138

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,17 @@ public Consumer(ConsumerParameters parameters) {
9393
this.queueNames.set(new ArrayList<>(parameters.getQueueNames()));
9494
this.initialQueueNames = new ArrayList<>(parameters.getQueueNames());
9595

96-
int consumerLatencyInMicroSeconds = parameters.getConsumerLatencyInMicroSeconds();
97-
if (consumerLatencyInMicroSeconds <= 0) {
98-
this.consumerLatency = new NoWaitConsumerLatency();
99-
} else if (consumerLatencyInMicroSeconds >= 1000) {
100-
this.consumerLatency = new ThreadSleepConsumerLatency(consumerLatencyInMicroSeconds / 1000);
96+
if(parameters.getConsumerLatenciesIndicator().isVariable()) {
97+
this.consumerLatency = new VariableConsumerLatency(parameters.getConsumerLatenciesIndicator());
10198
} else {
102-
this.consumerLatency = new BusyWaitConsumerLatency(consumerLatencyInMicroSeconds * 1000);
99+
long consumerLatencyInMicroSeconds = parameters.getConsumerLatenciesIndicator().getValue();
100+
if (consumerLatencyInMicroSeconds <= 0) {
101+
this.consumerLatency = new NoWaitConsumerLatency();
102+
} else if (consumerLatencyInMicroSeconds >= 1000) {
103+
this.consumerLatency = new ThreadSleepConsumerLatency(parameters.getConsumerLatenciesIndicator());
104+
} else {
105+
this.consumerLatency = new BusyWaitConsumerLatency(parameters.getConsumerLatenciesIndicator());
106+
}
103107
}
104108

105109
if (timestampProvider.isTimestampInHeader()) {
@@ -387,6 +391,46 @@ private interface ConsumerLatency {
387391

388392
}
389393

394+
private static boolean latencySleep(long delay) {
395+
try {
396+
long ms = delay / 1000;
397+
Thread.sleep(ms);
398+
return true;
399+
} catch (InterruptedException e) {
400+
Thread.currentThread().interrupt();
401+
return false;
402+
}
403+
}
404+
405+
private static boolean latencyBusyWait(long delay) {
406+
delay = delay * 1000;
407+
long start = System.nanoTime();
408+
while (System.nanoTime() - start < delay) ;
409+
return true;
410+
}
411+
412+
private static class VariableConsumerLatency implements ConsumerLatency {
413+
414+
private final ValueIndicator<Long> consumerLatenciesIndicator;
415+
416+
private VariableConsumerLatency(ValueIndicator<Long> consumerLatenciesIndicator) {
417+
this.consumerLatenciesIndicator = consumerLatenciesIndicator;
418+
}
419+
420+
@Override
421+
public boolean simulateLatency() {
422+
long consumerLatencyInMicroSeconds = consumerLatenciesIndicator.getValue();
423+
if (consumerLatencyInMicroSeconds <= 0) {
424+
return true;
425+
} else if (consumerLatencyInMicroSeconds >= 1000) {
426+
return latencySleep(consumerLatencyInMicroSeconds);
427+
} else {
428+
return latencyBusyWait(consumerLatencyInMicroSeconds);
429+
}
430+
}
431+
432+
}
433+
390434
private static class NoWaitConsumerLatency implements ConsumerLatency {
391435

392436
@Override
@@ -398,38 +442,30 @@ public boolean simulateLatency() {
398442

399443
private static class ThreadSleepConsumerLatency implements ConsumerLatency {
400444

401-
private final int waitTime;
445+
private final ValueIndicator<Long> consumerLatenciesIndicator;
402446

403-
private ThreadSleepConsumerLatency(int waitTime) {
404-
this.waitTime = waitTime;
447+
private ThreadSleepConsumerLatency(ValueIndicator<Long> consumerLatenciesIndicator) {
448+
this.consumerLatenciesIndicator = consumerLatenciesIndicator;
405449
}
406450

407451
@Override
408452
public boolean simulateLatency() {
409-
try {
410-
Thread.sleep(waitTime);
411-
return true;
412-
} catch (InterruptedException e) {
413-
Thread.currentThread().interrupt();
414-
return false;
415-
}
453+
return latencySleep(consumerLatenciesIndicator.getValue());
416454
}
417455
}
418456

419457
// from https://stackoverflow.com/a/11499351
420458
private static class BusyWaitConsumerLatency implements ConsumerLatency {
421459

422-
private final long delay;
460+
private final ValueIndicator<Long> consumerLatenciesIndicator;
423461

424-
private BusyWaitConsumerLatency(long delay) {
425-
this.delay = delay;
462+
private BusyWaitConsumerLatency(ValueIndicator<Long> consumerLatenciesIndicator) {
463+
this.consumerLatenciesIndicator = consumerLatenciesIndicator;
426464
}
427465

428466
@Override
429467
public boolean simulateLatency() {
430-
long start = System.nanoTime();
431-
while(System.nanoTime() - start < delay);
432-
return true;
468+
return latencyBusyWait(consumerLatenciesIndicator.getValue());
433469
}
434470
}
435471

src/main/java/com/rabbitmq/perf/ConsumerParameters.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class ConsumerParameters {
3636
private Stats stats;
3737
private float rateLimit;
3838
private int msgLimit;
39-
private int consumerLatencyInMicroSeconds;
39+
private ValueIndicator<Long> consumerLatenciesIndicator;
4040
private TimestampProvider timestampProvider;
4141
private MulticastSet.CompletionHandler completionHandler;
4242
private Recovery.RecoveryProcess recoveryProcess;
@@ -129,12 +129,12 @@ public ConsumerParameters setMsgLimit(int msgLimit) {
129129
return this;
130130
}
131131

132-
public int getConsumerLatencyInMicroSeconds() {
133-
return consumerLatencyInMicroSeconds;
132+
public ValueIndicator<Long> getConsumerLatenciesIndicator() {
133+
return consumerLatenciesIndicator;
134134
}
135135

136-
public ConsumerParameters setConsumerLatencyInMicroSeconds(int consumerLatencyInMicroSeconds) {
137-
this.consumerLatencyInMicroSeconds = consumerLatencyInMicroSeconds;
136+
public ConsumerParameters setConsumerLatencyIndicator(ValueIndicator<Long> consumerLatenciesIndicator) {
137+
this.consumerLatenciesIndicator = consumerLatenciesIndicator;
138138
return this;
139139
}
140140

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ public class MulticastParams {
7777

7878
private Map<String, Object> queueArguments = null;
7979

80-
private int consumerLatencyInMicroseconds = 0;
81-
8280
private String queuePattern = null;
8381
private int queueSequenceFrom = -1;
8482
private int queueSequenceTo = -1;
@@ -105,6 +103,9 @@ public class MulticastParams {
105103

106104
private List<String> messageSizes = new ArrayList<>();
107105

106+
private long consumerLatencyInMicroseconds;
107+
private List<String> consumerLatencies = new ArrayList<>();
108+
108109
private boolean polling = false;
109110

110111
private int pollingInterval = -1;
@@ -254,10 +255,6 @@ public void setQueueArguments(Map<String, Object> queueArguments) {
254255
this.queueArguments = queueArguments;
255256
}
256257

257-
public void setConsumerLatencyInMicroseconds(int consumerLatencyInMicroseconds) {
258-
this.consumerLatencyInMicroseconds = consumerLatencyInMicroseconds;
259-
}
260-
261258
public void setMessageProperties(Map<String, Object> messageProperties) {
262259
this.messageProperties = messageProperties;
263260
}
@@ -362,6 +359,15 @@ public void setMessageSizes(List<String> messageSizes) {
362359
this.messageSizes = messageSizes;
363360
}
364361

362+
public void setConsumerLatencyInMicroseconds(long consumerLatencyInMicroseconds) {
363+
this.consumerLatencyInMicroseconds = consumerLatencyInMicroseconds;
364+
}
365+
366+
367+
public void setConsumerLatencies(List<String> consumerLatencies) {
368+
this.consumerLatencies = consumerLatencies;
369+
}
370+
365371
public int getHeartbeatSenderThreads() {
366372
return heartbeatSenderThreads <= 0 ? producerCount + consumerCount : this.heartbeatSenderThreads;
367373
}
@@ -406,6 +412,14 @@ public List<String> getMessageSizes() {
406412
return messageSizes;
407413
}
408414

415+
public long getConsumerLatencyInMicroseconds() {
416+
return consumerLatencyInMicroseconds;
417+
}
418+
419+
public List<String> getConsumerLatencies() {
420+
return consumerLatencies;
421+
}
422+
409423
public void setPolling(boolean polling) {
410424
this.polling = polling;
411425
}
@@ -479,7 +493,11 @@ public Producer createProducer(Connection connection, Stats stats, MulticastSet.
479493
return producer;
480494
}
481495

482-
public Consumer createConsumer(Connection connection, Stats stats, MulticastSet.CompletionHandler completionHandler, ExecutorService executorService) throws IOException {
496+
public Consumer createConsumer(Connection connection,
497+
Stats stats,
498+
ValueIndicator<Long> consumerLatenciesIndicator,
499+
MulticastSet.CompletionHandler completionHandler,
500+
ExecutorService executorService) throws IOException {
483501
TopologyHandlerResult topologyHandlerResult = this.topologyHandler.configureQueuesForClient(connection);
484502
connection = topologyHandlerResult.connection;
485503
Channel channel = connection.createChannel(); //NOSONAR
@@ -508,7 +526,7 @@ public Consumer createConsumer(Connection connection, Stats stats, MulticastSet.
508526
.setStats(stats)
509527
.setRateLimit(consumerRateLimit)
510528
.setMsgLimit(consumerMsgCount)
511-
.setConsumerLatencyInMicroSeconds(consumerLatencyInMicroseconds)
529+
.setConsumerLatencyIndicator(consumerLatenciesIndicator)
512530
.setTimestampProvider(tsp)
513531
.setCompletionHandler(completionHandler)
514532
.setRecoveryProcess(recoveryProcess)

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class MulticastSet {
6060
private ThreadingHandler threadingHandler = new DefaultThreadingHandler();
6161
private final ValueIndicator<Float> rateIndicator;
6262
private final ValueIndicator<Integer> messageSizeIndicator;
63+
private final ValueIndicator<Long> consumerLatencyIndicator;
6364
private final ConnectionCreator connectionCreator;
6465

6566
public MulticastSet(Stats stats, ConnectionFactory factory,
@@ -102,6 +103,18 @@ public MulticastSet(Stats stats, ConnectionFactory factory,
102103
params.getMessageSizes(), scheduledExecutorService, input -> Integer.valueOf(input)
103104
);
104105
}
106+
107+
if (this.params.getConsumerLatencies() == null || this.params.getConsumerLatencies().isEmpty()) {
108+
this.consumerLatencyIndicator = new FixedValueIndicator<>(params.getConsumerLatencyInMicroseconds());
109+
} else {
110+
ScheduledExecutorService scheduledExecutorService = this.threadingHandler.scheduledExecutorService(
111+
"perf-test-variable-consumer-latency-scheduler", 1
112+
);
113+
this.consumerLatencyIndicator = new VariableValueIndicator<>(
114+
params.getConsumerLatencies(), scheduledExecutorService, input -> Long.valueOf(input)
115+
);
116+
}
117+
105118
this.connectionCreator = new ConnectionCreator(this.factory, this.uris);
106119
}
107120

@@ -297,7 +310,10 @@ private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
297310
return consumersExecutorsFactory;
298311
}
299312

300-
private void createConsumers(boolean announceStartup, Runnable[] consumerRunnables, Connection[] consumerConnections, Function<Integer, ExecutorService> consumersExecutorsFactory) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
313+
private void createConsumers(boolean announceStartup,
314+
Runnable[] consumerRunnables,
315+
Connection[] consumerConnections,
316+
Function<Integer, ExecutorService> consumersExecutorsFactory) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
301317
for (int i = 0; i < consumerConnections.length; i++) {
302318
if (announceStartup) {
303319
System.out.println("id: " + testID + ", starting consumer #" + i);
@@ -311,12 +327,12 @@ private void createConsumers(boolean announceStartup, Runnable[] consumerRunnabl
311327
if (announceStartup) {
312328
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
313329
}
314-
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = params.createConsumer(consumerConnection, stats, this.completionHandler, executorService);
330+
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = params.createConsumer(consumerConnection, stats, this.consumerLatencyIndicator, this.completionHandler, executorService);
315331
}
316332
}
317333
}
318334

319-
private void createProducers(boolean announceStartup, AgentState[] producerStates, Connection[] producerConnections) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
335+
private void createProducers(boolean announceStartup, AgentState[] producerStates, Connection[] producerConnections) throws IOException, TimeoutException {
320336
for (int i = 0; i < producerConnections.length; i++) {
321337
if (announceStartup) {
322338
System.out.println("id: " + testID + ", starting producer #" + i);
@@ -338,6 +354,7 @@ private void createProducers(boolean announceStartup, AgentState[] producerState
338354
}
339355

340356
private void startConsumers(Runnable[] consumerRunnables) throws InterruptedException {
357+
this.consumerLatencyIndicator.start();
341358
for (Runnable runnable : consumerRunnables) {
342359
runnable.run();
343360
if (params.getConsumerSlowStart()) {

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
164164
}
165165
}
166166

167+
List<String> variableConsumerLatencies = lstArg(cmd, "vl");
168+
if (variableConsumerLatencies != null && !variableConsumerLatencies.isEmpty()) {
169+
for (String variableConsumerLatency : variableConsumerLatencies) {
170+
try {
171+
VariableValueIndicator.validate(variableConsumerLatency);
172+
} catch (IllegalArgumentException e) {
173+
System.out.println(e.getMessage());
174+
systemExiter.exit(1);
175+
}
176+
}
177+
}
178+
167179
boolean polling = hasOption(cmd, "po");
168180
int pollingInterval = intArg(cmd, "pi", -1);
169181

@@ -299,6 +311,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
299311
p.setBodyContentType( bodyContentType);
300312
p.setQueueArguments(convertKeyValuePairs(queueArgs));
301313
p.setConsumerLatencyInMicroseconds(consumerLatencyInMicroseconds);
314+
p.setConsumerLatencies(variableConsumerLatencies);
302315
p.setQueuePattern(queuePattern);
303316
p.setQueueSequenceFrom(from);
304317
p.setQueueSequenceTo(to);
@@ -612,6 +625,13 @@ public static Options getOptions() {
612625
variableSize.setArgs(Option.UNLIMITED_VALUES);
613626
options.addOption(variableSize);
614627

628+
Option variableConsumerLatency = new Option("vl", "variable-latency",true,
629+
"variable consumer processing latency with [MICROSECONDS]:[DURATION] syntax, " +
630+
"where [MICROSECONDS] integer >= 0 and [DURATION] integer > 0. Use the option several times " +
631+
"to specify several values.");
632+
variableConsumerLatency.setArgs(Option.UNLIMITED_VALUES);
633+
options.addOption(variableConsumerLatency);
634+
615635
options.addOption(new Option("po", "polling",false,
616636
"use basic.get to consume messages. " +
617637
"Do not use this in real applications."));

0 commit comments

Comments
 (0)