Skip to content

Commit feb39e2

Browse files
committed
Start metrics service before consumers and publishers
This way it outputs results even if consumers and publishers take time to start.
1 parent 5e88196 commit feb39e2

File tree

3 files changed

+37
-25
lines changed

3 files changed

+37
-25
lines changed

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@ public void run(boolean announceStartup)
236236

237237
createProducers(announceStartup, producerStates, producerConnections);
238238

239+
this.performanceMetrics.start();
240+
239241
startConsumers(consumerRunnables);
240242
startProducers(producerStates);
241243

@@ -300,7 +302,6 @@ public void run(boolean announceStartup)
300302
shutdownSequence = () -> { };
301303
}
302304

303-
this.performanceMetrics.start();
304305
this.completionHandler.waitForCompletion();
305306

306307
try {
@@ -432,7 +433,9 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
432433
if (this.params.getConsumerStartDelay().getSeconds() <= 0) {
433434
this.consumerLatencyIndicator.start();
434435
for (Runnable runnable : consumerRunnables) {
436+
LOGGER.debug("Starting consumer runnable...");
435437
runnable.run();
438+
LOGGER.debug("Consumer runnable started");
436439
if (params.getConsumerSlowStart()) {
437440
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
438441
Thread.sleep(1000);

src/main/java/com/rabbitmq/perf/metrics/DefaultPerformanceMetrics.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public final class DefaultPerformanceMetrics implements PerformanceMetrics, Auto
7373
private final Duration interval;
7474
private final TimeUnit latencyCollectionTimeUnit;
7575
private final AtomicBoolean firstReport = new AtomicBoolean(false);
76-
private final AtomicBoolean closed = new AtomicBoolean(false);
76+
private final AtomicBoolean started = new AtomicBoolean(false);
7777
private final AtomicReference<Histogram> consumedLatency, confirmedLatency;
7878
private final MetricsFormatter formatter;
7979

@@ -162,27 +162,29 @@ private static Runnable wrapInCatch(Runnable runnable) {
162162

163163
@Override
164164
public void start() {
165-
startTime.set(System.nanoTime());
166-
lastTick.set(startTime.get());
167-
startTimeForTotal.set(startTime.get());
168-
169-
scheduledExecutorService.scheduleAtFixedRate(wrapInCatch(() -> {
170-
if (this.closed.get()) {
171-
return;
172-
}
173-
if (noActivity()) {
174-
this.publishedRate.accumulate(0);
175-
this.confirmedRate.accumulate(0);
176-
this.nackedRate.accumulate(0);
177-
this.returnedRate.accumulate(0);
178-
this.receivedRate.accumulate(0);
179-
this.confirmedLatency.set(histogram());
180-
this.consumedLatency.set(histogram());
181-
} else {
182-
metrics(System.nanoTime());
183-
}
184-
185-
}), interval.getSeconds(), interval.getSeconds(), TimeUnit.SECONDS);
165+
if (this.started.compareAndSet(false, true)) {
166+
startTime.set(System.nanoTime());
167+
lastTick.set(startTime.get());
168+
startTimeForTotal.set(startTime.get());
169+
170+
scheduledExecutorService.scheduleAtFixedRate(wrapInCatch(() -> {
171+
if (!this.started.get()) {
172+
return;
173+
}
174+
if (noActivity()) {
175+
this.publishedRate.accumulate(0);
176+
this.confirmedRate.accumulate(0);
177+
this.nackedRate.accumulate(0);
178+
this.returnedRate.accumulate(0);
179+
this.receivedRate.accumulate(0);
180+
this.confirmedLatency.set(histogram());
181+
this.consumedLatency.set(histogram());
182+
} else {
183+
metrics(System.nanoTime());
184+
}
185+
186+
}), interval.getSeconds(), interval.getSeconds(), TimeUnit.SECONDS);
187+
}
186188
}
187189

188190
void metrics(long currentTime) {
@@ -211,7 +213,7 @@ void metrics(long currentTime) {
211213
this.confirmedLatency.set(histogram());
212214
this.consumedLatency.set(histogram());
213215

214-
if (!this.closed.get()) {
216+
if (this.started.get()) {
215217
if (this.firstReport.compareAndSet(false, true)) {
216218
this.formatter.header();
217219
}
@@ -320,9 +322,15 @@ public void resetGlobals() {
320322

321323
@Override
322324
public void close() {
323-
if (this.closed.compareAndSet(false, true)) {
325+
if (this.started.compareAndSet(false, true)) {
324326
this.scheduledExecutorService.shutdownNow();
325327
printFinal();
326328
}
327329
}
330+
331+
// for testing
332+
void started(boolean value) {
333+
this.started.set(value);
334+
}
335+
328336
}

src/test/java/com/rabbitmq/perf/metrics/DefaultPerformanceMetricsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public void stats(TestConfiguration testConfiguration) {
166166

167167
void execute(Configurator configurator) {
168168
DefaultPerformanceMetrics metrics = metrics(configurator);
169+
metrics.started(true);
169170
metrics.metrics(System.nanoTime());
170171
this.defaultOutput = defaultConsoleOut.toString();
171172
this.compactOutput = compactConsoleOut.toString();

0 commit comments

Comments
 (0)