Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8a9f876
- weighted cluster seleciton
atakavci Jun 27, 2025
d514ecf
- add builder for ClusterConfig
atakavci Jul 9, 2025
df66b1e
- fix naming
atakavci Jul 9, 2025
13757f5
clean up and mark override methods
atakavci Jul 10, 2025
ef5d83a
fix link in javadoc
atakavci Jul 10, 2025
a15fc64
fix formatting
atakavci Jul 10, 2025
cf38240
- fix double registered listeners in healtstatusmgr
atakavci Jul 14, 2025
c2fb34c
Update src/main/java/redis/clients/jedis/mcf/EchoStrategy.java
atakavci Jul 16, 2025
ade866d
- add remove endpoints
atakavci Jul 11, 2025
ca3378d
- replace cluster disabled with failbackCandidate
atakavci Jul 15, 2025
ddcec73
- remove failback candidate
atakavci Jul 16, 2025
c1b6d5f
- fix remove logic
atakavci Jul 16, 2025
ff16330
- periodic failback checks
atakavci Jul 17, 2025
c39fda1
- introduce StatusTracker with purpose of waiting initial healthcheck…
atakavci Jul 19, 2025
975ab78
- introduce forceActiveCluster by duration
atakavci Jul 19, 2025
405101e
- fix failing tests by waiting on clusters to get healthy
atakavci Jul 23, 2025
607c66d
- fix failing scenario test
atakavci Jul 23, 2025
aaac8f7
- adressing reviews and feedback
atakavci Jul 23, 2025
2ffffef
- fix formatting
atakavci Jul 23, 2025
e6e1121
- fix formatting
atakavci Jul 23, 2025
b8d4e87
- get rid of the queue and event ordering for healthstatus change in …
atakavci Jul 24, 2025
1ae7219
- replace use of reflection with helper methods
atakavci Jul 24, 2025
a751a84
Merge branch 'feature/automatic-failover' into ali/aa-failover-health…
atakavci Aug 11, 2025
004a022
- replace 'sleep' with 'await', feedback from @a-TODO-rov
atakavci Aug 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public final CommandObject<String> ping() {
return PING_COMMAND_OBJECT;
}

public final CommandObject<String> echo(String msg) {
return new CommandObject<>(commandArguments(ECHO).add(msg), BuilderFactory.STRING);
}

private final CommandObject<String> FLUSHALL_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);

public final CommandObject<String> flushAll() {
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/redis/clients/jedis/HostAndPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.io.Serializable;

public class HostAndPort implements Serializable {
import redis.clients.jedis.mcf.Endpoint;

public class HostAndPort implements Serializable, Endpoint {

private static final long serialVersionUID = -519876229978427751L;

Expand All @@ -14,10 +16,12 @@ public HostAndPort(String host, int port) {
this.port = port;
}

@Override
public String getHost() {
return host;
}

@Override
public int getPort() {
return port;
}
Expand Down
173 changes: 163 additions & 10 deletions src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisValidationException;
import redis.clients.jedis.mcf.ConnectionFailoverException;
import redis.clients.jedis.mcf.EchoStrategy;
import redis.clients.jedis.mcf.HealthCheckStrategy;

/**
* @author Allen Terleto (aterleto)
Expand All @@ -31,6 +33,19 @@
@Experimental
public final class MultiClusterClientConfig {

/**
* Interface for creating HealthCheckStrategy instances for specific endpoints
*/
public static interface StrategySupplier {
/**
* Creates a HealthCheckStrategy for the given endpoint.
* @param hostAndPort the endpoint to create a strategy for
* @param jedisClientConfig the client configuration, may be null for implementations that don't need it
* @return a HealthCheckStrategy instance
*/
HealthCheckStrategy get(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig);
}

private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3;
private static final int RETRY_WAIT_DURATION_DEFAULT = 500; // measured in milliseconds
private static final int RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT = 2;
Expand All @@ -48,6 +63,9 @@ public final class MultiClusterClientConfig {
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays
.asList(CallNotPermittedException.class, ConnectionFailoverException.class);

private static final long FAILBACK_CHECK_INTERVAL_DEFAULT = 5000; // 5 seconds
private static final long GRACE_PERIOD_DEFAULT = 10000; // 10 seconds

private final ClusterConfig[] clusterConfigs;

//////////// Retry Config - https://resilience4j.readme.io/docs/retry ////////////
Expand Down Expand Up @@ -129,7 +147,27 @@ public final class MultiClusterClientConfig {

private List<Class<? extends Throwable>> fallbackExceptionList;

//////////// Failover Config ////////////

/** Whether to retry failed commands during failover */
private boolean retryOnFailover;

/** Whether failback is supported by client */
private boolean isFailbackSupported;

/** Interval in milliseconds to wait before attempting failback to a recovered cluster */
private long failbackCheckInterval;

/** Grace period in milliseconds to keep clusters disabled after they become unhealthy */
private long gracePeriod;

public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
for (ClusterConfig clusterConfig : clusterConfigs) {
if (clusterConfig == null)
throw new IllegalArgumentException("ClusterClientConfigs must not contain null elements");
}
this.clusterConfigs = clusterConfigs;
}

Expand Down Expand Up @@ -193,13 +231,36 @@ public List<Class<? extends Throwable>> getFallbackExceptionList() {
return fallbackExceptionList;
}

public boolean isRetryOnFailover() {
return retryOnFailover;
}

/** Whether failback is supported by client */
public boolean isFailbackSupported() {
return isFailbackSupported;
}

public long getFailbackCheckInterval() {
return failbackCheckInterval;
}

public long getGracePeriod() {
return gracePeriod;
}

public static Builder builder(ClusterConfig[] clusterConfigs) {
return new Builder(clusterConfigs);
}

public static class ClusterConfig {

private int priority;
private HostAndPort hostAndPort;
private JedisClientConfig clientConfig;
private GenericObjectPoolConfig<Connection> connectionPoolConfig;

private float weight = 1.0f;
private StrategySupplier healthCheckStrategySupplier;

public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this.hostAndPort = hostAndPort;
this.clientConfig = clientConfig;
Expand All @@ -212,25 +273,90 @@ public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this.connectionPoolConfig = connectionPoolConfig;
}

public int getPriority() {
return priority;
}

private void setPriority(int priority) {
this.priority = priority;
private ClusterConfig(Builder builder) {
this.hostAndPort = builder.hostAndPort;
this.clientConfig = builder.clientConfig;
this.connectionPoolConfig = builder.connectionPoolConfig;
this.weight = builder.weight;
this.healthCheckStrategySupplier = builder.healthCheckStrategySupplier;
}

public HostAndPort getHostAndPort() {
return hostAndPort;
}

public static Builder builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
return new Builder(hostAndPort, clientConfig);
}

public JedisClientConfig getJedisClientConfig() {
return clientConfig;
}

public GenericObjectPoolConfig<Connection> getConnectionPoolConfig() {
return connectionPoolConfig;
}

public float getWeight() {
return weight;
}

public StrategySupplier getHealthCheckStrategySupplier() {
return healthCheckStrategySupplier;
}

public static class Builder {
private HostAndPort hostAndPort;
private JedisClientConfig clientConfig;
private GenericObjectPoolConfig<Connection> connectionPoolConfig;

private float weight = 1.0f;
private StrategySupplier healthCheckStrategySupplier = EchoStrategy.DEFAULT;

public Builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this.hostAndPort = hostAndPort;
this.clientConfig = clientConfig;
}

public Builder connectionPoolConfig(GenericObjectPoolConfig<Connection> connectionPoolConfig) {
this.connectionPoolConfig = connectionPoolConfig;
return this;
}

public Builder weight(float weight) {
this.weight = weight;
return this;
}

public Builder healthCheckStrategySupplier(StrategySupplier healthCheckStrategySupplier) {
if (healthCheckStrategySupplier == null) {
throw new IllegalArgumentException("healthCheckStrategySupplier must not be null");
}
this.healthCheckStrategySupplier = healthCheckStrategySupplier;
return this;
}

public Builder healthCheckStrategy(HealthCheckStrategy healthCheckStrategy) {
if (healthCheckStrategy == null) {
throw new IllegalArgumentException("healthCheckStrategy must not be null");
}
this.healthCheckStrategySupplier = (hostAndPort, jedisClientConfig) -> healthCheckStrategy;
return this;
}

public Builder healthCheckEnabled(boolean healthCheckEnabled) {
if (!healthCheckEnabled) {
this.healthCheckStrategySupplier = null;
} else if (healthCheckStrategySupplier == null) {
this.healthCheckStrategySupplier = EchoStrategy.DEFAULT;
}
return this;
}

public ClusterConfig build() {
return new ClusterConfig(this);
}
}
}

public static class Builder {
Expand All @@ -253,14 +379,16 @@ public static class Builder {
private List<Class> circuitBreakerIgnoreExceptionList = null;
private List<Class<? extends Throwable>> fallbackExceptionList = FALLBACK_EXCEPTIONS_DEFAULT;

private boolean retryOnFailover = false;
private boolean isFailbackSupported = true;
private long failbackCheckInterval = FAILBACK_CHECK_INTERVAL_DEFAULT;
private long gracePeriod = GRACE_PERIOD_DEFAULT;

public Builder(ClusterConfig[] clusterConfigs) {

if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");

for (int i = 0; i < clusterConfigs.length; i++)
clusterConfigs[i].setPriority(i + 1);

this.clusterConfigs = clusterConfigs;
}

Expand Down Expand Up @@ -348,6 +476,26 @@ public Builder fallbackExceptionList(List<Class<? extends Throwable>> fallbackEx
return this;
}

public Builder retryOnFailover(boolean retryOnFailover) {
this.retryOnFailover = retryOnFailover;
return this;
}

public Builder failbackSupported(boolean supported) {
this.isFailbackSupported = supported;
return this;
}

public Builder failbackCheckInterval(long failbackCheckInterval) {
this.failbackCheckInterval = failbackCheckInterval;
return this;
}

public Builder gracePeriod(long gracePeriod) {
this.gracePeriod = gracePeriod;
return this;
}

public MultiClusterClientConfig build() {
MultiClusterClientConfig config = new MultiClusterClientConfig(this.clusterConfigs);

Expand All @@ -373,6 +521,11 @@ public MultiClusterClientConfig build() {

config.fallbackExceptionList = this.fallbackExceptionList;

config.retryOnFailover = this.retryOnFailover;
config.isFailbackSupported = this.isFailbackSupported;
config.failbackCheckInterval = this.failbackCheckInterval;
config.gracePeriod = this.gracePeriod;

return config;
}
}
Expand Down
19 changes: 5 additions & 14 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import redis.clients.jedis.json.Path2;
import redis.clients.jedis.json.JsonObjectMapper;
import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor;
import redis.clients.jedis.mcf.FailoverOptions;
import redis.clients.jedis.mcf.MultiClusterPipeline;
import redis.clients.jedis.mcf.MultiClusterTransaction;
import redis.clients.jedis.params.*;
Expand Down Expand Up @@ -238,19 +237,7 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
*/
@Experimental
public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
this(new CircuitBreakerCommandExecutor(provider, FailoverOptions.builder().build()), provider);
}

/**
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
* <p>
* With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
* <p>
*/
@Experimental
public UnifiedJedis(MultiClusterPooledConnectionProvider provider, FailoverOptions failoverOptions) {
this(new CircuitBreakerCommandExecutor(provider, failoverOptions), provider);
this(new CircuitBreakerCommandExecutor(provider), provider);
}

/**
Expand Down Expand Up @@ -354,6 +341,10 @@ public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}

public String echo(String string) {
return executeCommand(commandObjects.echo(string));
}

public String flushDB() {
return checkAndBroadcastCommand(commandObjects.flushDB());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
@Experimental
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor {

private final FailoverOptions options;

public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider, FailoverOptions options) {
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
super(provider);
this.options = options != null ? options : FailoverOptions.builder().build();
}

@Override
Expand All @@ -50,8 +47,7 @@ private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster clust
try (Connection connection = cluster.getConnection()) {
return connection.executeCommand(commandObject);
} catch (Exception e) {

if (retryOnFailover() && !isActiveCluster(cluster)
if (cluster.retryOnFailover() && !isActiveCluster(cluster)
&& isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) {
throw new ConnectionFailoverException(
"Command failed during failover: " + cluster.getCircuitBreaker().getName(), e);
Expand All @@ -65,10 +61,6 @@ private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb)
return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e);
}

private boolean retryOnFailover() {
return options.isRetryOnFailover();
}

private boolean isActiveCluster(Cluster cluster) {
Cluster activeCluster = provider.getCluster();
return activeCluster != null && activeCluster.equals(cluster);
Expand Down
Loading
Loading