Skip to content

Commit 3043cdf

Browse files
committed
Revert "Start metrics service before consumers and publishers"
This reverts commit feb39e2.
1 parent feb39e2 commit 3043cdf

File tree

3 files changed

+25
-37
lines changed

3 files changed

+25
-37
lines changed

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,6 @@ public void run(boolean announceStartup)
236236

237237
createProducers(announceStartup, producerStates, producerConnections);
238238

239-
this.performanceMetrics.start();
240-
241239
startConsumers(consumerRunnables);
242240
startProducers(producerStates);
243241

@@ -302,6 +300,7 @@ public void run(boolean announceStartup)
302300
shutdownSequence = () -> { };
303301
}
304302

303+
this.performanceMetrics.start();
305304
this.completionHandler.waitForCompletion();
306305

307306
try {
@@ -433,9 +432,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
433432
if (this.params.getConsumerStartDelay().getSeconds() <= 0) {
434433
this.consumerLatencyIndicator.start();
435434
for (Runnable runnable : consumerRunnables) {
436-
LOGGER.debug("Starting consumer runnable...");
437435
runnable.run();
438-
LOGGER.debug("Consumer runnable started");
439436
if (params.getConsumerSlowStart()) {
440437
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
441438
Thread.sleep(1000);

src/main/java/com/rabbitmq/perf/metrics/DefaultPerformanceMetrics.java

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public final class DefaultPerformanceMetrics implements PerformanceMetrics, Auto
7373
private final Duration interval;
7474
private final TimeUnit latencyCollectionTimeUnit;
7575
private final AtomicBoolean firstReport = new AtomicBoolean(false);
76-
private final AtomicBoolean started = new AtomicBoolean(false);
76+
private final AtomicBoolean closed = new AtomicBoolean(false);
7777
private final AtomicReference<Histogram> consumedLatency, confirmedLatency;
7878
private final MetricsFormatter formatter;
7979

@@ -162,29 +162,27 @@ private static Runnable wrapInCatch(Runnable runnable) {
162162

163163
@Override
164164
public void start() {
165-
if (this.started.compareAndSet(false, true)) {
166-
startTime.set(System.nanoTime());
167-
lastTick.set(startTime.get());
168-
startTimeForTotal.set(startTime.get());
169-
170-
scheduledExecutorService.scheduleAtFixedRate(wrapInCatch(() -> {
171-
if (!this.started.get()) {
172-
return;
173-
}
174-
if (noActivity()) {
175-
this.publishedRate.accumulate(0);
176-
this.confirmedRate.accumulate(0);
177-
this.nackedRate.accumulate(0);
178-
this.returnedRate.accumulate(0);
179-
this.receivedRate.accumulate(0);
180-
this.confirmedLatency.set(histogram());
181-
this.consumedLatency.set(histogram());
182-
} else {
183-
metrics(System.nanoTime());
184-
}
185-
186-
}), interval.getSeconds(), interval.getSeconds(), TimeUnit.SECONDS);
187-
}
165+
startTime.set(System.nanoTime());
166+
lastTick.set(startTime.get());
167+
startTimeForTotal.set(startTime.get());
168+
169+
scheduledExecutorService.scheduleAtFixedRate(wrapInCatch(() -> {
170+
if (this.closed.get()) {
171+
return;
172+
}
173+
if (noActivity()) {
174+
this.publishedRate.accumulate(0);
175+
this.confirmedRate.accumulate(0);
176+
this.nackedRate.accumulate(0);
177+
this.returnedRate.accumulate(0);
178+
this.receivedRate.accumulate(0);
179+
this.confirmedLatency.set(histogram());
180+
this.consumedLatency.set(histogram());
181+
} else {
182+
metrics(System.nanoTime());
183+
}
184+
185+
}), interval.getSeconds(), interval.getSeconds(), TimeUnit.SECONDS);
188186
}
189187

190188
void metrics(long currentTime) {
@@ -213,7 +211,7 @@ void metrics(long currentTime) {
213211
this.confirmedLatency.set(histogram());
214212
this.consumedLatency.set(histogram());
215213

216-
if (this.started.get()) {
214+
if (!this.closed.get()) {
217215
if (this.firstReport.compareAndSet(false, true)) {
218216
this.formatter.header();
219217
}
@@ -322,15 +320,9 @@ public void resetGlobals() {
322320

323321
@Override
324322
public void close() {
325-
if (this.started.compareAndSet(false, true)) {
323+
if (this.closed.compareAndSet(false, true)) {
326324
this.scheduledExecutorService.shutdownNow();
327325
printFinal();
328326
}
329327
}
330-
331-
// for testing
332-
void started(boolean value) {
333-
this.started.set(value);
334-
}
335-
336328
}

src/test/java/com/rabbitmq/perf/metrics/DefaultPerformanceMetricsTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ public void stats(TestConfiguration testConfiguration) {
166166

167167
void execute(Configurator configurator) {
168168
DefaultPerformanceMetrics metrics = metrics(configurator);
169-
metrics.started(true);
170169
metrics.metrics(System.nanoTime());
171170
this.defaultOutput = defaultConsoleOut.toString();
172171
this.compactOutput = compactConsoleOut.toString();

0 commit comments

Comments
 (0)