Skip to content

Commit adb4187

Browse files
committed
Revert "recover channels in parallel too"
This reverts commit 774590a.
1 parent 774590a commit adb4187

File tree

4 files changed

+64
-167
lines changed

4 files changed

+64
-167
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public class ConnectionFactory implements Cloneable {
127127

128128
private boolean automaticRecovery = true;
129129
private boolean topologyRecovery = true;
130-
private int recoveryThreads = 1;
130+
private int topologyRecoveryThreads = 1;
131131

132132
// long is used to make sure the users can use both ints
133133
// and longs safely. It is unlikely that anybody'd need
@@ -714,13 +714,13 @@ public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
714714
this.topologyRecovery = topologyRecovery;
715715
}
716716

717-
public int getRecoveryThreadCount() {
718-
return recoveryThreads;
717+
public int getTopologyRecoveryThreadCount() {
718+
return topologyRecoveryThreads;
719719
}
720720

721721
// TODO Document that your exception handler method should be thread safe
722-
public void setRecoveryThreadCount(final int recoveryThreads) {
723-
this.recoveryThreads = recoveryThreads;
722+
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
723+
this.topologyRecoveryThreads = topologyRecoveryThreads;
724724
}
725725

726726
public void setMetricsCollector(MetricsCollector metricsCollector) {
@@ -1021,7 +1021,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
10211021
result.setNetworkRecoveryInterval(networkRecoveryInterval);
10221022
result.setRecoveryDelayHandler(recoveryDelayHandler);
10231023
result.setTopologyRecovery(topologyRecovery);
1024-
result.setRecoveryThreadCount(recoveryThreads);
1024+
result.setTopologyRecoveryThreadCount(topologyRecoveryThreads);
10251025
result.setExceptionHandler(exceptionHandler);
10261026
result.setThreadFactory(threadFactory);
10271027
result.setHandshakeTimeout(handshakeTimeout);

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class ConnectionParams {
4141
private long networkRecoveryInterval;
4242
private RecoveryDelayHandler recoveryDelayHandler;
4343
private boolean topologyRecovery;
44-
private int recoveryThreads = 1;
44+
private int topologyRecoveryThreads = 1;
4545
private int channelRpcTimeout;
4646
private boolean channelShouldCheckRpcResponseType;
4747
private ErrorOnWriteListener errorOnWriteListener;
@@ -116,8 +116,8 @@ public boolean isTopologyRecoveryEnabled() {
116116
return topologyRecovery;
117117
}
118118

119-
public int getRecoveryThreadCount() {
120-
return recoveryThreads;
119+
public int getTopologyRecoveryThreadCount() {
120+
return topologyRecoveryThreads;
121121
}
122122

123123
public ThreadFactory getThreadFactory() {
@@ -180,8 +180,8 @@ public void setTopologyRecovery(boolean topologyRecovery) {
180180
this.topologyRecovery = topologyRecovery;
181181
}
182182

183-
public void setRecoveryThreadCount(final int recoveryThreads) {
184-
this.recoveryThreads = recoveryThreads;
183+
public void setTopologyRecoveryThreadCount(final int topologyRecoveryThreads) {
184+
this.topologyRecoveryThreads = topologyRecoveryThreads;
185185
}
186186

187187
public void setExceptionHandler(ExceptionHandler exceptionHandler) {

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -540,25 +540,12 @@ private synchronized void beginAutomaticRecovery() throws InterruptedException {
540540
this.addAutomaticRecoveryListener(newConn);
541541
this.recoverShutdownListeners(newConn);
542542
this.recoverBlockedListeners(newConn);
543-
544-
// Optionally support recovering channels & entities in parallel for connections that have a lot of channels, queues, bindings, etc.
545-
ExecutorService executor = null;
546-
if (params.getRecoveryThreadCount() > 1) {
547-
executor = Executors.newFixedThreadPool(params.getRecoveryThreadCount(), delegate.getThreadFactory());
548-
}
549-
try {
550-
this.recoverChannels(newConn, executor);
551-
// don't assign new delegate connection until channel recovery is complete
552-
this.delegate = newConn;
553-
// recover topology
554-
if (this.params.isTopologyRecoveryEnabled()) {
555-
recoverTopology(executor);
556-
}
557-
} finally {
558-
if (executor != null) {
559-
executor.shutdownNow();
560-
}
561-
}
543+
this.recoverChannels(newConn);
544+
// don't assign new delegate connection until channel recovery is complete
545+
this.delegate = newConn;
546+
if (this.params.isTopologyRecoveryEnabled()) {
547+
recoverTopology(params.getTopologyRecoveryThreadCount());
548+
}
562549
this.notifyRecoveryListenersComplete();
563550
}
564551

@@ -602,34 +589,16 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
602589
return null;
603590
}
604591

605-
private void recoverChannels(final RecoveryAwareAMQConnection newConn, final ExecutorService executor) throws InterruptedException {
606-
if (executor != null) {
607-
final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
608-
for (final AutorecoveringChannel ch : this.channels.values()) {
609-
tasks.add(Executors.callable(new Runnable() {
610-
@Override
611-
public void run() {
612-
recoverChannel(newConn, ch);
613-
}
614-
}));
615-
}
616-
// invokeAll will block until all callables are completed
617-
executor.invokeAll(tasks);
618-
} else {
619-
for (final AutorecoveringChannel ch : this.channels.values()) {
620-
recoverChannel(newConn, ch);
592+
private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
593+
for (AutorecoveringChannel ch : this.channels.values()) {
594+
try {
595+
ch.automaticallyRecover(this, newConn);
596+
LOGGER.debug("Channel {} has recovered", ch);
597+
} catch (Throwable t) {
598+
newConn.getExceptionHandler().handleChannelRecoveryException(ch, t);
621599
}
622600
}
623601
}
624-
625-
private void recoverChannel(final RecoveryAwareAMQConnection newConn, final AutorecoveringChannel ch) {
626-
try {
627-
ch.automaticallyRecover(this, newConn);
628-
LOGGER.debug("Channel {} has recovered", ch);
629-
} catch (Throwable t) {
630-
newConn.getExceptionHandler().handleChannelRecoveryException(ch, t);
631-
}
632-
}
633602

634603
private void notifyRecoveryListenersComplete() {
635604
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
@@ -643,20 +612,26 @@ private void notifyRecoveryListenersStarted() {
643612
}
644613
}
645614

646-
private void recoverTopology(final ExecutorService executor) throws InterruptedException {
615+
private void recoverTopology(final int recoveryThreads) throws InterruptedException {
647616
// The recovery sequence is the following:
648617
// 1. Recover exchanges
649618
// 2. Recover queues
650619
// 3. Recover bindings
651620
// 4. Recover consumers
652-
if (executor != null) {
621+
if (recoveryThreads > 1) {
622+
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
653623
// A channel is single threaded, so group things by channel and recover 1 entity at a time per channel
654624
// We still need to recover 1 type of entity at a time in case channel1 has a binding to a queue that is currently owned and being recovered by channel2 for example
655-
// invokeAll will block until all callables are completed
656-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
657-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
658-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
659-
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
625+
final ExecutorService executor = Executors.newFixedThreadPool(recoveryThreads, delegate.getThreadFactory());
626+
try {
627+
// invokeAll will block until all callables are completed
628+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedExchanges).values()));
629+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedQueues).values()));
630+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(recordedBindings)));
631+
executor.invokeAll(groupEntitiesByChannel(Utility.copy(consumers).values()));
632+
} finally {
633+
executor.shutdownNow();
634+
}
660635
} else {
661636
// recover entities in serial on the main connection thread
662637
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
@@ -785,9 +760,9 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
785760
list.add(entity);
786761
}
787762
// now create a runnable per channel
788-
final List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
763+
final List<Callable<Object>> callables = new ArrayList<Callable<Object>>();
789764
for (final List<E> entityList : map.values()) {
790-
tasks.add(Executors.callable(new Runnable() {
765+
callables.add(Executors.callable(new Runnable() {
791766
@Override
792767
public void run() {
793768
for (final E entity : entityList) {
@@ -806,7 +781,7 @@ public void run() {
806781
}
807782
}));
808783
}
809-
return tasks;
784+
return callables;
810785
}
811786

812787
void recordQueueBinding(AutorecoveringChannel ch,

0 commit comments

Comments
 (0)