Skip to content

Commit 524cec5

Browse files
authored
Merge pull request #59 from rabbitmq/lrb-unix-timestamp
Add "--use-millis" option
2 parents e9ef942 + e67649d commit 524cec5

File tree

6 files changed

+159
-110
lines changed

6 files changed

+159
-110
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package com.rabbitmq.perf;
1717

18-
import com.rabbitmq.client.AMQP;
1918
import com.rabbitmq.client.AMQP.BasicProperties;
2019
import com.rabbitmq.client.Channel;
2120
import com.rabbitmq.client.DefaultConsumer;
@@ -31,6 +30,7 @@
3130
import java.util.Map;
3231
import java.util.concurrent.CountDownLatch;
3332
import java.util.concurrent.TimeUnit;
33+
import java.util.function.BiFunction;
3434

3535
public class Consumer extends ProducerConsumerBase implements Runnable {
3636

@@ -47,46 +47,50 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4747
private final CountDownLatch latch = new CountDownLatch(1);
4848
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
4949
private final ConsumerLatency consumerLatency;
50-
private final TimestampExtractor timestampExtractor;
50+
private final BiFunction<BasicProperties, byte[], Long> timestampExtractor;
51+
private final TimestampProvider timestampProvider;
5152

5253
public Consumer(Channel channel, String id,
5354
List<String> queueNames, int txSize, boolean autoAck,
5455
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, final int timeLimit,
55-
int consumerLatencyInMicroSeconds, boolean extractTimestampFromHeader) {
56-
57-
this.channel = channel;
58-
this.id = id;
59-
this.queueNames = queueNames;
60-
this.rateLimit = rateLimit;
61-
this.txSize = txSize;
62-
this.autoAck = autoAck;
63-
this.multiAckEvery = multiAckEvery;
64-
this.stats = stats;
65-
this.msgLimit = msgLimit;
66-
this.timeLimit = 1000L * timeLimit;
56+
int consumerLatencyInMicroSeconds,
57+
TimestampProvider timestampProvider) {
58+
59+
this.channel = channel;
60+
this.id = id;
61+
this.queueNames = queueNames;
62+
this.rateLimit = rateLimit;
63+
this.txSize = txSize;
64+
this.autoAck = autoAck;
65+
this.multiAckEvery = multiAckEvery;
66+
this.stats = stats;
67+
this.msgLimit = msgLimit;
68+
this.timeLimit = 1000L * timeLimit;
69+
this.timestampProvider = timestampProvider;
70+
6771
if (consumerLatencyInMicroSeconds <= 0) {
6872
this.consumerLatency = new NoWaitConsumerLatency();
6973
} else if (consumerLatencyInMicroSeconds >= 1000) {
7074
this.consumerLatency = new ThreadSleepConsumerLatency(consumerLatencyInMicroSeconds / 1000);
7175
} else {
7276
this.consumerLatency = new BusyWaitConsumerLatency(consumerLatencyInMicroSeconds * 1000);
7377
}
74-
if (extractTimestampFromHeader) {
75-
this.timestampExtractor = new TimestampExtractor() {
76-
@Override
77-
public long extract(BasicProperties properties, byte[] body) throws IOException {
78+
79+
if (timestampProvider.isTimestampInHeader()) {
80+
this.timestampExtractor = (properties, body) -> {
7881
Object timestamp = properties.getHeaders().get(Producer.TIMESTAMP_HEADER);
7982
return timestamp == null ? Long.MAX_VALUE : (Long) timestamp;
80-
}
8183
};
8284
} else {
83-
this.timestampExtractor = new TimestampExtractor() {
84-
@Override
85-
public long extract(BasicProperties properties, byte[] body) throws IOException {
86-
DataInputStream d = new DataInputStream(new ByteArrayInputStream(body));
85+
this.timestampExtractor = (properties, body) -> {
86+
DataInputStream d = new DataInputStream(new ByteArrayInputStream(body));
87+
try {
8788
d.readInt();
8889
return d.readLong();
90+
} catch (IOException e) {
91+
throw new RuntimeException("Error while extracting timestamp from body");
8992
}
93+
9094
};
9195
}
9296
}
@@ -130,10 +134,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
130134
msgCount++;
131135

132136
if (msgLimit == 0 || msgCount <= msgLimit) {
133-
long msgNano = timestampExtractor.extract(properties, body);
134-
135-
136-
long nano = System.nanoTime();
137+
long messageTimestamp = timestampExtractor.apply(properties, body);
138+
long nowTimestamp = timestampProvider.getCurrentTime();
137139

138140
if (!autoAck) {
139141
if (multiAckEvery == 0) {
@@ -147,9 +149,10 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
147149
channel.txCommit();
148150
}
149151

150-
now = System.currentTimeMillis();
152+
long diff_time = timestampProvider.getDifference(nowTimestamp, messageTimestamp);
153+
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? diff_time : 0L);
151154

152-
stats.handleRecv(id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
155+
now = System.currentTimeMillis();
153156
if (rateLimit > 0.0f) {
154157
delay(now);
155158
}
@@ -227,9 +230,4 @@ public void simulateLatency() {
227230
}
228231
}
229232

230-
private interface TimestampExtractor {
231-
232-
long extract(BasicProperties properties, byte[] body) throws IOException;
233-
234-
}
235233
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class MulticastParams {
6262
private String bodyContentType = null;
6363

6464
private boolean predeclared;
65+
private boolean useMillis;
6566

6667
private Map<String, Object> queueArguments;
6768

@@ -163,6 +164,10 @@ public void setTimeLimit(int timeLimit) {
163164
this.timeLimit = timeLimit;
164165
}
165166

167+
public void setUseMillis(boolean useMillis) {
168+
this.useMillis = useMillis;
169+
}
170+
166171
public void setProducerMsgCount(int producerMsgCount) {
167172
this.producerMsgCount = producerMsgCount;
168173
}
@@ -259,21 +264,21 @@ public Producer createProducer(Connection connection, Stats stats, String id) th
259264
if (!predeclared || !exchangeExists(connection, exchangeName)) {
260265
channel.exchangeDeclare(exchangeName, exchangeType);
261266
}
262-
MessageBodySource messageBodySource = null;
263-
boolean timestampInHeader;
267+
MessageBodySource messageBodySource;
268+
TimestampProvider tsp;
264269
if (bodyFiles.size() > 0) {
270+
tsp = new TimestampProvider(useMillis, true);
265271
messageBodySource = new LocalFilesMessageBodySource(bodyFiles, bodyContentType);
266-
timestampInHeader = true;
267272
} else {
268-
messageBodySource = new TimeSequenceMessageBodySource(minMsgSize);
269-
timestampInHeader = false;
273+
tsp = new TimestampProvider(useMillis, false);
274+
messageBodySource = new TimeSequenceMessageBodySource(tsp, minMsgSize);
270275
}
271276
final Producer producer = new Producer(channel, exchangeName, id,
272277
randomRoutingKey, flags, producerTxSize,
273278
producerRateLimit, producerMsgCount,
274279
timeLimit,
275280
confirm, confirmTimeout, messageBodySource,
276-
timestampInHeader, stats);
281+
tsp, stats);
277282
channel.addReturnListener(producer);
278283
channel.addConfirmListener(producer);
279284
return producer;
@@ -292,11 +297,11 @@ public Consumer createConsumer(Connection connection, Stats stats, String id) th
292297
} else {
293298
timestampInHeader = false;
294299
}
295-
300+
TimestampProvider tsp = new TimestampProvider(useMillis, timestampInHeader);
296301
return new Consumer(channel, id, generatedQueueNames,
297302
consumerTxSize, autoAck, multiAckEvery,
298303
stats, consumerRateLimit, consumerMsgCount, timeLimit,
299-
consumerLatencyInMicroseconds, timestampInHeader);
304+
consumerLatencyInMicroseconds, tsp);
300305
}
301306

302307
public boolean shouldConfigureQueues() {

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public static void main(String[] args) {
9292
boolean predeclared = cmd.hasOption('p');
9393
boolean legacyMetrics = cmd.hasOption('l');
9494
boolean autoDelete = boolArg(cmd, "ad", true);
95+
boolean useMillis = cmd.hasOption("ms");
9596
String queueArgs = strArg(cmd, "qa", null);
9697
int consumerLatencyInMicroseconds = intArg(cmd, 'L', 0);
9798

@@ -106,18 +107,12 @@ public static void main(String[] args) {
106107
file.delete();
107108
}
108109
output = new PrintWriter(new BufferedWriter(new FileWriter(file)), true);
109-
Runtime.getRuntime().addShutdownHook(new Thread() {
110-
111-
@Override
112-
public void run() {
113-
output.close();
114-
}
115-
});
110+
Runtime.getRuntime().addShutdownHook(new Thread(() -> output.close()));
116111
} else {
117112
output = null;
118113
}
119114

120-
List<String> uris = null;
115+
List<String> uris;
121116
if(urisParameter != null) {
122117
String [] urisArray = urisParameter.split(",");
123118
for(int i = 0; i< urisArray.length; i++) {
@@ -133,9 +128,8 @@ public void run() {
133128
1000L * samplingInterval,
134129
producerCount > 0,
135130
consumerCount > 0,
136-
(flags.contains("mandatory") ||
137-
flags.contains("immediate")),
138-
confirm != -1, legacyMetrics, output);
131+
(flags.contains("mandatory") || flags.contains("immediate")),
132+
confirm != -1, legacyMetrics, useMillis, output);
139133

140134
SSLContext sslContext = getSslContextIfNecessary(cmd, System.getProperties());
141135

@@ -177,6 +171,7 @@ public void run() {
177171
p.setRandomRoutingKey( randomRoutingKey);
178172
p.setProducerRateLimit( producerRateLimit);
179173
p.setTimeLimit( timeLimit);
174+
p.setUseMillis( useMillis);
180175
p.setBodyFiles( bodyFiles == null ? null : asList(bodyFiles.split(",")));
181176
p.setBodyContentType( bodyContentType);
182177
p.setQueueArguments(queueArguments(queueArgs));
@@ -187,7 +182,7 @@ public void run() {
187182

188183
stats.printFinal();
189184
}
190-
catch( ParseException exp ) {
185+
catch (ParseException exp) {
191186
System.err.println("Parsing failed. Reason: " + exp.getMessage());
192187
usage(options);
193188
} catch (Exception e) {
@@ -268,9 +263,11 @@ private static Options getOptions() {
268263
options.addOption(new Option("p", "predeclared", false,"allow use of predeclared objects"));
269264
options.addOption(new Option("B", "body", true, "comma-separated list of files to use in message bodies"));
270265
options.addOption(new Option("T", "body-content-type", true, "body content-type"));
271-
options.addOption(new Option("l", "legacy-metrics", false, "display legacy metrics (min/avg/max latency)"));
266+
options.addOption(new Option("l", "legacy-metrics", false,"display legacy metrics (min/avg/max latency)"));
272267
options.addOption(new Option("o", "output-file", true, "output file for timing results"));
273268
options.addOption(new Option("ad", "auto-delete", true, "should the queue be auto-deleted, default is true"));
269+
options.addOption(new Option("ms", "use-millis", false,"should latency be collected in milliseconds, default is false. "
270+
+ "Set to true if producers are consumers run on different machines."));
274271
options.addOption(new Option("qa", "queue-args", true, "queue arguments as key/pair values, separated by commas"));
275272
options.addOption(new Option("L", "consumer-latency", true, "consumer latency in microseconds"));
276273
options.addOption(new Option("udsc", "use-default-ssl-context", false,"use JVM default SSL context"));
@@ -344,14 +341,15 @@ private static class PrintlnStats extends Stats {
344341
private final boolean returnStatsEnabled;
345342
private final boolean confirmStatsEnabled;
346343
private final boolean legacyMetrics;
344+
private final boolean useMillis;
347345

348346
private final String testID;
349347
private final PrintWriter out;
350348

351349
public PrintlnStats(String testID, long interval,
352350
boolean sendStatsEnabled, boolean recvStatsEnabled,
353351
boolean returnStatsEnabled, boolean confirmStatsEnabled,
354-
boolean legacyMetrics,
352+
boolean legacyMetrics, boolean useMillis,
355353
PrintWriter out) {
356354
super(interval);
357355
this.sendStatsEnabled = sendStatsEnabled;
@@ -360,13 +358,13 @@ public PrintlnStats(String testID, long interval,
360358
this.confirmStatsEnabled = confirmStatsEnabled;
361359
this.testID = testID;
362360
this.legacyMetrics = legacyMetrics;
361+
this.useMillis = useMillis;
363362
this.out = out;
364363
if (out != null) {
365364
out.println("id,time (s),sent (msg/s),returned (msg/s),confirmed (msg/s), nacked (msg/s), received (msg/s),"
366365
+ "min latency (microseconds),median latency (microseconds),75th p. latency (microseconds),95th p. latency (microseconds),"
367366
+ "99th p. latency (microseconds)");
368367
}
369-
370368
}
371369

372370
@Override
@@ -391,11 +389,10 @@ protected void report(long now) {
391389
} else {
392390
output += (latencyCountInterval > 0 ?
393391
", min/median/75th/95th/99th latency: "
394-
+ latency.getSnapshot().getMin()/1000L + "/"
395-
+ (long) latency.getSnapshot().getMedian()/1000L + "/"
396-
+ (long) latency.getSnapshot().get75thPercentile()/1000L + "/"
397-
+ (long) latency.getSnapshot().get95thPercentile()/1000L + "/"
398-
+ (long) latency.getSnapshot().get99thPercentile()/1000L + " microseconds" :
392+
+ div(latency.getSnapshot().getMin()) + "/"
393+
+ div(latency.getSnapshot().getMedian()) + "/"
394+
+ div(latency.getSnapshot().get75thPercentile()) + "/"
395+
+ div(latency.getSnapshot().get95thPercentile()) + units() :
399396
"");
400397
}
401398

@@ -408,23 +405,40 @@ protected void report(long now) {
408405
rate(nackCountInterval, elapsedInterval, sendStatsEnabled && confirmStatsEnabled)+ "," +
409406
rate(recvCountInterval, elapsedInterval, recvStatsEnabled) + "," +
410407
(latencyCountInterval > 0 ?
411-
latency.getSnapshot().getMin()/1000L + "," +
412-
(long) latency.getSnapshot().getMedian()/1000L + "," +
413-
(long) latency.getSnapshot().get75thPercentile()/1000L + "," +
414-
(long) latency.getSnapshot().get95thPercentile()/1000L + "," +
415-
(long) latency.getSnapshot().get99thPercentile()/1000L
408+
div(latency.getSnapshot().getMin()) + "," +
409+
div(latency.getSnapshot().getMedian()) + "," +
410+
div(latency.getSnapshot().get75thPercentile()) + "," +
411+
div(latency.getSnapshot().get95thPercentile()) + "," +
412+
div(latency.getSnapshot().get99thPercentile())
416413
: ",,,,")
417414
);
418415
}
419416

420417
}
421418

419+
private String units() {
420+
if (useMillis) {
421+
return " milliseconds";
422+
} else {
423+
return " microseconds";
424+
}
425+
}
426+
427+
private long div(double p) {
428+
if (useMillis) {
429+
return (long)p;
430+
} else {
431+
return (long)(p / 1000L);
432+
}
433+
}
434+
422435
private String getRate(String descr, long count, boolean display,
423436
long elapsed) {
424-
if (display)
437+
if (display) {
425438
return ", " + descr + ": " + formatRate(1000.0 * count / elapsed) + " msg/s";
426-
else
439+
} else {
427440
return "";
441+
}
428442
}
429443

430444
public void printFinal() {

0 commit comments

Comments
 (0)