Skip to content

Commit 60e2e38

Browse files
Merge pull request #127 from rabbitmq/rabbitmq-perf-test-126-clean-shutdown
Shutdown cleanly after Ctrl-C
2 parents 066c0d9 + 5bb678b commit 60e2e38

File tree

7 files changed

+303
-67
lines changed

7 files changed

+303
-67
lines changed

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515

1616
package com.rabbitmq.perf;
1717

18-
import com.rabbitmq.client.AlreadyClosedException;
19-
import com.rabbitmq.client.Connection;
20-
import com.rabbitmq.client.ConnectionFactory;
21-
import com.rabbitmq.client.Recoverable;
22-
import com.rabbitmq.client.RecoveryListener;
18+
import com.rabbitmq.client.*;
19+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2320
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2421
import org.slf4j.Logger;
2522
import org.slf4j.LoggerFactory;
@@ -32,14 +29,7 @@
3229
import java.util.Collection;
3330
import java.util.List;
3431
import java.util.Random;
35-
import java.util.concurrent.CountDownLatch;
36-
import java.util.concurrent.ExecutionException;
37-
import java.util.concurrent.ExecutorService;
38-
import java.util.concurrent.Executors;
39-
import java.util.concurrent.Future;
40-
import java.util.concurrent.ScheduledExecutorService;
41-
import java.util.concurrent.TimeUnit;
42-
import java.util.concurrent.TimeoutException;
32+
import java.util.concurrent.*;
4333
import java.util.concurrent.atomic.AtomicBoolean;
4434
import java.util.function.Function;
4535
import java.util.function.Supplier;
@@ -70,21 +60,27 @@ public class MulticastSet {
7060
private final List<String> uris;
7161
private final Random random = new Random();
7262
private final CompletionHandler completionHandler;
63+
private final ShutdownService shutdownService;
7364
private ThreadingHandler threadingHandler = new DefaultThreadingHandler();
7465

7566
public MulticastSet(Stats stats, ConnectionFactory factory,
7667
MulticastParams params, List<String> uris, CompletionHandler completionHandler) {
77-
this(stats, factory, params, "perftest", uris, completionHandler);
68+
this(stats, factory, params, "perftest", uris, completionHandler, new ShutdownService());
7869
}
7970

8071
public MulticastSet(Stats stats, ConnectionFactory factory,
81-
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler) {
72+
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler) {
73+
this(stats, factory, params, testID, uris, completionHandler, new ShutdownService());
74+
}
75+
public MulticastSet(Stats stats, ConnectionFactory factory,
76+
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler, ShutdownService shutdownService) {
8277
this.stats = stats;
8378
this.factory = factory;
8479
this.params = params;
8580
this.testID = testID;
8681
this.uris = uris;
8782
this.completionHandler = completionHandler;
83+
this.shutdownService = shutdownService;
8884
this.params.init();
8985
}
9086

@@ -145,9 +141,16 @@ public void run(boolean announceStartup)
145141
startConsumers(consumerRunnables);
146142
startProducers(producerStates);
147143

148-
this.completionHandler.waitForCompletion();
144+
AutoCloseable shutdownSequence = this.shutdownService.wrap(
145+
() -> shutdown(configurationConnection, consumerConnections, producerStates, producerConnections)
146+
);
149147

150-
shutdown(configurationConnection, consumerConnections, producerStates, producerConnections);
148+
this.completionHandler.waitForCompletion();
149+
try {
150+
shutdownSequence.close();
151+
} catch (Exception e) {
152+
throw new RuntimeException(e);
153+
}
151154
}
152155

153156
private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
@@ -248,38 +251,29 @@ private void shutdown(Connection configurationConnection, Connection[] consumerC
248251
try {
249252
LOGGER.debug("Starting test shutdown");
250253
for (AgentState producerState : producerStates) {
251-
producerState.task.cancel(true);
254+
boolean cancelled = producerState.task.cancel(true);
255+
LOGGER.debug("Producer has been correctly cancelled: {}", cancelled);
252256
}
253257

254-
try {
255-
LOGGER.debug("Closing configuration connection");
256-
configurationConnection.close();
257-
LOGGER.debug("Configuration connection closed");
258-
} catch (AlreadyClosedException e) {
259-
LOGGER.debug("Configuration connection was already closed");
260-
} catch (Exception e) {
261-
LOGGER.debug("Error while closing configuration connection: {}", e.getMessage());
258+
// we do our best to stop producers before closing their connections
259+
for (AgentState producerState : producerStates) {
260+
if (!producerState.task.isDone()) {
261+
try {
262+
producerState.task.get(10, TimeUnit.SECONDS);
263+
} catch (Exception e) {
264+
LOGGER.debug("Error while waiting for producer to stop: {}. Moving on.", e.getMessage());
265+
}
266+
}
262267
}
263268

269+
dispose(configurationConnection);
270+
264271
for (Connection producerConnection : producerConnections) {
265-
try {
266-
producerConnection.close();
267-
} catch (Exception e) {
268-
// don't do anything, we need to close the other connections
269-
}
272+
dispose(producerConnection);
270273
}
271274

272275
for (Connection consumerConnection : consumerConnections) {
273-
try {
274-
LOGGER.debug("Closing consumer connection {}", consumerConnection.getClientProvidedName());
275-
consumerConnection.close();
276-
LOGGER.debug("Consumer connection {} has been closed", consumerConnection.getClientProvidedName());
277-
} catch (AlreadyClosedException e) {
278-
LOGGER.debug("Consumer connection {} already closed", consumerConnection.getClientProvidedName());
279-
} catch (Exception e) {
280-
// don't do anything, we need to close the other connections
281-
LOGGER.debug("Error while closing consumer connection {}: {}", consumerConnection.getClientProvidedName(), e.getMessage());
282-
}
276+
dispose(consumerConnection);
283277
}
284278

285279
LOGGER.debug("Shutting down threading handler");
@@ -314,6 +308,21 @@ public void handleRecovery(Recoverable recoverable) {
314308
}
315309
}
316310

311+
private static void dispose(Connection connection) {
312+
try {
313+
LOGGER.debug("Closing connection {}", connection.getClientProvidedName());
314+
// we need to shutdown, it should not take forever, so we pick a reasonably
315+
// small timeout (3 seconds)
316+
connection.close(AMQP.REPLY_SUCCESS, "Closed by PerfTest", 3000);
317+
LOGGER.debug("Connection {} has been closed", connection.getClientProvidedName());
318+
} catch (AlreadyClosedException e) {
319+
LOGGER.debug("Connection {} already closed", connection.getClientProvidedName());
320+
} catch (Exception e) {
321+
// don't do anything, we need to close the other connections
322+
LOGGER.debug("Error while closing connection {}: {}", connection.getClientProvidedName(), e.getMessage());
323+
}
324+
}
325+
317326
private void setUri() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
318327
if (uris != null) {
319328
factory.setUri(uri());

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import com.rabbitmq.client.ConnectionFactory;
1919
import com.rabbitmq.client.DefaultSaslConfig;
20+
import com.rabbitmq.client.ExceptionHandler;
2021
import com.rabbitmq.client.impl.ClientVersion;
22+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2123
import com.rabbitmq.client.impl.nio.NioParams;
2224
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
2325
import org.apache.commons.cli.*;
@@ -30,6 +32,7 @@
3032
import java.security.NoSuchAlgorithmException;
3133
import java.text.SimpleDateFormat;
3234
import java.util.*;
35+
import java.util.concurrent.ExecutorService;
3336
import java.util.concurrent.SynchronousQueue;
3437
import java.util.concurrent.ThreadPoolExecutor;
3538
import java.util.concurrent.TimeUnit;
@@ -46,9 +49,11 @@ public class PerfTest {
4649

4750
public static void main(String [] args, PerfTestOptions perfTestOptions) {
4851
SystemExiter systemExiter = perfTestOptions.systemExiter;
52+
ShutdownService shutdownService = perfTestOptions.shutdownService;
4953
Options options = getOptions();
5054
CommandLineParser parser = new GnuParser();
5155
CompositeMetrics metrics = new CompositeMetrics();
56+
shutdownService.wrap(() -> metrics.close());
5257
Options metricsOptions = metrics.options();
5358
forEach(metricsOptions, option -> options.addOption(option));
5459
try {
@@ -143,12 +148,13 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
143148
factory.setTopologyRecoveryEnabled(false);
144149

145150
CompositeMeterRegistry registry = new CompositeMeterRegistry();
151+
shutdownService.wrap(() -> registry.close());
146152

147153
metrics.configure(cmd, registry, factory);
148154

149155
PrintWriter output;
150156
if (outputFile != null) {
151-
output = openCsvFileForWriting(outputFile);
157+
output = openCsvFileForWriting(outputFile, shutdownService);
152158
} else {
153159
output = null;
154160
}
@@ -204,6 +210,10 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
204210
}
205211

206212
factory = configureNioIfRequested(cmd, factory);
213+
if (factory.getNioParams().getNioExecutor() != null) {
214+
ExecutorService nioExecutor = factory.getNioParams().getNioExecutor();
215+
shutdownService.wrap(() -> nioExecutor.shutdownNow());
216+
}
207217

208218
MulticastParams p = new MulticastParams();
209219
p.setAutoAck( autoAck);
@@ -253,16 +263,12 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
253263

254264
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
255265

256-
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris, completionHandler);
266+
factory.setExceptionHandler(perfTestOptions.exceptionHandler);
267+
268+
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris, completionHandler, shutdownService);
257269
set.run(true);
258270

259271
stats.printFinal();
260-
261-
if (factory.getNioParams().getNioExecutor() != null) {
262-
factory.getNioParams().getNioExecutor().shutdownNow();
263-
}
264-
265-
registry.close();
266272
}
267273
catch (ParseException exp) {
268274
System.err.println("Parsing failed. Reason: " + exp.getMessage());
@@ -272,13 +278,11 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
272278
LOGGER.error("Main thread caught exception", e);
273279
systemExiter.exit(1);
274280
} finally {
275-
if (metrics != null) {
276-
metrics.close();
277-
}
281+
shutdownService.close();
278282
}
279283
}
280284

281-
private static PrintWriter openCsvFileForWriting(String outputFile) throws IOException {
285+
private static PrintWriter openCsvFileForWriting(String outputFile, ShutdownService shutdownService) throws IOException {
282286
PrintWriter output;
283287
File file = new File(outputFile);
284288
if (file.exists()) {
@@ -288,7 +292,7 @@ private static PrintWriter openCsvFileForWriting(String outputFile) throws IOExc
288292
}
289293
}
290294
output = new PrintWriter(new BufferedWriter(new FileWriter(file, true)), true); //NOSONAR
291-
Runtime.getRuntime().addShutdownHook(new Thread(() -> output.close()));
295+
shutdownService.wrap(() -> output.close());
292296
return output;
293297
}
294298

@@ -358,7 +362,13 @@ static MulticastSet.CompletionHandler getCompletionHandler(MulticastParams p) {
358362

359363
public static void main(String[] args) throws IOException {
360364
Log.configureLog();
361-
main(args, new PerfTestOptions().setSystemExiter(new JvmSystemExiter()).setSkipSslContextConfiguration(false));
365+
PerfTestOptions perfTestOptions = new PerfTestOptions();
366+
Runtime.getRuntime().addShutdownHook(new Thread(() -> perfTestOptions.shutdownService.close()));
367+
main(args, perfTestOptions
368+
.setSystemExiter(new JvmSystemExiter())
369+
.setSkipSslContextConfiguration(false)
370+
.setExceptionHandler(new RelaxedExceptionHandler())
371+
);
362372
}
363373

364374
private static SSLContext getSslContextIfNecessary(CommandLineProxy cmd, Properties systemProperties) throws NoSuchAlgorithmException {
@@ -585,6 +595,10 @@ public static class PerfTestOptions {
585595

586596
private boolean skipSslContextConfiguration = false;
587597

598+
private ShutdownService shutdownService = new ShutdownService();
599+
600+
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
601+
588602
private Function<String, String> argumentLookup = LONG_OPTION_TO_ENVIRONMENT_VARIABLE
589603
.andThen(ENVIRONMENT_VARIABLE_PREFIX)
590604
.andThen(ENVIRONMENT_VARIABLE_LOOKUP);
@@ -603,6 +617,16 @@ public PerfTestOptions setArgumentLookup(Function<String, String> argumentLookup
603617
this.argumentLookup = argumentLookup;
604618
return this;
605619
}
620+
621+
public PerfTestOptions setShutdownService(ShutdownService shutdownService) {
622+
this.shutdownService = shutdownService;
623+
return this;
624+
}
625+
626+
public PerfTestOptions setExceptionHandler(ExceptionHandler exceptionHandler) {
627+
this.exceptionHandler = exceptionHandler;
628+
return this;
629+
}
606630
}
607631

608632
/**

src/main/java/com/rabbitmq/perf/PrintlnStats.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.PrintStream;
2222
import java.io.PrintWriter;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
import static java.lang.String.format;
2526

@@ -40,6 +41,8 @@ class PrintlnStats extends Stats {
4041
private final PrintWriter csvOut;
4142
private final PrintStream out;
4243

44+
private final AtomicBoolean printFinalOnGoingOrDone = new AtomicBoolean(false);
45+
4346
public PrintlnStats(String testID, long interval,
4447
boolean sendStatsEnabled, boolean recvStatsEnabled,
4548
boolean returnStatsEnabled, boolean confirmStatsEnabled,
@@ -100,6 +103,12 @@ private static double rate(long count, long elapsed) {
100103

101104
@Override
102105
protected void report(long now) {
106+
if (!printFinalOnGoingOrDone.get()) {
107+
doReport(now);
108+
}
109+
}
110+
111+
private void doReport(long now) {
103112
String output = "id: " + testID + ", ";
104113

105114
double ratePublished = 0.0;
@@ -169,7 +178,10 @@ protected void report(long now) {
169178
}
170179
}
171180

172-
this.out.println(output);
181+
if (!printFinalOnGoingOrDone.get()) {
182+
this.out.println(output);
183+
}
184+
173185
writeToCsvIfNecessary(now, ratePublished, rateReturned, rateConfirmed, rateNacked, rateConsumed, consumerLatencyStats, confirmLatencyStats);
174186
}
175187

@@ -181,7 +193,7 @@ private String legacyMetrics() {
181193
}
182194

183195
private void writeToCsvIfNecessary(long now, double ratePublished, double rateReturned, double rateConfirmed, double rateNacked, double rateConsumed, long[] consumerLatencyStats, long[] confirmLatencyStats) {
184-
if (this.csvOut != null) {
196+
if (this.csvOut != null && !printFinalOnGoingOrDone.get()) {
185197
if (consumerLatencyStats == null) {
186198
consumerLatencyStats = getStats(latency);
187199
}
@@ -248,17 +260,19 @@ private String getRate(String descr, double rate, boolean display) {
248260
}
249261

250262
public void printFinal() {
251-
long now = System.currentTimeMillis();
252-
253-
System.out.println("id: " + testID + ", sending rate avg: " +
254-
formatRate(sendCountTotal * 1000.0 / (now - startTime)) +
255-
" " + MESSAGE_RATE_LABEL);
263+
if (printFinalOnGoingOrDone.compareAndSet(false, true)) {
264+
long now = System.currentTimeMillis();
256265

257-
long elapsed = now - startTime;
258-
if (elapsed > 0) {
259-
System.out.println("id: " + testID + ", receiving rate avg: " +
260-
formatRate(recvCountTotal * 1000.0 / elapsed) +
266+
System.out.println("id: " + testID + ", sending rate avg: " +
267+
formatRate(sendCountTotal * 1000.0 / (now - startTime)) +
261268
" " + MESSAGE_RATE_LABEL);
269+
270+
long elapsed = now - startTime;
271+
if (elapsed > 0) {
272+
System.out.println("id: " + testID + ", receiving rate avg: " +
273+
formatRate(recvCountTotal * 1000.0 / elapsed) +
274+
" " + MESSAGE_RATE_LABEL);
275+
}
262276
}
263277
}
264278
}

0 commit comments

Comments
 (0)