Skip to content

Commit 5bb678b

Browse files
committed
Shutdown cleanly after Ctrl-C
This commit makes PerfTest shut down cleanly when the process is Ctrl-C-ed. The closing tasks are accumulated in a shutdown service that executes them whether the process exits normally or is Ctrl-C-ed. Note this isn't bullet-proof: if some connections are throttled by the broker because they publish too fast, closing them can easily time out, so PerfTest may take some time to terminate and some "client unexpectedly closed TCP connection" messages can still show up in the broker. For cases where PerfTest doesn't saturate the broker, it should terminate fast and cleanly. Fixes #126
1 parent 066c0d9 commit 5bb678b

File tree

7 files changed

+303
-67
lines changed

7 files changed

+303
-67
lines changed

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515

1616
package com.rabbitmq.perf;
1717

18-
import com.rabbitmq.client.AlreadyClosedException;
19-
import com.rabbitmq.client.Connection;
20-
import com.rabbitmq.client.ConnectionFactory;
21-
import com.rabbitmq.client.Recoverable;
22-
import com.rabbitmq.client.RecoveryListener;
18+
import com.rabbitmq.client.*;
19+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2320
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2421
import org.slf4j.Logger;
2522
import org.slf4j.LoggerFactory;
@@ -32,14 +29,7 @@
3229
import java.util.Collection;
3330
import java.util.List;
3431
import java.util.Random;
35-
import java.util.concurrent.CountDownLatch;
36-
import java.util.concurrent.ExecutionException;
37-
import java.util.concurrent.ExecutorService;
38-
import java.util.concurrent.Executors;
39-
import java.util.concurrent.Future;
40-
import java.util.concurrent.ScheduledExecutorService;
41-
import java.util.concurrent.TimeUnit;
42-
import java.util.concurrent.TimeoutException;
32+
import java.util.concurrent.*;
4333
import java.util.concurrent.atomic.AtomicBoolean;
4434
import java.util.function.Function;
4535
import java.util.function.Supplier;
@@ -70,21 +60,27 @@ public class MulticastSet {
7060
private final List<String> uris;
7161
private final Random random = new Random();
7262
private final CompletionHandler completionHandler;
63+
private final ShutdownService shutdownService;
7364
private ThreadingHandler threadingHandler = new DefaultThreadingHandler();
7465

7566
public MulticastSet(Stats stats, ConnectionFactory factory,
7667
MulticastParams params, List<String> uris, CompletionHandler completionHandler) {
77-
this(stats, factory, params, "perftest", uris, completionHandler);
68+
this(stats, factory, params, "perftest", uris, completionHandler, new ShutdownService());
7869
}
7970

8071
public MulticastSet(Stats stats, ConnectionFactory factory,
81-
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler) {
72+
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler) {
73+
this(stats, factory, params, testID, uris, completionHandler, new ShutdownService());
74+
}
75+
public MulticastSet(Stats stats, ConnectionFactory factory,
76+
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler, ShutdownService shutdownService) {
8277
this.stats = stats;
8378
this.factory = factory;
8479
this.params = params;
8580
this.testID = testID;
8681
this.uris = uris;
8782
this.completionHandler = completionHandler;
83+
this.shutdownService = shutdownService;
8884
this.params.init();
8985
}
9086

@@ -145,9 +141,16 @@ public void run(boolean announceStartup)
145141
startConsumers(consumerRunnables);
146142
startProducers(producerStates);
147143

148-
this.completionHandler.waitForCompletion();
144+
AutoCloseable shutdownSequence = this.shutdownService.wrap(
145+
() -> shutdown(configurationConnection, consumerConnections, producerStates, producerConnections)
146+
);
149147

150-
shutdown(configurationConnection, consumerConnections, producerStates, producerConnections);
148+
this.completionHandler.waitForCompletion();
149+
try {
150+
shutdownSequence.close();
151+
} catch (Exception e) {
152+
throw new RuntimeException(e);
153+
}
151154
}
152155

153156
private Function<Integer, ExecutorService> createConsumersExecutorsFactory() {
@@ -248,38 +251,29 @@ private void shutdown(Connection configurationConnection, Connection[] consumerC
248251
try {
249252
LOGGER.debug("Starting test shutdown");
250253
for (AgentState producerState : producerStates) {
251-
producerState.task.cancel(true);
254+
boolean cancelled = producerState.task.cancel(true);
255+
LOGGER.debug("Producer has been correctly cancelled: {}", cancelled);
252256
}
253257

254-
try {
255-
LOGGER.debug("Closing configuration connection");
256-
configurationConnection.close();
257-
LOGGER.debug("Configuration connection closed");
258-
} catch (AlreadyClosedException e) {
259-
LOGGER.debug("Configuration connection was already closed");
260-
} catch (Exception e) {
261-
LOGGER.debug("Error while closing configuration connection: {}", e.getMessage());
258+
// we do our best to stop producers before closing their connections
259+
for (AgentState producerState : producerStates) {
260+
if (!producerState.task.isDone()) {
261+
try {
262+
producerState.task.get(10, TimeUnit.SECONDS);
263+
} catch (Exception e) {
264+
LOGGER.debug("Error while waiting for producer to stop: {}. Moving on.", e.getMessage());
265+
}
266+
}
262267
}
263268

269+
dispose(configurationConnection);
270+
264271
for (Connection producerConnection : producerConnections) {
265-
try {
266-
producerConnection.close();
267-
} catch (Exception e) {
268-
// don't do anything, we need to close the other connections
269-
}
272+
dispose(producerConnection);
270273
}
271274

272275
for (Connection consumerConnection : consumerConnections) {
273-
try {
274-
LOGGER.debug("Closing consumer connection {}", consumerConnection.getClientProvidedName());
275-
consumerConnection.close();
276-
LOGGER.debug("Consumer connection {} has been closed", consumerConnection.getClientProvidedName());
277-
} catch (AlreadyClosedException e) {
278-
LOGGER.debug("Consumer connection {} already closed", consumerConnection.getClientProvidedName());
279-
} catch (Exception e) {
280-
// don't do anything, we need to close the other connections
281-
LOGGER.debug("Error while closing consumer connection {}: {}", consumerConnection.getClientProvidedName(), e.getMessage());
282-
}
276+
dispose(consumerConnection);
283277
}
284278

285279
LOGGER.debug("Shutting down threading handler");
@@ -314,6 +308,21 @@ public void handleRecovery(Recoverable recoverable) {
314308
}
315309
}
316310

311+
private static void dispose(Connection connection) {
312+
try {
313+
LOGGER.debug("Closing connection {}", connection.getClientProvidedName());
314+
// we need to shutdown, it should not take forever, so we pick a reasonably
315+
// small timeout (3 seconds)
316+
connection.close(AMQP.REPLY_SUCCESS, "Closed by PerfTest", 3000);
317+
LOGGER.debug("Connection {} has been closed", connection.getClientProvidedName());
318+
} catch (AlreadyClosedException e) {
319+
LOGGER.debug("Connection {} already closed", connection.getClientProvidedName());
320+
} catch (Exception e) {
321+
// don't do anything, we need to close the other connections
322+
LOGGER.debug("Error while closing connection {}: {}", connection.getClientProvidedName(), e.getMessage());
323+
}
324+
}
325+
317326
private void setUri() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
318327
if (uris != null) {
319328
factory.setUri(uri());

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import com.rabbitmq.client.ConnectionFactory;
1919
import com.rabbitmq.client.DefaultSaslConfig;
20+
import com.rabbitmq.client.ExceptionHandler;
2021
import com.rabbitmq.client.impl.ClientVersion;
22+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2123
import com.rabbitmq.client.impl.nio.NioParams;
2224
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
2325
import org.apache.commons.cli.*;
@@ -30,6 +32,7 @@
3032
import java.security.NoSuchAlgorithmException;
3133
import java.text.SimpleDateFormat;
3234
import java.util.*;
35+
import java.util.concurrent.ExecutorService;
3336
import java.util.concurrent.SynchronousQueue;
3437
import java.util.concurrent.ThreadPoolExecutor;
3538
import java.util.concurrent.TimeUnit;
@@ -46,9 +49,11 @@ public class PerfTest {
4649

4750
public static void main(String [] args, PerfTestOptions perfTestOptions) {
4851
SystemExiter systemExiter = perfTestOptions.systemExiter;
52+
ShutdownService shutdownService = perfTestOptions.shutdownService;
4953
Options options = getOptions();
5054
CommandLineParser parser = new GnuParser();
5155
CompositeMetrics metrics = new CompositeMetrics();
56+
shutdownService.wrap(() -> metrics.close());
5257
Options metricsOptions = metrics.options();
5358
forEach(metricsOptions, option -> options.addOption(option));
5459
try {
@@ -143,12 +148,13 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
143148
factory.setTopologyRecoveryEnabled(false);
144149

145150
CompositeMeterRegistry registry = new CompositeMeterRegistry();
151+
shutdownService.wrap(() -> registry.close());
146152

147153
metrics.configure(cmd, registry, factory);
148154

149155
PrintWriter output;
150156
if (outputFile != null) {
151-
output = openCsvFileForWriting(outputFile);
157+
output = openCsvFileForWriting(outputFile, shutdownService);
152158
} else {
153159
output = null;
154160
}
@@ -204,6 +210,10 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
204210
}
205211

206212
factory = configureNioIfRequested(cmd, factory);
213+
if (factory.getNioParams().getNioExecutor() != null) {
214+
ExecutorService nioExecutor = factory.getNioParams().getNioExecutor();
215+
shutdownService.wrap(() -> nioExecutor.shutdownNow());
216+
}
207217

208218
MulticastParams p = new MulticastParams();
209219
p.setAutoAck( autoAck);
@@ -253,16 +263,12 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
253263

254264
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
255265

256-
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris, completionHandler);
266+
factory.setExceptionHandler(perfTestOptions.exceptionHandler);
267+
268+
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris, completionHandler, shutdownService);
257269
set.run(true);
258270

259271
stats.printFinal();
260-
261-
if (factory.getNioParams().getNioExecutor() != null) {
262-
factory.getNioParams().getNioExecutor().shutdownNow();
263-
}
264-
265-
registry.close();
266272
}
267273
catch (ParseException exp) {
268274
System.err.println("Parsing failed. Reason: " + exp.getMessage());
@@ -272,13 +278,11 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
272278
LOGGER.error("Main thread caught exception", e);
273279
systemExiter.exit(1);
274280
} finally {
275-
if (metrics != null) {
276-
metrics.close();
277-
}
281+
shutdownService.close();
278282
}
279283
}
280284

281-
private static PrintWriter openCsvFileForWriting(String outputFile) throws IOException {
285+
private static PrintWriter openCsvFileForWriting(String outputFile, ShutdownService shutdownService) throws IOException {
282286
PrintWriter output;
283287
File file = new File(outputFile);
284288
if (file.exists()) {
@@ -288,7 +292,7 @@ private static PrintWriter openCsvFileForWriting(String outputFile) throws IOExc
288292
}
289293
}
290294
output = new PrintWriter(new BufferedWriter(new FileWriter(file, true)), true); //NOSONAR
291-
Runtime.getRuntime().addShutdownHook(new Thread(() -> output.close()));
295+
shutdownService.wrap(() -> output.close());
292296
return output;
293297
}
294298

@@ -358,7 +362,13 @@ static MulticastSet.CompletionHandler getCompletionHandler(MulticastParams p) {
358362

359363
public static void main(String[] args) throws IOException {
360364
Log.configureLog();
361-
main(args, new PerfTestOptions().setSystemExiter(new JvmSystemExiter()).setSkipSslContextConfiguration(false));
365+
PerfTestOptions perfTestOptions = new PerfTestOptions();
366+
Runtime.getRuntime().addShutdownHook(new Thread(() -> perfTestOptions.shutdownService.close()));
367+
main(args, perfTestOptions
368+
.setSystemExiter(new JvmSystemExiter())
369+
.setSkipSslContextConfiguration(false)
370+
.setExceptionHandler(new RelaxedExceptionHandler())
371+
);
362372
}
363373

364374
private static SSLContext getSslContextIfNecessary(CommandLineProxy cmd, Properties systemProperties) throws NoSuchAlgorithmException {
@@ -585,6 +595,10 @@ public static class PerfTestOptions {
585595

586596
private boolean skipSslContextConfiguration = false;
587597

598+
private ShutdownService shutdownService = new ShutdownService();
599+
600+
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
601+
588602
private Function<String, String> argumentLookup = LONG_OPTION_TO_ENVIRONMENT_VARIABLE
589603
.andThen(ENVIRONMENT_VARIABLE_PREFIX)
590604
.andThen(ENVIRONMENT_VARIABLE_LOOKUP);
@@ -603,6 +617,16 @@ public PerfTestOptions setArgumentLookup(Function<String, String> argumentLookup
603617
this.argumentLookup = argumentLookup;
604618
return this;
605619
}
620+
621+
public PerfTestOptions setShutdownService(ShutdownService shutdownService) {
622+
this.shutdownService = shutdownService;
623+
return this;
624+
}
625+
626+
public PerfTestOptions setExceptionHandler(ExceptionHandler exceptionHandler) {
627+
this.exceptionHandler = exceptionHandler;
628+
return this;
629+
}
606630
}
607631

608632
/**

src/main/java/com/rabbitmq/perf/PrintlnStats.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.PrintStream;
2222
import java.io.PrintWriter;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
import static java.lang.String.format;
2526

@@ -40,6 +41,8 @@ class PrintlnStats extends Stats {
4041
private final PrintWriter csvOut;
4142
private final PrintStream out;
4243

44+
private final AtomicBoolean printFinalOnGoingOrDone = new AtomicBoolean(false);
45+
4346
public PrintlnStats(String testID, long interval,
4447
boolean sendStatsEnabled, boolean recvStatsEnabled,
4548
boolean returnStatsEnabled, boolean confirmStatsEnabled,
@@ -100,6 +103,12 @@ private static double rate(long count, long elapsed) {
100103

101104
@Override
102105
protected void report(long now) {
106+
if (!printFinalOnGoingOrDone.get()) {
107+
doReport(now);
108+
}
109+
}
110+
111+
private void doReport(long now) {
103112
String output = "id: " + testID + ", ";
104113

105114
double ratePublished = 0.0;
@@ -169,7 +178,10 @@ protected void report(long now) {
169178
}
170179
}
171180

172-
this.out.println(output);
181+
if (!printFinalOnGoingOrDone.get()) {
182+
this.out.println(output);
183+
}
184+
173185
writeToCsvIfNecessary(now, ratePublished, rateReturned, rateConfirmed, rateNacked, rateConsumed, consumerLatencyStats, confirmLatencyStats);
174186
}
175187

@@ -181,7 +193,7 @@ private String legacyMetrics() {
181193
}
182194

183195
private void writeToCsvIfNecessary(long now, double ratePublished, double rateReturned, double rateConfirmed, double rateNacked, double rateConsumed, long[] consumerLatencyStats, long[] confirmLatencyStats) {
184-
if (this.csvOut != null) {
196+
if (this.csvOut != null && !printFinalOnGoingOrDone.get()) {
185197
if (consumerLatencyStats == null) {
186198
consumerLatencyStats = getStats(latency);
187199
}
@@ -248,17 +260,19 @@ private String getRate(String descr, double rate, boolean display) {
248260
}
249261

250262
public void printFinal() {
251-
long now = System.currentTimeMillis();
252-
253-
System.out.println("id: " + testID + ", sending rate avg: " +
254-
formatRate(sendCountTotal * 1000.0 / (now - startTime)) +
255-
" " + MESSAGE_RATE_LABEL);
263+
if (printFinalOnGoingOrDone.compareAndSet(false, true)) {
264+
long now = System.currentTimeMillis();
256265

257-
long elapsed = now - startTime;
258-
if (elapsed > 0) {
259-
System.out.println("id: " + testID + ", receiving rate avg: " +
260-
formatRate(recvCountTotal * 1000.0 / elapsed) +
266+
System.out.println("id: " + testID + ", sending rate avg: " +
267+
formatRate(sendCountTotal * 1000.0 / (now - startTime)) +
261268
" " + MESSAGE_RATE_LABEL);
269+
270+
long elapsed = now - startTime;
271+
if (elapsed > 0) {
272+
System.out.println("id: " + testID + ", receiving rate avg: " +
273+
formatRate(recvCountTotal * 1000.0 / elapsed) +
274+
" " + MESSAGE_RATE_LABEL);
275+
}
262276
}
263277
}
264278
}

0 commit comments

Comments
 (0)