Skip to content

Commit 8a44d44

Browse files
ggivoCopilot
andauthored
[automatic-failover] Implement failover retry for in-flight commands (#4175)
* Implement failover retry for in-flight commands This change adds support for retrying in-flight commands when a Redis connection fails and the client automatically fails over to another cluster. The feature is configurable through the FailoverOptions builder. Key changes: - Added retryFailedInflightCommands option to FailoverOptions - Implemented retry logic in CircuitBreakerCommandExecutor - Added integration tests to verify both retry and no-retry behavior - Created utility methods for test setup and configuration This enhancement improves resilience for long-running commands like blocking operations, allowing them to transparently continue on the failover cluster without client-side errors. * Update src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java Co-authored-by: Copilot <[email protected]> # Conflicts: # src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java * format format & clean-up * fix FailoverIntegrationTest.testInflightCommandsAreNotRetriedAfterFailover blpop timeout should be less than async command timeout to prevent completing with java.util.concurrent.TimeoutException instead of actuall command failure * Address comments from review - rename retryFailedInflightCommands->retryOnFailover - remove check for CB in OPEN state * remove unused method --------- Co-authored-by: Copilot <[email protected]>
1 parent bce08f7 commit 8a44d44

File tree

6 files changed

+285
-74
lines changed

6 files changed

+285
-74
lines changed

src/main/java/redis/clients/jedis/MultiClusterClientConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import redis.clients.jedis.annots.Experimental;
1212
import redis.clients.jedis.exceptions.JedisConnectionException;
1313
import redis.clients.jedis.exceptions.JedisValidationException;
14-
14+
import redis.clients.jedis.mcf.ConnectionFailoverException;
1515

1616
/**
1717
* @author Allen Terleto (aterleto)
@@ -43,7 +43,7 @@ public final class MultiClusterClientConfig {
4343
private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage
4444
private static final List<Class> CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class);
4545

46-
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class);
46+
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class, ConnectionFailoverException.class);
4747

4848
private final ClusterConfig[] clusterConfigs;
4949

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import redis.clients.jedis.json.Path2;
3131
import redis.clients.jedis.json.JsonObjectMapper;
3232
import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor;
33+
import redis.clients.jedis.mcf.FailoverOptions;
3334
import redis.clients.jedis.mcf.MultiClusterPipeline;
3435
import redis.clients.jedis.mcf.MultiClusterTransaction;
3536
import redis.clients.jedis.params.*;
@@ -237,7 +238,19 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
237238
*/
238239
@Experimental
239240
public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
240-
this(new CircuitBreakerCommandExecutor(provider), provider);
241+
this(new CircuitBreakerCommandExecutor(provider, FailoverOptions.builder().build()), provider);
242+
}
243+
244+
/**
245+
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
246+
* <p>
247+
* With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
248+
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
249+
* <p>
250+
*/
251+
@Experimental
252+
public UnifiedJedis(MultiClusterPooledConnectionProvider provider, FailoverOptions failoverOptions) {
253+
this(new CircuitBreakerCommandExecutor(provider, failoverOptions), provider);
241254
}
242255

243256
/**

src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@
2424
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase
2525
implements CommandExecutor {
2626

27-
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
27+
private final FailoverOptions options;
28+
29+
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider,
30+
FailoverOptions options) {
2831
super(provider);
32+
this.options = options != null ? options : FailoverOptions.builder().build();
2933
}
3034

3135
@Override
@@ -49,9 +53,31 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
4953
private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster cluster) {
5054
try (Connection connection = cluster.getConnection()) {
5155
return connection.executeCommand(commandObject);
56+
} catch (Exception e) {
57+
58+
if (retryOnFailover() && !isActiveCluster(cluster)
59+
&& isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) {
60+
throw new ConnectionFailoverException(
61+
"Command failed during failover: " + cluster.getCircuitBreaker().getName(), e);
62+
}
63+
64+
throw e;
5265
}
5366
}
5467

68+
private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb) {
69+
return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e);
70+
}
71+
72+
private boolean retryOnFailover() {
73+
return options.isRetryOnFailover();
74+
}
75+
76+
private boolean isActiveCluster(Cluster cluster) {
77+
Cluster activeCluster = provider.getCluster();
78+
return activeCluster != null && activeCluster.equals(cluster);
79+
}
80+
5581
/**
5682
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
5783
* failure scenarios
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package redis.clients.jedis.mcf;
2+
3+
import redis.clients.jedis.exceptions.JedisException;
4+
5+
public class ConnectionFailoverException extends JedisException {
6+
public ConnectionFailoverException(String s, Exception e) {
7+
super(s, e);
8+
}
9+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package redis.clients.jedis.mcf;
2+
3+
import redis.clients.jedis.annots.Experimental;
4+
5+
/**
6+
* Configuration options for CircuitBreakerCommandExecutor
7+
*/
8+
@Experimental
9+
public class FailoverOptions {
10+
private final boolean retryOnFailover;
11+
12+
private FailoverOptions(Builder builder) {
13+
this.retryOnFailover = builder.retryOnFailover;
14+
}
15+
16+
/**
17+
* Gets whether to retry failed commands during failover
18+
* @return true if retry is enabled, false otherwise
19+
*/
20+
public boolean isRetryOnFailover() {
21+
return retryOnFailover;
22+
}
23+
24+
/**
25+
* Creates a new builder with default options
26+
* @return a new builder
27+
*/
28+
public static Builder builder() {
29+
return new Builder();
30+
}
31+
32+
/**
33+
* Builder for FailoverOptions
34+
*/
35+
public static class Builder {
36+
private boolean retryOnFailover = false;
37+
38+
private Builder() {
39+
}
40+
41+
/**
42+
* Sets whether to retry failed commands during failover
43+
* @param retry true to retry, false otherwise
44+
* @return this builder for method chaining
45+
*/
46+
public Builder retryOnFailover(boolean retry) {
47+
this.retryOnFailover = retry;
48+
return this;
49+
}
50+
51+
/**
52+
* Builds a new FailoverOptions instance with the configured options
53+
* @return a new FailoverOptions instance
54+
*/
55+
public FailoverOptions build() {
56+
return new FailoverOptions(this);
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)