1818import static com .rabbitmq .perf .Utils .isRecoverable ;
1919import static java .lang .Math .min ;
2020import static java .lang .String .format ;
21+ import static java .util .Collections .singletonList ;
2122
22- import com .rabbitmq .client .AMQP ;
23- import com .rabbitmq .client .Address ;
24- import com .rabbitmq .client .AlreadyClosedException ;
25- import com .rabbitmq .client .Connection ;
26- import com .rabbitmq .client .ConnectionFactory ;
27- import com .rabbitmq .client .Recoverable ;
28- import com .rabbitmq .client .RecoveryListener ;
23+ import com .rabbitmq .client .*;
2924import com .rabbitmq .client .impl .recovery .AutorecoveringConnection ;
3025import com .rabbitmq .perf .PerfTest .EXIT_WHEN ;
3126import com .rabbitmq .perf .metrics .PerformanceMetrics ;
@@ -839,7 +834,9 @@ static class DefaultCompletionHandler implements CompletionHandler {
839834 DefaultCompletionHandler (
840835 int timeLimit , int countLimit , ConcurrentMap <String , Integer > reasons ) {
841836 this .timeLimit = timeLimit ;
842- this .latch = new CountDownLatch (countLimit <= 0 ? 1 : countLimit );
837+ int count = countLimit <= 0 ? 1 : countLimit ;
838+ LOGGER .debug ("Count completion limit is {}" , count );
839+ this .latch = new CountDownLatch (count );
843840 this .reasons = reasons ;
844841 }
845842
@@ -983,15 +980,18 @@ private ConnectionCreator(ConnectionFactory cf, List<String> uris) {
983980 * @throws TimeoutException
984981 */
985982 Connection createConnection (String name ) throws IOException , TimeoutException {
983+ Connection connection ;
986984 if (this .addresses .isEmpty ()) {
987- return this .cf .newConnection (name );
985+ connection = this .cf .newConnection (name );
988986 } else {
989987 List <Address > addrs = new ArrayList <>(addresses );
990988 if (addresses .size () > 1 ) {
991989 Collections .shuffle (addrs );
992990 }
993- return this .cf .newConnection (addrs , name );
991+ connection = this .cf .newConnection (addrs , name );
994992 }
993+ addBlockedListener (connection );
994+ return connection ;
995995 }
996996
997997 /**
@@ -1003,16 +1003,28 @@ Connection createConnection(String name) throws IOException, TimeoutException {
10031003 */
10041004 List <Connection > createConfigurationConnections () throws IOException , TimeoutException {
10051005 if (this .addresses .isEmpty ()) {
1006- return Collections . singletonList (createConnection ("perf-test-configuration-0" ));
1006+ return singletonList (createConnection ("perf-test-configuration-0" ));
10071007 } else {
10081008 List <Connection > connections = new ArrayList <>(this .addresses .size ());
10091009 for (int i = 0 ; i < addresses .size (); i ++) {
1010- connections .add (
1011- this .cf .newConnection (
1012- Collections .singletonList (addresses .get (i )), "perf-test-configuration-" + i ));
1010+ String name = "perf-test-configuration-" + i ;
1011+ Connection c = this .cf .newConnection (singletonList (addresses .get (i )), name );
1012+ addBlockedListener (c );
1013+ connections .add (c );
10131014 }
10141015 return Collections .unmodifiableList (connections );
10151016 }
10161017 }
1018+
1019+ private static void addBlockedListener (Connection connection ) {
1020+ String name = connection .getClientProvidedName ();
1021+ connection .addBlockedListener (
1022+ reason -> logger ().debug ("Connection '{}' blocked: {}." , name , reason ),
1023+ () -> logger ().debug ("Connection '{}' unblocked." , name ));
1024+ }
1025+
1026+ private static Logger logger () {
1027+ return LoggerFactory .getLogger ("com.rabbitmq.perf.ConnectionCreator" );
1028+ }
10171029 }
10181030}
0 commit comments