Skip to content

Commit 8145085

Browse files
committed
Add option for metrics prefix
[#158209647] References #42
1 parent 0f436c7 commit 8145085

File tree

6 files changed

+37
-30
lines changed

6 files changed

+37
-30
lines changed

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ clean: ## Clean all build artefacts
2121
compile: ## Compile the source code
2222
@mvnw compile
2323

24+
install: clean ## Create and copy the binaries into the local Maven repository
25+
@mvnw install -Dmaven.test.skip
26+
2427
jar: clean ## Build the JAR file
2528
@mvnw package
2629

src/main/java/com/rabbitmq/perf/AgentBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ protected void delay(long now, AgentState state) {
2424

2525
long elapsed = now - state.getLastStatsTime();
2626
//example: rateLimit is 5000 msg/s,
27-
//10 ms have elapsed, we have sent 200 messages
28-
//the 200 msgs we have actually sent should have taken us
27+
//10 ms have elapsed, we have published 200 messages
28+
//the 200 msgs we have actually published should have taken us
2929
//200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms
3030
long pause = (long) (state.getRateLimit() == 0.0f ?
3131
0.0f : (state.getMsgCount() * 1000.0 / state.getRateLimit() - elapsed));

src/main/java/com/rabbitmq/perf/BaseMetrics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class BaseMetrics implements Metrics {
4141
public Options options() {
4242
Options options = new Options();
4343
options.addOption(new Option("mt", "metrics-tags", true, "metrics tags as key-value pairs separated by commas"));
44+
options.addOption(new Option("mpx", "metrics-prefix", true, "prefix for PerfTest metrics, default is perftest_"));
4445
options.addOption(new Option("mc", "metrics-client", false, "enable client metrics"));
4546
options.addOption(new Option("mcl", "metrics-class-loader", false, "enable JVM class loader metrics"));
4647
options.addOption(new Option("mjm", "metrics-jvm-memory", false, "enable JVM memory metrics"));
@@ -63,7 +64,7 @@ public void configure(CommandLineProxy cmd, CompositeMeterRegistry meterRegistry
6364
}
6465
meterRegistry.config().commonTags(tags);
6566
if (cmd.hasOption("mc")) {
66-
factory.setMetricsCollector(new MicrometerMetricsCollector(meterRegistry));
67+
factory.setMetricsCollector(new MicrometerMetricsCollector(meterRegistry, "client"));
6768
}
6869

6970
if (cmd.hasOption("mcl")) {

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,14 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
179179
uris = singletonList(uri);
180180
}
181181

182+
String metricsPrefix = strArg(cmd, "mpx", "perftest_");
182183
//setup
183184
PrintlnStats stats = new PrintlnStats(testID,
184185
1000L * samplingInterval,
185186
producerCount > 0,
186187
consumerCount > 0,
187188
(flags.contains("mandatory") || flags.contains("immediate")),
188-
confirm != -1, legacyMetrics, useMillis, output, registry);
189+
confirm != -1, legacyMetrics, useMillis, output, registry, metricsPrefix);
189190

190191
SSLContext sslContext = perfTestOptions.skipSslContextConfiguration ? null :
191192
getSslContextIfNecessary(cmd, System.getProperties());
@@ -571,8 +572,8 @@ public PrintlnStats(String testID, long interval,
571572
boolean sendStatsEnabled, boolean recvStatsEnabled,
572573
boolean returnStatsEnabled, boolean confirmStatsEnabled,
573574
boolean legacyMetrics, boolean useMillis,
574-
PrintWriter out, MeterRegistry registry) {
575-
super(interval, useMillis, registry);
575+
PrintWriter out, MeterRegistry registry, String metricsPrefix) {
576+
super(interval, useMillis, registry, metricsPrefix);
576577
this.sendStatsEnabled = sendStatsEnabled;
577578
this.recvStatsEnabled = recvStatsEnabled;
578579
this.returnStatsEnabled = returnStatsEnabled;
@@ -582,7 +583,7 @@ public PrintlnStats(String testID, long interval,
582583
this.useMillis = useMillis;
583584
this.out = out;
584585
if (out != null) {
585-
out.printf("id,time (s),sent (msg/s),returned (msg/s)," +
586+
out.printf("id,time (s),published (msg/s),returned (msg/s)," +
586587
"confirmed (msg/s),nacked (msg/s)," +
587588
"received (msg/s),min latency (%s),median latency (%s)," +
588589
"75th p. latency (%s),95th p. latency (%s),99th p. latency (%s)%n",
@@ -594,15 +595,15 @@ public PrintlnStats(String testID, long interval,
594595
protected void report(long now) {
595596
String output = "id: " + testID + ", ";
596597

597-
double rateSent = 0.0;
598+
double ratePublished = 0.0;
598599
double rateReturned = 0.0;
599600
double rateConfirmed = 0.0;
600601
double rateNacked = 0.0;
601-
double rateReceived = 0.0;
602+
double rateConsumed = 0.0;
602603

603604
if (sendStatsEnabled) {
604-
rateSent = rate(sendCountInterval, elapsedInterval);
605-
sent(rateSent);
605+
ratePublished = rate(sendCountInterval, elapsedInterval);
606+
published(ratePublished);
606607
}
607608
if (sendStatsEnabled && returnStatsEnabled) {
608609
rateReturned = rate(returnCountInterval, elapsedInterval);
@@ -617,17 +618,17 @@ protected void report(long now) {
617618
nacked(rateNacked);
618619
}
619620
if (recvStatsEnabled) {
620-
rateReceived = rate(recvCountInterval, elapsedInterval);
621-
received(rateReceived);
621+
rateConsumed = rate(recvCountInterval, elapsedInterval);
622+
received(rateConsumed);
622623
}
623624

624625
output += "time: " + format("%.3f", (now - startTime)/1000.0) + "s";
625626
output +=
626-
getRate("sent", rateSent, sendStatsEnabled) +
627+
getRate("sent", ratePublished, sendStatsEnabled) +
627628
getRate("returned", rateReturned, sendStatsEnabled && returnStatsEnabled) +
628629
getRate("confirmed", rateConfirmed, sendStatsEnabled && confirmStatsEnabled) +
629630
getRate("nacked", rateNacked, sendStatsEnabled && confirmStatsEnabled) +
630-
getRate("received", rateReceived, recvStatsEnabled);
631+
getRate("received", rateConsumed, recvStatsEnabled);
631632

632633
if (legacyMetrics) {
633634
output += (latencyCountInterval > 0 ?
@@ -649,11 +650,11 @@ protected void report(long now) {
649650
System.out.println(output);
650651
if (out != null) {
651652
out.println(testID + "," + format("%.3f", (now - startTime)/1000.0) + "," +
652-
rate(rateSent, sendStatsEnabled)+ "," +
653+
rate(ratePublished, sendStatsEnabled)+ "," +
653654
rate(rateReturned, sendStatsEnabled && returnStatsEnabled)+ "," +
654655
rate(rateConfirmed, sendStatsEnabled && confirmStatsEnabled)+ "," +
655656
rate(rateNacked, sendStatsEnabled && confirmStatsEnabled)+ "," +
656-
rate(rateReceived, recvStatsEnabled) + "," +
657+
rate(rateConsumed, recvStatsEnabled) + "," +
657658
(latencyCountInterval > 0 ?
658659
div(latency.getSnapshot().getMin()) + "," +
659660
div(latency.getSnapshot().getMedian()) + "," +

src/main/java/com/rabbitmq/perf/Stats.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public abstract class Stats {
3333

3434
protected final long startTime;
3535
private final Consumer<Long> updateLatency;
36-
private final DoubleAccumulator sent, returned, confirmed, nacked, received;
36+
private final DoubleAccumulator published, returned, confirmed, nacked, consumed;
3737
protected long lastStatsTime;
3838
protected int sendCountInterval;
3939
protected int returnCountInterval;
@@ -53,27 +53,29 @@ public abstract class Stats {
5353
protected Histogram latency = new MetricRegistry().histogram("latency");
5454

5555
public Stats(long interval) {
56-
this(interval, false, new SimpleMeterRegistry());
56+
this(interval, false, new SimpleMeterRegistry(), null);
5757
}
5858

59-
public Stats(long interval, boolean useMs, MeterRegistry registry) {
59+
public Stats(long interval, boolean useMs, MeterRegistry registry, String metricsPrefix) {
6060
this.interval = interval;
6161
startTime = System.currentTimeMillis();
6262

63+
metricsPrefix = metricsPrefix == null ? "" : metricsPrefix;
64+
6365
Timer latencyTimer = Timer
64-
.builder("latency")
66+
.builder(metricsPrefix + "latency")
6567
.description("message latency")
6668
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
6769
.distributionStatisticExpiry(Duration.ofMillis(this.interval))
6870
.sla()
6971
.register(registry);
7072

7173
DoubleBinaryOperator accumulatorFunction = (x, y) -> y;
72-
sent = registry.gauge("sent", new DoubleAccumulator(accumulatorFunction, 0.0));
73-
returned = registry.gauge("returned", new DoubleAccumulator(accumulatorFunction, 0.0));
74-
confirmed = registry.gauge("confirmed", new DoubleAccumulator(accumulatorFunction, 0.0));
75-
nacked = registry.gauge("nacked", new DoubleAccumulator(accumulatorFunction, 0.0));
76-
received = registry.gauge("received", new DoubleAccumulator(accumulatorFunction, 0.0));
74+
published = registry.gauge(metricsPrefix + "published", new DoubleAccumulator(accumulatorFunction, 0.0));
75+
returned = registry.gauge(metricsPrefix + "returned", new DoubleAccumulator(accumulatorFunction, 0.0));
76+
confirmed = registry.gauge(metricsPrefix + "confirmed", new DoubleAccumulator(accumulatorFunction, 0.0));
77+
nacked = registry.gauge(metricsPrefix + "nacked", new DoubleAccumulator(accumulatorFunction, 0.0));
78+
consumed = registry.gauge(metricsPrefix + "consumed", new DoubleAccumulator(accumulatorFunction, 0.0));
7779

7880
updateLatency = useMs ? latency -> latencyTimer.record(latency, TimeUnit.MILLISECONDS) :
7981
latency -> latencyTimer.record(latency, TimeUnit.NANOSECONDS);
@@ -147,8 +149,8 @@ public synchronized void handleRecv(long latency) {
147149
report();
148150
}
149151

150-
protected void sent(double rate) {
151-
this.sent.accumulate(rate);
152+
protected void published(double rate) {
153+
this.published.accumulate(rate);
152154
}
153155

154156
protected void returned(double rate) {
@@ -164,6 +166,6 @@ protected void nacked(double rate) {
164166
}
165167

166168
protected void received(double rate) {
167-
this.received.accumulate(rate);
169+
this.consumed.accumulate(rate);
168170
}
169171
}

src/test/java/com/rabbitmq/perf/TopologyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ public void sequenceMoreQueuesThanProducers() throws Exception {
582582
when(ch.queueDeclare(queueNameCaptor.capture(), anyBoolean(), anyBoolean(), anyBoolean(), isNull()))
583583
.then(invocation -> new AMQImpl.Queue.DeclareOk(invocation.getArgument(0), 0, 0));
584584

585-
// once all producers have sent messages (producerCount routing keys in the set),
585+
// once all producers have published messages (producerCount routing keys in the set),
586586
// we open the latch so MulticastSet.run can end
587587
Set<String> routingKeys = new HashSet<>();
588588
CountDownLatch latchPublishing = new CountDownLatch(1);

0 commit comments

Comments
 (0)