Skip to content

Commit cb8629d

Browse files
authored
Merge pull request #228 from rabbitmq/rabbitmq-perf-test-222-reason-for-stopping
Print out reason(s) for stopping
2 parents 0980e6e + 1781a88 commit cb8629d

16 files changed

+199
-72
lines changed

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class Consumer extends AgentBase implements Runnable {
4040

4141
private static final AckNackOperation NACK_OPERATION =
4242
(ch, envelope, multiple) -> ch.basicNack(envelope.getDeliveryTag(), multiple, true);
43+
static final String STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT = "Consumer reached message limit";
4344

4445
private volatile ConsumerImpl q;
4546
private final Channel channel;
@@ -250,7 +251,7 @@ void handleMessage(Envelope envelope, BasicProperties properties, byte[] body, C
250251
}
251252
}
252253
if (msgLimit != 0 && currentMessageCount >= msgLimit) { // NB: not quite the inverse of above
253-
countDown();
254+
countDown(STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT);
254255
}
255256
}
256257

@@ -280,7 +281,7 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
280281
);
281282
if (!recoveryProcess.isEnabled()) {
282283
LOGGER.debug("Counting down for consumer");
283-
countDown();
284+
countDown("Consumer shut down");
284285
}
285286
}
286287

@@ -297,9 +298,9 @@ public void handleCancel(String consumerTag) throws IOException {
297298
}
298299
}
299300

300-
private void countDown() {
301+
private void countDown(String reason) {
301302
if (completed.compareAndSet(false, true)) {
302-
completionHandler.countDown();
303+
completionHandler.countDown(reason);
303304
}
304305
}
305306

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class MulticastSet {
4949
*/
5050
private static final int PUBLISHING_INTERVAL_NB_PRODUCERS_PER_THREAD = 50;
5151
private static final String PRODUCER_THREAD_PREFIX = "perf-test-producer-";
52+
static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit";
5253
private final Stats stats;
5354
private final ConnectionFactory factory;
5455
private final MulticastParams params;
@@ -501,7 +502,7 @@ public interface CompletionHandler {
501502

502503
void waitForCompletion() throws InterruptedException;
503504

504-
void countDown();
505+
void countDown(String reason);
505506
}
506507

507508
static class DefaultThreadingHandler implements ThreadingHandler {
@@ -566,34 +567,49 @@ private static class AgentState {
566567
private Future<?> task;
567568
}
568569

570+
private static void recordReason(Map<String, Integer> reasons, String reason) {
571+
reasons.compute(reason, (keyReason, count) -> count == null ? 1 : ++count);
572+
}
573+
569574
static class DefaultCompletionHandler implements CompletionHandler {
570575

571576
private final int timeLimit;
572577
private final CountDownLatch latch;
578+
private final ConcurrentMap<String, Integer> reasons;
579+
private final AtomicBoolean completed = new AtomicBoolean(false);
573580

574-
DefaultCompletionHandler(int timeLimit, int countLimit) {
581+
DefaultCompletionHandler(int timeLimit, int countLimit, ConcurrentMap<String, Integer> reasons) {
575582
this.timeLimit = timeLimit;
576583
this.latch = new CountDownLatch(countLimit <= 0 ? 1 : countLimit);
584+
this.reasons = reasons;
577585
}
578586

579587
@Override
580588
public void waitForCompletion() throws InterruptedException {
581589
if (timeLimit <= 0) {
582590
this.latch.await();
591+
completed.set(true);
583592
} else {
584593
boolean countedDown = this.latch.await(timeLimit, TimeUnit.SECONDS);
594+
completed.set(true);
585595
if (LOGGER.isDebugEnabled()) {
586596
LOGGER.debug("Completed, counted down? {}", countedDown);
587597
}
598+
if (!countedDown) {
599+
recordReason(reasons, STOP_REASON_REACHED_TIME_LIMIT);
600+
}
588601
}
589602
}
590603

591604
@Override
592-
public void countDown() {
605+
public void countDown(String reason) {
593606
if (LOGGER.isDebugEnabled()) {
594-
LOGGER.debug("Counting down");
607+
LOGGER.debug("Counting down ({})", reason);
608+
}
609+
if (!completed.get()) {
610+
recordReason(reasons, reason);
611+
latch.countDown();
595612
}
596-
latch.countDown();
597613
}
598614
}
599615

@@ -605,14 +621,20 @@ public void countDown() {
605621
static class NoLimitCompletionHandler implements CompletionHandler {
606622

607623
private final CountDownLatch latch = new CountDownLatch(1);
624+
private final ConcurrentMap<String, Integer> reasons;
625+
626+
NoLimitCompletionHandler(ConcurrentMap<String, Integer> reasons) {
627+
this.reasons = reasons;
628+
}
608629

609630
@Override
610631
public void waitForCompletion() throws InterruptedException {
611632
latch.await();
612633
}
613634

614635
@Override
615-
public void countDown() {
636+
public void countDown(String reason) {
637+
recordReason(reasons, reason);
616638
latch.countDown();
617639
}
618640
}

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@
3232
import java.security.NoSuchAlgorithmException;
3333
import java.text.SimpleDateFormat;
3434
import java.util.*;
35-
import java.util.concurrent.ExecutorService;
36-
import java.util.concurrent.SynchronousQueue;
37-
import java.util.concurrent.ThreadPoolExecutor;
38-
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.*;
3936
import java.util.function.Function;
4037

4138
import static com.rabbitmq.perf.OptionsUtils.forEach;
@@ -318,13 +315,16 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
318315
p.setBodyFieldCount(bodyFieldCount);
319316
p.setBodyCount(bodyCount);
320317

321-
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
318+
ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();
319+
320+
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p, completionReasons);
322321

323322
factory.setExceptionHandler(perfTestOptions.exceptionHandler);
324323

325324
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris, completionHandler, shutdownService);
326325
set.run(true);
327326

327+
System.out.println(stopLine(completionReasons));
328328
stats.printFinal();
329329
} catch (ParseException exp) {
330330
System.err.println("Parsing failed. Reason: " + exp.getMessage());
@@ -340,6 +340,26 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
340340
systemExiter.exit(exitStatus);
341341
}
342342

343+
static String stopLine(Map<String, Integer> reasons) {
344+
StringBuilder stoppedLine = new StringBuilder("test stopped");
345+
if (reasons.size() > 0) {
346+
stoppedLine.append(" (");
347+
int count = 1;
348+
for (Map.Entry<String, Integer> reasonToCount : reasons.entrySet()) {
349+
stoppedLine.append(reasonToCount.getKey());
350+
if (reasonToCount.getValue() > 1) {
351+
stoppedLine.append(" [").append(reasonToCount.getValue()).append("]");
352+
}
353+
if (count < reasons.size()) {
354+
stoppedLine.append(", ");
355+
}
356+
count++;
357+
}
358+
stoppedLine.append(")");
359+
}
360+
return stoppedLine.toString();
361+
}
362+
343363
private static PrintWriter openCsvFileForWriting(String outputFile, ShutdownService shutdownService) throws IOException {
344364
PrintWriter output;
345365
File file = new File(outputFile);
@@ -396,7 +416,7 @@ private static ConnectionFactory configureNioIfRequested(CommandLineProxy cmd, C
396416
};
397417
}
398418

399-
static MulticastSet.CompletionHandler getCompletionHandler(MulticastParams p) {
419+
static MulticastSet.CompletionHandler getCompletionHandler(MulticastParams p, ConcurrentMap<String, Integer> reasons) {
400420
MulticastSet.CompletionHandler completionHandler;
401421
if (p.hasLimit()) {
402422
int countLimit = 0;
@@ -406,14 +426,14 @@ static MulticastSet.CompletionHandler getCompletionHandler(MulticastParams p) {
406426
countLimit += p.getProducerThreadCount();
407427
}
408428
if (p.getConsumerMsgCount() > 0) {
409-
countLimit += p.getProducerThreadCount();
429+
countLimit += p.getConsumerThreadCount();
410430
}
411431
completionHandler = new MulticastSet.DefaultCompletionHandler(
412432
p.getTimeLimit(),
413-
countLimit
414-
);
433+
countLimit,
434+
reasons);
415435
} else {
416-
completionHandler = new MulticastSet.NoLimitCompletionHandler();
436+
completionHandler = new MulticastSet.NoLimitCompletionHandler(reasons);
417437
}
418438
return completionHandler;
419439
}

src/main/java/com/rabbitmq/perf/Producer.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public class Producer extends AgentBase implements Runnable, ReturnListener,
6363
public static final String APP_ID_PROPERTY = "appId";
6464
public static final String CLUSTER_ID_PROPERTY = "clusterId";
6565
public static final String TIMESTAMP_HEADER = TIMESTAMP_PROPERTY;
66+
static final String STOP_REASON_PRODUCER_MESSAGE_LIMIT = "Producer reached message limit";
67+
static final String STOP_REASON_PRODUCER_THREAD_INTERRUPTED = "Producer thread interrupted";
68+
static final String STOP_REASON_ERROR_IN_PRODUCER = "Error in producer";
6669
private final Channel channel;
6770
private final String exchangeName;
6871
private final String id;
@@ -356,12 +359,25 @@ public void run() {
356359
}
357360
} catch (RuntimeException e) {
358361
LOGGER.debug("Error in publisher", e);
362+
String reason;
363+
if (e.getCause() instanceof InterruptedException && this.rateIndicator.getValue() != 0.0f) {
364+
// likely to have been interrupted while sleeping to honor rate limit
365+
reason = STOP_REASON_PRODUCER_THREAD_INTERRUPTED;
366+
} else {
367+
reason = STOP_REASON_ERROR_IN_PRODUCER;
368+
}
359369
// failing, we don't want to block the whole process, so counting down
360-
countDown();
370+
countDown(reason);
361371
throw e;
362372
}
363373
if (state.getMsgCount() >= msgLimit) {
364-
countDown();
374+
String reason;
375+
if (msgLimit == 0) {
376+
reason = STOP_REASON_PRODUCER_THREAD_INTERRUPTED;
377+
} else {
378+
reason = STOP_REASON_PRODUCER_MESSAGE_LIMIT;
379+
}
380+
countDown(reason);
365381
}
366382
}
367383

@@ -406,7 +422,7 @@ public int incrementMessageCount() {
406422
maybeHandlePublish(state);
407423
} catch (RuntimeException e) {
408424
// failing, we don't want to block the whole process, so counting down
409-
countDown();
425+
countDown("Error in scheduled producer");
410426
throw e;
411427
}
412428
};
@@ -416,7 +432,21 @@ public void maybeHandlePublish(AgentState state) {
416432
if (keepGoing(state)) {
417433
handlePublish(state);
418434
} else {
419-
countDown();
435+
String reason;
436+
if (messageLimitReached(state)) {
437+
reason = STOP_REASON_PRODUCER_MESSAGE_LIMIT;
438+
} else {
439+
reason = STOP_REASON_PRODUCER_THREAD_INTERRUPTED;
440+
}
441+
countDown(reason);
442+
}
443+
}
444+
445+
private boolean messageLimitReached(AgentState state) {
446+
if (msgLimit == 0) {
447+
return false;
448+
} else {
449+
return state.getMsgCount() >= msgLimit;
420450
}
421451
}
422452

@@ -500,9 +530,9 @@ private void publish(MessageBodySource.MessageEnvelope messageEnvelope)
500530
messageEnvelope.getBody());
501531
}
502532

503-
private void countDown() {
533+
private void countDown(String reason) {
504534
if (completed.compareAndSet(false, true)) {
505-
completionHandler.countDown();
535+
completionHandler.countDown(reason);
506536
}
507537
}
508538

src/main/java/com/rabbitmq/perf/SimpleScenario.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.net.URISyntaxException;
2222
import java.security.KeyManagementException;
2323
import java.security.NoSuchAlgorithmException;
24+
import java.util.concurrent.ConcurrentHashMap;
2425
import java.util.concurrent.ExecutionException;
2526
import java.util.concurrent.TimeoutException;
2627

@@ -46,7 +47,7 @@ public void run()
4647
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException, ExecutionException {
4748
this.stats = new SimpleScenarioStats(interval);
4849
for (MulticastParams p : params) {
49-
MulticastSet set = new MulticastSet(stats, factory, p, null, PerfTest.getCompletionHandler(p));
50+
MulticastSet set = new MulticastSet(stats, factory, p, null, PerfTest.getCompletionHandler(p, new ConcurrentHashMap<>()));
5051
stats.setup(p);
5152
set.run();
5253
}

src/main/java/com/rabbitmq/perf/VaryingScenario.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.List;
22+
import java.util.concurrent.ConcurrentHashMap;
2223

2324
public class VaryingScenario implements Scenario {
2425
private final String name;
@@ -60,7 +61,7 @@ private void run(Variable[] variables, List<VariableValue> values) throws Except
6061
for (VariableValue value : values) {
6162
value.setup(p);
6263
}
63-
MulticastSet set = new MulticastSet(stats0, factory, p, null, PerfTest.getCompletionHandler(p));
64+
MulticastSet set = new MulticastSet(stats0, factory, p, null, PerfTest.getCompletionHandler(p, new ConcurrentHashMap<>()));
6465
stats0.setup(p);
6566
set.run();
6667
for (VariableValue value : values) {

src/test/java/com/rabbitmq/perf/ConsumerSharingThreadsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void waitForCompletion() {
117117
}
118118

119119
@Override
120-
public void countDown() {
120+
public void countDown(String reason) {
121121
}
122122
}
123123
);

0 commit comments

Comments
 (0)