Skip to content

Commit ecd38d8

Browse files
committed
Add shutdown timeout argument, defaults to 5 seconds
The shutdown sequence (mainly connection closing) happens now in a separate thread to be easily stopped. AMQP connections won't be closed in case of timeout, but this avoids waiting for a long time when publisher connections are all blocked. [#162704749] References #126
1 parent e793322 commit ecd38d8

File tree

5 files changed

+195
-7
lines changed

5 files changed

+195
-7
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,36 @@ messages from only one queue.
144144
Note it is possible to link:#customising-queues-and-messages[customise]
145145
the queue and to work against link:#working-with-many-queues[several queues] as well.
146146

147+
== Stopping PerfTest
148+
149+
There are 2 reasons for a PerfTest run to stop:
150+
151+
* one of the limits has been reached (time limit, producer or consumer message count)
152+
* the process is stopped by the user, e.g. by using Ctrl-C in the terminal
153+
154+
In both cases, PerfTest tries to exit as cleanly as possible, in a reasonable amount of time.
155+
Nevertheless, when PerfTest AMQP connections are throttled by the broker, because they're
156+
publishing too fast or because broker http://www.rabbitmq.com/alarms.html[alarms]
157+
have kicked in, it can take time to close them (several seconds or more for one connection).
158+
159+
If closing connections in the gentle way takes too long (5 seconds by default), PerfTest
160+
will move on to the most important resources to free and terminates. This can result
161+
in `client unexpectedly closed TCP connection` messages in the broker logs. Note this
162+
means the AMQP connection hasn't been closed with the right sequence of AMQP frames,
163+
but the socket has been closed properly. There's no resource leakage here.
164+
165+
The connection closing timeout can be set up with the `--shutdown-timeout` argument (or `-st`).
166+
The default timeout can be increased to let more time to close connections, e.g. the
167+
command below uses a shutdown timeout of 20 seconds:
168+
169+
bin/runjava com.rabbitmq.perf.PerfTest --shutdown-timeout 20
170+
171+
The connection closing sequence can also be skipped by setting the timeout to 0 or any negative
172+
value:
173+
174+
bin/runjava com.rabbitmq.perf.PerfTest --shutdown-timeout -1
175+
176+
With the previous command, PerfTest won't even try to close AMQP connections, it will exit
177+
as fast as possible, freeing only the most important resources. This is perfectly
178+
acceptable when performing runs on a test environment.
179+

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public class MulticastParams {
9292
private int producerRandomStartDelayInSeconds;
9393
private int producerSchedulerThreadCount = -1;
9494
private int consumersThreadPools = -1;
95+
private int shutdownTimeout = 5;
9596

9697
public void setExchangeType(String exchangeType) {
9798
this.exchangeType = exchangeType;
@@ -234,6 +235,10 @@ public void setConsumersThreadPools(int consumersThreadPools) {
234235
this.consumersThreadPools = consumersThreadPools;
235236
}
236237

238+
public void setShutdownTimeout(int shutdownTimeout) {
239+
this.shutdownTimeout = shutdownTimeout;
240+
}
241+
237242
public int getConsumerCount() {
238243
return consumerCount;
239244
}
@@ -318,6 +323,10 @@ public int getConsumersThreadPools() {
318323
return consumersThreadPools;
319324
}
320325

326+
public int getShutdownTimeout() {
327+
return shutdownTimeout;
328+
}
329+
321330
public Producer createProducer(Connection connection, Stats stats, MulticastSet.CompletionHandler completionHandler) throws IOException {
322331
Channel channel = connection.createChannel(); //NOSONAR
323332
if (producerTxSize > 0) channel.txSelect();

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.rabbitmq.perf;
1717

1818
import com.rabbitmq.client.*;
19-
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2019
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2120
import org.slf4j.Logger;
2221
import org.slf4j.LoggerFactory;
@@ -141,11 +140,35 @@ public void run(boolean announceStartup)
141140
startConsumers(consumerRunnables);
142141
startProducers(producerStates);
143142

144-
AutoCloseable shutdownSequence = this.shutdownService.wrap(
145-
() -> shutdown(configurationConnection, consumerConnections, producerStates, producerConnections)
146-
);
143+
144+
AutoCloseable shutdownSequence;
145+
int shutdownTimeout = this.params.getShutdownTimeout();
146+
if (shutdownTimeout > 0) {
147+
shutdownSequence = this.shutdownService.wrap(
148+
() -> {
149+
CountDownLatch latch = new CountDownLatch(1);
150+
Thread shutdownThread = new Thread(() -> {
151+
try {
152+
shutdown(configurationConnection, consumerConnections, producerStates, producerConnections);
153+
} finally {
154+
latch.countDown();
155+
}
156+
});
157+
shutdownThread.start();
158+
boolean done = latch.await(shutdownTimeout, TimeUnit.SECONDS);
159+
if (!done) {
160+
LOGGER.debug("Shutdown not completed in {} second(s), aborting.", shutdownTimeout);
161+
shutdownThread.interrupt();
162+
}
163+
}
164+
);
165+
} else {
166+
// no closing timeout, we don't do anything
167+
shutdownSequence = () -> { };
168+
}
147169

148170
this.completionHandler.waitForCompletion();
171+
149172
try {
150173
shutdownSequence.close();
151174
} catch (Exception e) {
@@ -251,6 +274,9 @@ private void shutdown(Connection configurationConnection, Connection[] consumerC
251274
try {
252275
LOGGER.debug("Starting test shutdown");
253276
for (AgentState producerState : producerStates) {
277+
if (Thread.interrupted()) {
278+
return;
279+
}
254280
boolean cancelled = producerState.task.cancel(true);
255281
LOGGER.debug("Producer has been correctly cancelled: {}", cancelled);
256282
}
@@ -259,23 +285,38 @@ private void shutdown(Connection configurationConnection, Connection[] consumerC
259285
for (AgentState producerState : producerStates) {
260286
if (!producerState.task.isDone()) {
261287
try {
288+
if (Thread.interrupted()) {
289+
return;
290+
}
262291
producerState.task.get(10, TimeUnit.SECONDS);
263292
} catch (Exception e) {
264293
LOGGER.debug("Error while waiting for producer to stop: {}. Moving on.", e.getMessage());
265294
}
266295
}
267296
}
268297

298+
if (Thread.interrupted()) {
299+
return;
300+
}
269301
dispose(configurationConnection);
270302

271303
for (Connection producerConnection : producerConnections) {
304+
if (Thread.interrupted()) {
305+
return;
306+
}
272307
dispose(producerConnection);
273308
}
274309

275310
for (Connection consumerConnection : consumerConnections) {
311+
if (Thread.interrupted()) {
312+
return;
313+
}
276314
dispose(consumerConnection);
277315
}
278316

317+
if (Thread.interrupted()) {
318+
return;
319+
}
279320
LOGGER.debug("Shutting down threading handler");
280321
this.threadingHandler.shutdown();
281322
LOGGER.debug("Threading handler shut down");
@@ -318,7 +359,7 @@ private static void dispose(Connection connection) {
318359
} catch (AlreadyClosedException e) {
319360
LOGGER.debug("Connection {} already closed", connection.getClientProvidedName());
320361
} catch (Exception e) {
321-
// don't do anything, we need to close the other connections
362+
// just log, we don't want to stop here
322363
LOGGER.debug("Error while closing connection {}: {}", connection.getClientProvidedName(), e.getMessage());
323364
}
324365
}

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
135135

136136
boolean disableConnectionRecovery = hasOption(cmd, "dcr");
137137
int consumersThreadPools = intArg(cmd, "ctp", -1);
138+
int shutdownTimeout = intArg(cmd, "st", 5);
138139

139140
String uri = strArg(cmd, 'h', "amqp://localhost");
140141
String urisParameter = strArg(cmd, 'H', null);
@@ -260,6 +261,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
260261
p.setProducerRandomStartDelayInSeconds(producerRandomStartDelayInSeconds);
261262
p.setProducerSchedulerThreadCount(producerSchedulingThreads);
262263
p.setConsumersThreadPools(consumersThreadPools);
264+
p.setShutdownTimeout(shutdownTimeout);
263265

264266
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
265267

@@ -496,6 +498,9 @@ public static Options getOptions() {
496498

497499
options.addOption(new Option("ctp", "consumers-thread-pools",true, "number of thread pools to use for all consumers, "
498500
+ "default is to use a thread pool for each consumer"));
501+
502+
options.addOption(new Option("st", "shutdown-timeout",true, "shutdown timeout, default is 5 seconds"));
503+
499504
return options;
500505
}
501506

src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,10 @@ public void timeLimit() throws InterruptedException {
189189
callback("getNextPublishSeqNo", (proxy, method, args) -> 0L)
190190
);
191191

192+
AtomicInteger closeCount = new AtomicInteger(0);
192193
Connection connection = proxy(Connection.class,
193-
callback("createChannel", (proxy, method, args) -> channel)
194+
callback("createChannel", (proxy, method, args) -> channel),
195+
callback("close", (proxy, method, args) -> closeCount.incrementAndGet())
194196
);
195197

196198
MulticastSet multicastSet = getMulticastSet(connectionFactoryThatReturns(connection));
@@ -202,6 +204,7 @@ public void timeLimit() throws InterruptedException {
202204
waitAtMost(15, () -> testIsDone.get());
203205

204206
assertThat(testDurationInMs, greaterThanOrEqualTo(3000L));
207+
assertThat(closeCount.get(), is(4)); // the configuration connection is actually closed twice
205208
}
206209

207210
// -y 1 --pmessages 10 -x n -X m
@@ -493,6 +496,99 @@ public void publishingInterval() throws InterruptedException {
493496
assertThat(testDurationInMs, greaterThan(5000L));
494497
}
495498

499+
@Test
500+
public void shutdownCalledIfShutdownTimeoutIsGreatherThanZero() throws Exception {
501+
countsAndTimeLimit(0, 0, 0);
502+
503+
Channel channel = proxy(Channel.class,
504+
callback("basicPublish", (proxy, method, args) -> null),
505+
callback("getNextPublishSeqNo", (proxy, method, args) -> 0L)
506+
);
507+
508+
AtomicInteger connectionCloseCalls = new AtomicInteger(0);
509+
Connection connection = proxy(Connection.class,
510+
callback("createChannel", (proxy, method, args) -> channel),
511+
callback("close", (proxy, method, args) -> connectionCloseCalls.incrementAndGet())
512+
);
513+
514+
MulticastSet multicastSet = getMulticastSet(connectionFactoryThatReturns(connection));
515+
516+
run(multicastSet);
517+
518+
waitForRunToStart();
519+
520+
completionHandler.countDown();
521+
waitAtMost(10, () -> testIsDone.get());
522+
assertThat(connectionCloseCalls.get(), is(4)); // configuration connection is closed twice
523+
}
524+
525+
@Test
526+
public void shutdownNotCalledIfShutdownTimeoutIsZeroOrLess() throws Exception {
527+
countsAndTimeLimit(0, 0, 0);
528+
params.setShutdownTimeout(-1);
529+
530+
Channel channel = proxy(Channel.class,
531+
callback("basicPublish", (proxy, method, args) -> null),
532+
callback("getNextPublishSeqNo", (proxy, method, args) -> 0L)
533+
);
534+
535+
AtomicInteger connectionCloseCalls = new AtomicInteger(0);
536+
Connection connection = proxy(Connection.class,
537+
callback("createChannel", (proxy, method, args) -> channel),
538+
callback("close", (proxy, method, args) -> connectionCloseCalls.incrementAndGet())
539+
);
540+
541+
MulticastSet multicastSet = getMulticastSet(connectionFactoryThatReturns(connection));
542+
543+
run(multicastSet);
544+
545+
waitForRunToStart();
546+
547+
completionHandler.countDown();
548+
waitAtMost(10, () -> testIsDone.get());
549+
assertThat(connectionCloseCalls.get(), is(1)); // configuration connection is closed after configuration is done
550+
}
551+
552+
@Test
553+
public void shutdownNotCompletedIfTimeoutIsReached() throws Exception {
554+
countsAndTimeLimit(0, 0, 0);
555+
params.setShutdownTimeout(1);
556+
557+
Channel channel = proxy(Channel.class,
558+
callback("basicPublish", (proxy, method, args) -> null),
559+
callback("getNextPublishSeqNo", (proxy, method, args) -> 0L)
560+
);
561+
562+
AtomicInteger connectionCloseCalls = new AtomicInteger(0);
563+
Connection connection = proxy(Connection.class,
564+
callback("createChannel", (proxy, method, args) -> channel),
565+
callback("close", (proxy, method, args) -> {
566+
connectionCloseCalls.incrementAndGet();
567+
// the first call is to close the configuration connection at the beginning of the run,
568+
// so we simulate a timeout when closing a connection during the final shutdown
569+
if (connectionCloseCalls.get() == 2) {
570+
try {
571+
Thread.sleep(5000);
572+
} catch (InterruptedException e) {
573+
// the interrupt flag is cleared after an InterruptedException
574+
Thread.currentThread().interrupt();
575+
}
576+
}
577+
return null;
578+
})
579+
);
580+
581+
MulticastSet multicastSet = getMulticastSet(connectionFactoryThatReturns(connection));
582+
583+
run(multicastSet);
584+
585+
waitForRunToStart();
586+
587+
completionHandler.countDown();
588+
waitAtMost(10, () -> testIsDone.get());
589+
assertThat(connectionCloseCalls.get(), is(2));
590+
}
591+
496592
private Collection<Future<?>> sendMessagesToConsumer(int messagesCount, Consumer consumer) {
497593
final Collection<Future<?>> tasks = new ArrayList<>(messagesCount);
498594
IntStream.range(0, messagesCount).forEach(i -> {
@@ -561,7 +657,11 @@ private void run(MulticastSet multicastSet) {
561657
// one of the tests stops the execution, no need to be noisy
562658
LOGGER.warn("Run has been interrupted");
563659
} catch (Exception e) {
564-
LOGGER.warn("Error during run", e);
660+
if (e.getCause() instanceof InterruptedException) {
661+
LOGGER.warn("Run has been interrupted");
662+
} else {
663+
LOGGER.warn("Error during run", e);
664+
}
565665
}
566666
});
567667
}

0 commit comments

Comments
 (0)