Skip to content

Commit caec3f7

Browse files
committed
add RecoveryDelayHandler
1 parent d291d3f commit caec3f7

File tree

4 files changed

+73
-7
lines changed

4 files changed

+73
-7
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class ConnectionFactory implements Cloneable {
112112
// and longs safely. It is unlikely that anybody'd need
113113
// to use recovery intervals > Integer.MAX_VALUE in practice.
114114
private long networkRecoveryInterval = 5000;
115+
private RecoveryDelayHandler recoveryDelayHandler;
115116

116117
private MetricsCollector metricsCollector;
117118

@@ -960,6 +961,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
960961
result.setShutdownTimeout(shutdownTimeout);
961962
result.setSaslConfig(saslConfig);
962963
result.setNetworkRecoveryInterval(networkRecoveryInterval);
964+
result.setRecoveryDelayHandler(recoveryDelayHandler);
963965
result.setTopologyRecovery(topologyRecovery);
964966
result.setExceptionHandler(exceptionHandler);
965967
result.setThreadFactory(threadFactory);
@@ -1077,6 +1079,14 @@ public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
10771079
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
10781080
this.networkRecoveryInterval = networkRecoveryInterval;
10791081
}
1082+
1083+
public RecoveryDelayHandler getRecoveryDelayHandler() {
1084+
return recoveryDelayHandler;
1085+
}
1086+
1087+
public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) {
1088+
this.recoveryDelayHandler = recoveryDelayHandler;
1089+
}
10801090

10811091
/**
10821092
* Sets the parameters when using NIO.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.rabbitmq.client;
2+
3+
import java.util.Arrays;
4+
import java.util.Collections;
5+
import java.util.List;
6+
7+
public interface RecoveryDelayHandler {
8+
9+
long getDelay(final int recoveryAttempts);
10+
11+
public static class DefaultRecoveryDelayHandler implements RecoveryDelayHandler {
12+
13+
private final long networkRecoveryInterval;
14+
15+
public DefaultRecoveryDelayHandler(final long networkRecoveryInterval) {
16+
this.networkRecoveryInterval = networkRecoveryInterval;
17+
}
18+
19+
@Override
20+
public long getDelay(int recoveryAttempts) {
21+
return networkRecoveryInterval;
22+
}
23+
24+
}
25+
26+
public static class ExponentialBackoffDelayHandler implements RecoveryDelayHandler {
27+
28+
private final List<Long> sequence;
29+
30+
public ExponentialBackoffDelayHandler() {
31+
sequence = Arrays.asList(0L, 1000L, 1000L, 2000L, 3000L, 5000L, 8000L, 13000L, 21000L);
32+
}
33+
34+
public ExponentialBackoffDelayHandler(final List<Long> sequence) {
35+
if (sequence.isEmpty())
36+
throw new IllegalArgumentException();
37+
this.sequence = Collections.unmodifiableList(sequence);
38+
39+
}
40+
41+
@Override
42+
public long getDelay(int recoveryAttempts) {
43+
return sequence.get(recoveryAttempts >= sequence.size() ? sequence.size() - 1 : recoveryAttempts);
44+
}
45+
}
46+
}

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client.impl;
1717

1818
import com.rabbitmq.client.ExceptionHandler;
19+
import com.rabbitmq.client.RecoveryDelayHandler;
1920
import com.rabbitmq.client.SaslConfig;
2021

2122
import java.util.Map;
@@ -38,6 +39,7 @@ public class ConnectionParams {
3839
private int shutdownTimeout;
3940
private SaslConfig saslConfig;
4041
private long networkRecoveryInterval;
42+
private RecoveryDelayHandler recoveryDelayHandler;
4143
private boolean topologyRecovery;
4244
private int channelRpcTimeout;
4345
private boolean channelShouldCheckRpcResponseType;
@@ -102,6 +104,10 @@ public ExceptionHandler getExceptionHandler() {
102104
public long getNetworkRecoveryInterval() {
103105
return networkRecoveryInterval;
104106
}
107+
108+
public RecoveryDelayHandler getRecoveryDelayHandler() {
109+
return recoveryDelayHandler == null ? new RecoveryDelayHandler.DefaultRecoveryDelayHandler(networkRecoveryInterval) : recoveryDelayHandler;
110+
}
105111

106112
public boolean isTopologyRecoveryEnabled() {
107113
return topologyRecovery;
@@ -162,6 +168,10 @@ public void setSaslConfig(SaslConfig saslConfig) {
162168
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
163169
this.networkRecoveryInterval = networkRecoveryInterval;
164170
}
171+
172+
public void setRecoveryDelayHandler(final RecoveryDelayHandler recoveryDelayHandler) {
173+
this.recoveryDelayHandler = recoveryDelayHandler;
174+
}
165175

166176
public void setTopologyRecovery(boolean topologyRecovery) {
167177
this.topologyRecovery = topologyRecovery;

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
487487
}
488488

489489
synchronized private void beginAutomaticRecovery() throws InterruptedException {
490-
Thread.sleep(this.params.getNetworkRecoveryInterval());
490+
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(0));
491491

492492
this.notifyRecoveryListenersStarted();
493493

@@ -525,9 +525,10 @@ private void recoverBlockedListeners(final RecoveryAwareAMQConnection newConn) {
525525
// Returns new connection if the connection was recovered,
526526
// null if application initiated shutdown while attempting recovery.
527527
private RecoveryAwareAMQConnection recoverConnection() throws InterruptedException {
528-
while (!manuallyClosed)
529-
{
528+
int attempts = 0;
529+
while (!manuallyClosed) {
530530
try {
531+
attempts++;
531532
RecoveryAwareAMQConnection newConn = this.cf.newConnection();
532533
synchronized(recoveryLock) {
533534
if (!manuallyClosed) {
@@ -541,8 +542,7 @@ private RecoveryAwareAMQConnection recoverConnection() throws InterruptedExcepti
541542
newConn.abort();
542543
return null;
543544
} catch (Exception e) {
544-
// TODO: exponential back-off
545-
Thread.sleep(this.params.getNetworkRecoveryInterval());
545+
Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
546546
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
547547
}
548548
}
@@ -561,13 +561,13 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
561561
}
562562

563563
private void notifyRecoveryListenersComplete() {
564-
for (RecoveryListener f : this.recoveryListeners) {
564+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
565565
f.handleRecovery(this);
566566
}
567567
}
568568

569569
private void notifyRecoveryListenersStarted() {
570-
for (RecoveryListener f : this.recoveryListeners) {
570+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
571571
f.handleRecoveryStarted(this);
572572
}
573573
}

0 commit comments

Comments
 (0)