Skip to content

Commit f613ad5

Browse files
Merge pull request #122 from rabbitmq/rabbitmq-perf-test-121-publish-confirm-latencies
Calculate publisher confirm latency
2 parents 8ae9eb8 + 03cce8e commit f613ad5

File tree

12 files changed

+647
-229
lines changed

12 files changed

+647
-229
lines changed

pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<junit.jupiter.version>5.3.0</junit.jupiter.version>
6464
<mockito.version>2.23.0</mockito.version>
6565
<hamcrest.version>2.0.0.0</hamcrest.version>
66+
<commons-lang3.version>3.8.1</commons-lang3.version>
6667
<jmh.version>1.20</jmh.version>
6768

6869
<!-- to sign artifacts when releasing -->
@@ -205,6 +206,15 @@
205206
<scope>test</scope>
206207
</dependency>
207208

209+
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
210+
<dependency>
211+
<groupId>org.apache.commons</groupId>
212+
<artifactId>commons-lang3</artifactId>
213+
<version>${commons-lang3.version}</version>
214+
<scope>test</scope>
215+
</dependency>
216+
217+
208218
<dependency>
209219
<groupId>org.openjdk.jmh</groupId>
210220
<artifactId>jmh-core</artifactId>

src/docs/asciidoc/monitoring.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ PerfTest supports.
2020
Here are the metrics PerfTest can gather:
2121

2222
* default metrics: number of published, returned, confirmed, nacked, and consumed messages, message
23-
latency. Latency is a major concern in many types of workload, it can be easily monitored here.
23+
latency, publisher confirm latency. Message latency is a major concern in many types of workload, it can be easily monitored here.
24+
https://www.rabbitmq.com/confirms.html#publisher-confirms[Publisher confirm]
25+
latency reflects the time a message can be considered unsafe. It is
26+
calculated as soon as the `--confirm`/`-c` option is used.
2427
Default metrics are available as long as PerfTest support for a monitoring system
2528
is enabled.
2629
* client metrics: these are the https://www.rabbitmq.com/api-guide.html#metrics[Java Client metrics].

src/docs/asciidoc/usage.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ https://www.rabbitmq.com/confirms.html[Consumer prefetch (QoS)] can be configure
6262
bin/runjava com.rabbitmq.perf.PerfTest -x 1 -y 2 -u "throughput-test-7" --id "test-7" \
6363
-f persistent --multi-ack-every 200 -q 500
6464

65-
Publisher confirms can be used with maximum of N outstanding publishes:
65+
Publisher confirms can be used with a maximum of N outstanding publishes:
6666

6767
bin/runjava com.rabbitmq.perf.PerfTest -x 1 -y 2 -u "throughput-test-8" --id "test-8" \
6868
-f persistent -q 500 -c 500

src/main/java/com/rabbitmq/perf/LocalFilesMessageBodySource.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
*/
2525
public class LocalFilesMessageBodySource implements MessageBodySource {
2626

27+
private final long ZERO = 0L;
28+
2729
private final List<byte[]> bodies;
2830

2931
private final String contentType;
@@ -56,9 +58,9 @@ public LocalFilesMessageBodySource(List<String> filesNames) throws IOException {
5658
}
5759

5860
@Override
59-
public MessageBodyAndContentType create(int sequenceNumber) {
60-
return new MessageBodyAndContentType(
61-
bodies.get(sequenceNumber % bodies.size()), contentType
61+
public MessageEnvelope create(int sequenceNumber) {
62+
return new MessageEnvelope(
63+
bodies.get(sequenceNumber % bodies.size()), contentType, ZERO
6264
);
6365
}
6466
}

src/main/java/com/rabbitmq/perf/MessageBodySource.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,17 @@
2323
*/
2424
public interface MessageBodySource {
2525

26-
MessageBodyAndContentType create(int sequenceNumber) throws IOException;
26+
MessageEnvelope create(int sequenceNumber) throws IOException;
2727

28-
class MessageBodyAndContentType {
28+
class MessageEnvelope {
2929
private final byte [] body;
3030
private final String contentType;
31+
private final long time;
3132

32-
public MessageBodyAndContentType(byte[] body, String contentType) {
33+
public MessageEnvelope(byte[] body, String contentType, long time) {
3334
this.body = body;
3435
this.contentType = contentType;
36+
this.time = time;
3537
}
3638

3739
public byte[] getBody() {
@@ -41,6 +43,10 @@ public byte[] getBody() {
4143
public String getContentType() {
4244
return contentType;
4345
}
46+
47+
public long getTime() {
48+
return time;
49+
}
4450
}
4551

4652
}

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 11 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,26 @@
1515

1616
package com.rabbitmq.perf;
1717

18+
import com.rabbitmq.client.ConnectionFactory;
19+
import com.rabbitmq.client.DefaultSaslConfig;
20+
import com.rabbitmq.client.impl.ClientVersion;
21+
import com.rabbitmq.client.impl.nio.NioParams;
22+
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
23+
import org.apache.commons.cli.*;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import javax.net.ssl.SSLContext;
1828
import java.io.*;
1929
import java.nio.charset.Charset;
2030
import java.security.NoSuchAlgorithmException;
2131
import java.text.SimpleDateFormat;
22-
import java.util.Calendar;
23-
import java.util.HashMap;
24-
import java.util.List;
25-
import java.util.Locale;
26-
import java.util.Map;
27-
import java.util.Properties;
32+
import java.util.*;
2833
import java.util.concurrent.SynchronousQueue;
2934
import java.util.concurrent.ThreadPoolExecutor;
3035
import java.util.concurrent.TimeUnit;
3136
import java.util.function.Function;
3237

33-
import com.rabbitmq.client.impl.ClientVersion;
34-
import com.rabbitmq.client.impl.nio.NioParams;
35-
import io.micrometer.core.instrument.MeterRegistry;
36-
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
37-
import org.apache.commons.cli.CommandLine;
38-
import org.apache.commons.cli.CommandLineParser;
39-
import org.apache.commons.cli.GnuParser;
40-
import org.apache.commons.cli.HelpFormatter;
41-
import org.apache.commons.cli.Option;
42-
import org.apache.commons.cli.Options;
43-
import org.apache.commons.cli.ParseException;
44-
45-
import com.rabbitmq.client.ConnectionFactory;
46-
import com.rabbitmq.client.DefaultSaslConfig;
47-
import org.slf4j.Logger;
48-
import org.slf4j.LoggerFactory;
49-
50-
import javax.net.ssl.SSLContext;
51-
5238
import static com.rabbitmq.perf.OptionsUtils.forEach;
5339
import static java.lang.String.format;
5440
import static java.util.Arrays.asList;
@@ -562,176 +548,6 @@ private static String getExchangeName(CommandLineProxy cmd, String def) {
562548
return exchangeName;
563549
}
564550

565-
private static class PrintlnStats extends Stats {
566-
private final boolean sendStatsEnabled;
567-
private final boolean recvStatsEnabled;
568-
private final boolean returnStatsEnabled;
569-
private final boolean confirmStatsEnabled;
570-
private final boolean legacyMetrics;
571-
private final boolean useMillis;
572-
573-
private final String testID;
574-
private final PrintWriter out;
575-
576-
public PrintlnStats(String testID, long interval,
577-
boolean sendStatsEnabled, boolean recvStatsEnabled,
578-
boolean returnStatsEnabled, boolean confirmStatsEnabled,
579-
boolean legacyMetrics, boolean useMillis,
580-
PrintWriter out, MeterRegistry registry, String metricsPrefix) {
581-
super(interval, useMillis, registry, metricsPrefix);
582-
this.sendStatsEnabled = sendStatsEnabled;
583-
this.recvStatsEnabled = recvStatsEnabled;
584-
this.returnStatsEnabled = returnStatsEnabled;
585-
this.confirmStatsEnabled = confirmStatsEnabled;
586-
this.testID = testID;
587-
this.legacyMetrics = legacyMetrics;
588-
this.useMillis = useMillis;
589-
this.out = out;
590-
if (out != null) {
591-
out.printf("id,time (s),published (msg/s),returned (msg/s)," +
592-
"confirmed (msg/s),nacked (msg/s)," +
593-
"received (msg/s),min latency (%s),median latency (%s)," +
594-
"75th p. latency (%s),95th p. latency (%s),99th p. latency (%s)%n",
595-
units(), units(), units(), units(), units());
596-
}
597-
}
598-
599-
@Override
600-
protected void report(long now) {
601-
String output = "id: " + testID + ", ";
602-
603-
double ratePublished = 0.0;
604-
double rateReturned = 0.0;
605-
double rateConfirmed = 0.0;
606-
double rateNacked = 0.0;
607-
double rateConsumed = 0.0;
608-
609-
if (sendStatsEnabled) {
610-
ratePublished = rate(sendCountInterval, elapsedInterval);
611-
published(ratePublished);
612-
}
613-
if (sendStatsEnabled && returnStatsEnabled) {
614-
rateReturned = rate(returnCountInterval, elapsedInterval);
615-
returned(rateReturned);
616-
}
617-
if (sendStatsEnabled && confirmStatsEnabled) {
618-
rateConfirmed = rate(confirmCountInterval, elapsedInterval);
619-
confirmed(rateConfirmed);
620-
}
621-
if (sendStatsEnabled && confirmStatsEnabled) {
622-
rateNacked = rate(nackCountInterval, elapsedInterval);
623-
nacked(rateNacked);
624-
}
625-
if (recvStatsEnabled) {
626-
rateConsumed = rate(recvCountInterval, elapsedInterval);
627-
received(rateConsumed);
628-
}
629-
630-
output += "time: " + format("%.3f", (now - startTime)/1000.0) + "s";
631-
output +=
632-
getRate("sent", ratePublished, sendStatsEnabled) +
633-
getRate("returned", rateReturned, sendStatsEnabled && returnStatsEnabled) +
634-
getRate("confirmed", rateConfirmed, sendStatsEnabled && confirmStatsEnabled) +
635-
getRate("nacked", rateNacked, sendStatsEnabled && confirmStatsEnabled) +
636-
getRate("received", rateConsumed, recvStatsEnabled);
637-
638-
if (legacyMetrics) {
639-
output += (latencyCountInterval > 0 ?
640-
", min/avg/max latency: " +
641-
minLatency/1000L + "/" +
642-
cumulativeLatencyInterval / (1000L * latencyCountInterval) + "/" +
643-
maxLatency/1000L + " µs " :
644-
"");
645-
} else {
646-
output += (latencyCountInterval > 0 ?
647-
", min/median/75th/95th/99th latency: "
648-
+ div(latency.getSnapshot().getMin()) + "/"
649-
+ div(latency.getSnapshot().getMedian()) + "/"
650-
+ div(latency.getSnapshot().get75thPercentile()) + "/"
651-
+ div(latency.getSnapshot().get95thPercentile()) + "/"
652-
+ div(latency.getSnapshot().get99thPercentile()) + " " + units() :
653-
"");
654-
}
655-
656-
System.out.println(output);
657-
if (out != null) {
658-
out.println(testID + "," + format("%.3f", (now - startTime)/1000.0) + "," +
659-
rate(ratePublished, sendStatsEnabled)+ "," +
660-
rate(rateReturned, sendStatsEnabled && returnStatsEnabled)+ "," +
661-
rate(rateConfirmed, sendStatsEnabled && confirmStatsEnabled)+ "," +
662-
rate(rateNacked, sendStatsEnabled && confirmStatsEnabled)+ "," +
663-
rate(rateConsumed, recvStatsEnabled) + "," +
664-
(latencyCountInterval > 0 ?
665-
div(latency.getSnapshot().getMin()) + "," +
666-
div(latency.getSnapshot().getMedian()) + "," +
667-
div(latency.getSnapshot().get75thPercentile()) + "," +
668-
div(latency.getSnapshot().get95thPercentile()) + "," +
669-
div(latency.getSnapshot().get99thPercentile())
670-
: ",,,,")
671-
);
672-
}
673-
674-
}
675-
676-
private String units() {
677-
if (useMillis) {
678-
return "ms";
679-
} else {
680-
return "µs";
681-
}
682-
}
683-
684-
private long div(double p) {
685-
if (useMillis) {
686-
return (long)p;
687-
} else {
688-
return (long)(p / 1000L);
689-
}
690-
}
691-
692-
private String getRate(String descr, double rate, boolean display) {
693-
if (display) {
694-
return ", " + descr + ": " + formatRate(rate) + " msg/s";
695-
} else {
696-
return "";
697-
}
698-
}
699-
700-
public void printFinal() {
701-
long now = System.currentTimeMillis();
702-
703-
System.out.println("id: " + testID + ", sending rate avg: " +
704-
formatRate(sendCountTotal * 1000.0 / (now - startTime)) +
705-
" msg/s");
706-
707-
long elapsed = now - startTime;
708-
if (elapsed > 0) {
709-
System.out.println("id: " + testID + ", receiving rate avg: " +
710-
formatRate(recvCountTotal * 1000.0 / elapsed) +
711-
" msg/s");
712-
}
713-
}
714-
715-
private static String formatRate(double rate) {
716-
if (rate == 0.0) return format("%d", (long)rate);
717-
else if (rate < 1) return format("%1.2f", rate);
718-
else if (rate < 10) return format("%1.1f", rate);
719-
else return format("%d", (long)rate);
720-
}
721-
722-
private static String rate(double rate, boolean display) {
723-
if (display) {
724-
return formatRate(rate);
725-
} else {
726-
return "";
727-
}
728-
}
729-
730-
private static double rate(long count, long elapsed) {
731-
return 1000.0 * count / elapsed;
732-
}
733-
}
734-
735551
private static void versionInformation() {
736552
String lineSeparator = System.getProperty("line.separator");
737553
String version = format(

0 commit comments

Comments
 (0)