Skip to content

Commit b29ebe6

Browse files
committed
code review comments
1 parent f55ad22 commit b29ebe6

File tree

5 files changed

+54
-37
lines changed

5 files changed

+54
-37
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public class ConnectionFactory implements Cloneable {
127127

128128
private boolean automaticRecovery = true;
129129
private boolean topologyRecovery = true;
130-
private int topologyRecoveryThreads = 1;
130+
private ExecutorService topologyRecoveryExecutor;
131131

132132
// long is used to make sure the users can use both ints
133133
// and longs safely. It is unlikely that anybody'd need
@@ -714,13 +714,22 @@ public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
714714
this.topologyRecovery = topologyRecovery;
715715
}
716716

717-
public int getTopologyRecoveryThreadCount() {
718-
return topologyRecoveryThreads;
717+
/**
718+
* Get the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
719+
* @return thread pool executor
720+
*/
721+
public ExecutorService getTopologyRecoveryExecutor() {
722+
return topologyRecoveryExecutor;
719723
}
720724

721-
// TODO Document that your exception handler method should be thread safe
722-
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
723-
this.topologyRecoveryThreads = topologyRecoveryThreads;
725+
/**
726+
* Set the executor to use for parallel topology recovery. If null (the default), recovery is done single threaded on the main connection thread.
727+
* It is recommended to pass a ThreadPoolExecutor that will allow its core threads to timeout so these threads can die when recovery is complete.
728+
* Note: your {@link ExceptionHandler#handleTopologyRecoveryException(Connection, Channel, TopologyRecoveryException)} method should be thread-safe.
729+
* @param topologyRecoveryExecutor thread pool executor
730+
*/
731+
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
732+
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
724733
}
725734

726735
public void setMetricsCollector(MetricsCollector metricsCollector) {
@@ -1021,7 +1030,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10211030
result.setNetworkRecoveryInterval(networkRecoveryInterval);
10221031
result.setRecoveryDelayHandler(recoveryDelayHandler);
10231032
result.setTopologyRecovery(topologyRecovery);
1024-
result.setTopologyRecoveryThreadCount(topologyRecoveryThreads);
1033+
result.setTopologyRecoveryExecutor(topologyRecoveryExecutor);
10251034
result.setExceptionHandler(exceptionHandler);
10261035
result.setThreadFactory(threadFactory);
10271036
result.setHandshakeTimeout(handshakeTimeout);

src/main/java/com/rabbitmq/client/ExceptionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void handleConsumerException(Channel channel,
108108
* during topology (exchanges, queues, bindings, consumers) recovery
109109
* that it can't otherwise deal with.
110110
* @param conn the Connection that caught the exception
111-
* @param ch the Channel that caught the exception
111+
* @param ch the Channel that caught the exception. May be null.
112112
* @param exception the exception caught in the driver thread
113113
*/
114114

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class ConnectionParams {
4141
private long networkRecoveryInterval;
4242
private RecoveryDelayHandler recoveryDelayHandler;
4343
private boolean topologyRecovery;
44-
private int topologyRecoveryThreads = 1;
44+
private ExecutorService topologyRecoveryExecutor;
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
4747
private ErrorOnWriteListener errorOnWriteListener;
@@ -116,8 +116,12 @@ public boolean isTopologyRecoveryEnabled() {
116116
return topologyRecovery;
117117
}
118118

119-
public int getTopologyRecoveryThreadCount() {
120-
return topologyRecoveryThreads;
119+
/**
120+
* Get the topology recovery executor. If null, the main connection thread should be used.
121+
* @return executor. May be null.
122+
*/
123+
public ExecutorService getTopologyRecoveryExecutor() {
124+
return topologyRecoveryExecutor;
121125
}
122126

123127
public ThreadFactory getThreadFactory() {
@@ -180,8 +184,8 @@ public void setTopologyRecovery(boolean topologyRecovery) {
180184
this.topologyRecovery = topologyRecovery;
181185
}
182186

183-
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
184-
this.topologyRecoveryThreads = topologyRecoveryThreads;
187+
public void setTopologyRecoveryExecutor(final ExecutorService topologyRecoveryExecutor) {
188+
this.topologyRecoveryExecutor = topologyRecoveryExecutor;
185189
}
186190

187191
public void setExceptionHandler(ExceptionHandler exceptionHandler) {

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
544544
// don't assign new delegate connection until channel recovery is complete
545545
this.delegate = newConn;
546546
if (this.params.isTopologyRecoveryEnabled()) {
547-
recoverTopology(params.getTopologyRecoveryThreadCount());
547+
recoverTopology(params.getTopologyRecoveryExecutor());
548548
}
549549
this.notifyRecoveryListenersComplete();
550550
}
@@ -612,27 +612,13 @@ private void notifyRecoveryListenersStarted() {
612612
}
613613
}
614614

615-
private void recoverTopology(final int recoveryThreads) throws InterruptedException {
615+
private void recoverTopology(final ExecutorService executor) {
616616
// The recovery sequence is the following:
617617
// 1. Recover exchanges
618618
// 2. Recover queues
619619
// 3. Recover bindings
620620
// 4. Recover consumers
621-
if (recoveryThreads > 1) {
622-
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
623-
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
624-
// We still need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
625-
final ExecutorService executor = Executors.newFixedThreadPool(recoveryThreads, delegate.getThreadFactory());
626-
try {
627-
// invokeAll will block until all callables are completed
628-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
629-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
630-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
631-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
632-
} finally {
633-
executor.shutdownNow();
634-
}
635-
} else {
621+
if (executor == null) {
636622
// recover entities in serial on the main connection thread
637623
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
638624
recoverExchange(exchange);
@@ -646,6 +632,21 @@ private void recoverTopology(final int recoveryThreads) throws InterruptedExcept
646632
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
647633
recoverConsumer(entry.getKey(), entry.getValue());
648634
}
635+
} else {
636+
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
637+
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
638+
// We also need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
639+
// Note: invokeAll will block until all callables are completed and all returned futures will be complete
640+
try {
641+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
642+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
643+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
644+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
645+
} catch (final Exception cause) {
646+
final String message = "Caught an exception while recovering toplogy: " + cause.getMessage();
647+
final TopologyRecoveryException e = new TopologyRecoveryException(message, cause);
648+
getExceptionHandler().handleTopologyRecoveryException(delegate, null, e);
649+
}
649650
}
650651
}
651652

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232

3333
import java.util.concurrent.CopyOnWriteArrayList;
3434
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.LinkedBlockingQueue;
37+
import java.util.concurrent.ThreadPoolExecutor;
3538
import java.util.concurrent.TimeUnit;
3639
import java.util.concurrent.TimeoutException;
3740
import java.util.concurrent.atomic.AtomicInteger;
@@ -809,8 +812,12 @@ public void handleDelivery(String consumerTag,
809812

810813
@Test public void recoveryWithMultipleThreads() throws Exception {
811814
// test with 8 recovery threads
812-
ConnectionFactory connectionFactory = buildConnectionFactoryWithRecoveryEnabled(false, 8);
813-
assertEquals(8, connectionFactory.getTopologyRecoveryThreadCount());
815+
final ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
816+
executor.allowCoreThreadTimeOut(true);
817+
ConnectionFactory connectionFactory = buildConnectionFactoryWithRecoveryEnabled(false);
818+
assertNull(connectionFactory.getTopologyRecoveryExecutor());
819+
connectionFactory.setTopologyRecoveryExecutor(executor);
820+
assertEquals(executor, connectionFactory.getTopologyRecoveryExecutor());
814821
RecoverableConnection testConnection = (RecoverableConnection) connectionFactory.newConnection();
815822
try {
816823
final List<Channel> channels = new ArrayList<Channel>();
@@ -822,6 +829,7 @@ public void handleDelivery(String consumerTag,
822829
final CountDownLatch latch = new CountDownLatch(channelCount * queuesPerChannel);
823830
for (int i=0; i < channelCount; i++) {
824831
final Channel testChannel = testConnection.createChannel();
832+
channels.add(testChannel);
825833
String x = "tmp-x-topic-" + i;
826834
exchanges.add(x);
827835
testChannel.exchangeDeclare(x, "topic");
@@ -1019,17 +1027,12 @@ private static RecoverableConnection newRecoveringConnection(String connectionNa
10191027
}
10201028

10211029
private static ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) {
1022-
return buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery, 1);
1023-
}
1024-
1025-
private static ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery, final int recoveryThreads) {
10261030
ConnectionFactory cf = TestUtils.connectionFactory();
10271031
cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
10281032
cf.setAutomaticRecoveryEnabled(true);
10291033
if (disableTopologyRecovery) {
10301034
cf.setTopologyRecoveryEnabled(false);
10311035
}
1032-
cf.setTopologyRecoveryThreadCount(recoveryThreads);
10331036
return cf;
10341037
}
10351038

0 commit comments

Comments
 (0)