Skip to content

Commit b79a9f3

Browse files
atakavci and Copilot authored
[automatic failover] Implement HealthStatusManager + weighted endpoints (#4189)
* - weighted cluster selection - HealthStatusManager with initial listener and registration logic - pluggable health checker strategy introduced; these are drafts: NoOpStrategy, EchoStrategy, LagAwareStrategy - fix failing tests impacted by weighted clusters * - add builder for ClusterConfig - add echo to CommandObjects and UnifiedJedis - improve StrategySupplier by accepting JedisClientConfig - adapt EchoStrategy to StrategySupplier. Now it handles the creation of the connection by accepting endpoint and JedisClientConfig - make health checks disabled by default - drop NoOpStrategy - add unit & integration tests for health check * - fix naming * clean up and mark override methods * fix link in javadoc * fix formatting * - fix double-registered listeners in HealthStatusManager - clear redundant catch - replace failover options and drop FailoverOptions class - remove forced_unhealthy from HealthStatus - fix failback check - add disabled flag to cluster - update/fix related tests * Update src/main/java/redis/clients/jedis/mcf/EchoStrategy.java Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent b2c2267 commit b79a9f3

25 files changed

+1490
-265
lines changed

src/main/java/redis/clients/jedis/CommandObjects.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ public final CommandObject<String> ping() {
7474
return PING_COMMAND_OBJECT;
7575
}
7676

77+
public final CommandObject<String> echo(String msg) {
78+
return new CommandObject<>(commandArguments(ECHO).add(msg), BuilderFactory.STRING);
79+
}
80+
7781
private final CommandObject<String> FLUSHALL_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);
7882

7983
public final CommandObject<String> flushAll() {

src/main/java/redis/clients/jedis/HostAndPort.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.io.Serializable;
44

5-
public class HostAndPort implements Serializable {
5+
import redis.clients.jedis.mcf.Endpoint;
6+
7+
public class HostAndPort implements Serializable, Endpoint {
68

79
private static final long serialVersionUID = -519876229978427751L;
810

@@ -14,10 +16,12 @@ public HostAndPort(String host, int port) {
1416
this.port = port;
1517
}
1618

19+
@Override
1720
public String getHost() {
1821
return host;
1922
}
2023

24+
@Override
2125
public int getPort() {
2226
return port;
2327
}

src/main/java/redis/clients/jedis/MultiClusterClientConfig.java

Lines changed: 123 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import redis.clients.jedis.exceptions.JedisConnectionException;
1313
import redis.clients.jedis.exceptions.JedisValidationException;
1414
import redis.clients.jedis.mcf.ConnectionFailoverException;
15+
import redis.clients.jedis.mcf.EchoStrategy;
16+
import redis.clients.jedis.mcf.HealthCheckStrategy;
1517

1618
/**
1719
* @author Allen Terleto (aterleto)
@@ -31,6 +33,19 @@
3133
@Experimental
3234
public final class MultiClusterClientConfig {
3335

36+
/**
37+
* Interface for creating HealthCheckStrategy instances for specific endpoints
38+
*/
39+
public static interface StrategySupplier {
40+
/**
41+
* Creates a HealthCheckStrategy for the given endpoint.
42+
* @param hostAndPort the endpoint to create a strategy for
43+
* @param jedisClientConfig the client configuration, may be null for implementations that don't need it
44+
* @return a HealthCheckStrategy instance
45+
*/
46+
HealthCheckStrategy get(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig);
47+
}
48+
3449
private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3;
3550
private static final int RETRY_WAIT_DURATION_DEFAULT = 500; // measured in milliseconds
3651
private static final int RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT = 2;
@@ -129,6 +144,14 @@ public final class MultiClusterClientConfig {
129144

130145
private List<Class<? extends Throwable>> fallbackExceptionList;
131146

147+
//////////// Failover Config ////////////
148+
149+
/** Whether to retry failed commands during failover */
150+
private boolean retryOnFailover = false;
151+
152+
/** Whether failback is enabled */
153+
private boolean failback = false;
154+
132155
public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
133156
this.clusterConfigs = clusterConfigs;
134157
}
@@ -193,13 +216,23 @@ public List<Class<? extends Throwable>> getFallbackExceptionList() {
193216
return fallbackExceptionList;
194217
}
195218

219+
public boolean isRetryOnFailover() {
220+
return retryOnFailover;
221+
}
222+
223+
public boolean isFailback() {
224+
return failback;
225+
}
226+
196227
public static class ClusterConfig {
197228

198-
private int priority;
199229
private HostAndPort hostAndPort;
200230
private JedisClientConfig clientConfig;
201231
private GenericObjectPoolConfig<Connection> connectionPoolConfig;
202232

233+
private float weight = 1.0f;
234+
private StrategySupplier healthCheckStrategySupplier;
235+
203236
public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
204237
this.hostAndPort = hostAndPort;
205238
this.clientConfig = clientConfig;
@@ -212,25 +245,92 @@ public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig,
212245
this.connectionPoolConfig = connectionPoolConfig;
213246
}
214247

215-
public int getPriority() {
216-
return priority;
217-
}
218-
219-
private void setPriority(int priority) {
220-
this.priority = priority;
248+
private ClusterConfig(Builder builder) {
249+
this.hostAndPort = builder.hostAndPort;
250+
this.clientConfig = builder.clientConfig;
251+
this.connectionPoolConfig = builder.connectionPoolConfig;
252+
this.weight = builder.weight;
253+
this.healthCheckStrategySupplier = builder.healthCheckStrategySupplier;
221254
}
222255

223256
public HostAndPort getHostAndPort() {
224257
return hostAndPort;
225258
}
226259

260+
public static Builder builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
261+
return new Builder(hostAndPort, clientConfig);
262+
}
263+
227264
public JedisClientConfig getJedisClientConfig() {
228265
return clientConfig;
229266
}
230267

231268
public GenericObjectPoolConfig<Connection> getConnectionPoolConfig() {
232269
return connectionPoolConfig;
233270
}
271+
272+
public float getWeight() {
273+
return weight;
274+
}
275+
276+
public StrategySupplier getHealthCheckStrategySupplier() {
277+
return healthCheckStrategySupplier;
278+
}
279+
280+
public static class Builder {
281+
private HostAndPort hostAndPort;
282+
private JedisClientConfig clientConfig;
283+
private GenericObjectPoolConfig<Connection> connectionPoolConfig;
284+
285+
private float weight = 1.0f;
286+
private StrategySupplier healthCheckStrategySupplier = EchoStrategy.DEFAULT;
287+
private boolean healthCheckEnabled = true;
288+
289+
public Builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
290+
this.hostAndPort = hostAndPort;
291+
this.clientConfig = clientConfig;
292+
}
293+
294+
public Builder connectionPoolConfig(GenericObjectPoolConfig<Connection> connectionPoolConfig) {
295+
this.connectionPoolConfig = connectionPoolConfig;
296+
return this;
297+
}
298+
299+
public Builder weight(float weight) {
300+
this.weight = weight;
301+
return this;
302+
}
303+
304+
public Builder healthCheckStrategySupplier(StrategySupplier healthCheckStrategySupplier) {
305+
if (healthCheckStrategySupplier == null) {
306+
throw new IllegalArgumentException("healthCheckStrategySupplier must not be null");
307+
}
308+
this.healthCheckStrategySupplier = healthCheckStrategySupplier;
309+
return this;
310+
}
311+
312+
public Builder healthCheckStrategy(HealthCheckStrategy healthCheckStrategy) {
313+
if (healthCheckStrategy == null) {
314+
throw new IllegalArgumentException("healthCheckStrategy must not be null");
315+
}
316+
this.healthCheckStrategySupplier = (hostAndPort, jedisClientConfig) -> healthCheckStrategy;
317+
return this;
318+
}
319+
320+
public Builder healthCheckEnabled(boolean healthCheckEnabled) {
321+
this.healthCheckEnabled = healthCheckEnabled;
322+
if (!healthCheckEnabled) {
323+
this.healthCheckStrategySupplier = null;
324+
} else if (healthCheckStrategySupplier == null) {
325+
this.healthCheckStrategySupplier = EchoStrategy.DEFAULT;
326+
}
327+
return this;
328+
}
329+
330+
public ClusterConfig build() {
331+
return new ClusterConfig(this);
332+
}
333+
}
234334
}
235335

236336
public static class Builder {
@@ -253,14 +353,14 @@ public static class Builder {
253353
private List<Class> circuitBreakerIgnoreExceptionList = null;
254354
private List<Class<? extends Throwable>> fallbackExceptionList = FALLBACK_EXCEPTIONS_DEFAULT;
255355

356+
private boolean retryOnFailover = false;
357+
private boolean failback = false;
358+
256359
public Builder(ClusterConfig[] clusterConfigs) {
257360

258361
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
259362
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
260363

261-
for (int i = 0; i < clusterConfigs.length; i++)
262-
clusterConfigs[i].setPriority(i + 1);
263-
264364
this.clusterConfigs = clusterConfigs;
265365
}
266366

@@ -348,6 +448,16 @@ public Builder fallbackExceptionList(List<Class<? extends Throwable>> fallbackEx
348448
return this;
349449
}
350450

451+
public Builder retryOnFailover(boolean retryOnFailover) {
452+
this.retryOnFailover = retryOnFailover;
453+
return this;
454+
}
455+
456+
public Builder failback(boolean failback) {
457+
this.failback = failback;
458+
return this;
459+
}
460+
351461
public MultiClusterClientConfig build() {
352462
MultiClusterClientConfig config = new MultiClusterClientConfig(this.clusterConfigs);
353463

@@ -373,6 +483,9 @@ public MultiClusterClientConfig build() {
373483

374484
config.fallbackExceptionList = this.fallbackExceptionList;
375485

486+
config.retryOnFailover = this.retryOnFailover;
487+
config.failback = this.failback;
488+
376489
return config;
377490
}
378491
}

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import redis.clients.jedis.json.Path2;
3131
import redis.clients.jedis.json.JsonObjectMapper;
3232
import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor;
33-
import redis.clients.jedis.mcf.FailoverOptions;
3433
import redis.clients.jedis.mcf.MultiClusterPipeline;
3534
import redis.clients.jedis.mcf.MultiClusterTransaction;
3635
import redis.clients.jedis.params.*;
@@ -238,19 +237,7 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
238237
*/
239238
@Experimental
240239
public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
241-
this(new CircuitBreakerCommandExecutor(provider, FailoverOptions.builder().build()), provider);
242-
}
243-
244-
/**
245-
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
246-
* <p>
247-
* With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
248-
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
249-
* <p>
250-
*/
251-
@Experimental
252-
public UnifiedJedis(MultiClusterPooledConnectionProvider provider, FailoverOptions failoverOptions) {
253-
this(new CircuitBreakerCommandExecutor(provider, failoverOptions), provider);
240+
this(new CircuitBreakerCommandExecutor(provider), provider);
254241
}
255242

256243
/**
@@ -354,6 +341,10 @@ public String ping() {
354341
return checkAndBroadcastCommand(commandObjects.ping());
355342
}
356343

344+
public String echo(String string) {
345+
return executeCommand(commandObjects.echo(string));
346+
}
347+
357348
public String flushDB() {
358349
return checkAndBroadcastCommand(commandObjects.flushDB());
359350
}

src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@
2222
@Experimental
2323
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor {
2424

25-
private final FailoverOptions options;
26-
27-
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider, FailoverOptions options) {
25+
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
2826
super(provider);
29-
this.options = options != null ? options : FailoverOptions.builder().build();
3027
}
3128

3229
@Override
@@ -50,8 +47,7 @@ private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster clust
5047
try (Connection connection = cluster.getConnection()) {
5148
return connection.executeCommand(commandObject);
5249
} catch (Exception e) {
53-
54-
if (retryOnFailover() && !isActiveCluster(cluster)
50+
if (cluster.retryOnFailover() && !isActiveCluster(cluster)
5551
&& isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) {
5652
throw new ConnectionFailoverException(
5753
"Command failed during failover: " + cluster.getCircuitBreaker().getName(), e);
@@ -65,10 +61,6 @@ private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb)
6561
return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e);
6662
}
6763

68-
private boolean retryOnFailover() {
69-
return options.isRetryOnFailover();
70-
}
71-
7264
private boolean isActiveCluster(Cluster cluster) {
7365
Cluster activeCluster = provider.getCluster();
7466
return activeCluster != null && activeCluster.equals(cluster);

src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
3939
lock.lock();
4040

4141
try {
42-
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is
42+
// Check state to handle race conditions since iterateActiveCluster() is
4343
// non-idempotent
4444
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {
4545

@@ -48,25 +48,26 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
4848
// To recover/transition from this forced state the user will need to manually failback
4949
circuitBreaker.transitionToForcedOpenState();
5050

51-
// Incrementing the activeMultiClusterIndex will allow subsequent calls to the
52-
// executeCommand()
53-
// to use the next cluster's connection pool - according to the configuration's
54-
// prioritization/order
55-
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
51+
// Iterating the active cluster will allow subsequent calls to the executeCommand() to use the next
52+
// cluster's connection pool - according to the configuration's prioritization/order/weight
53+
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
54+
provider.iterateActiveCluster();
5655

5756
// Implementation is optionally provided during configuration. Typically, used for
5857
// activeMultiClusterIndex persistence or custom logging
59-
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
58+
provider.runClusterFailoverPostProcessor(provider.getCluster());
6059
}
61-
62-
// Once the priority list is exhausted only a manual failback can open the circuit breaker so
63-
// all subsequent operations will fail
64-
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
60+
// this check relies on the fact that many failover attempts can hit with the same CB,
61+
// only the first one will trigger a failover, and make the CB FORCED_OPEN.
62+
// when the rest reach here, the active cluster is already the next one, and should be different from the
63+
// active CB. If it's the same one and there are no more clusters to fail over to, then throw an exception
64+
else if (circuitBreaker == provider.getCluster().getCircuitBreaker() && !provider.canIterateOnceMore()) {
6565
throw new JedisConnectionException(
6666
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
6767
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
6868
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
6969
}
70+
// Ignore exceptions since we are already in a failure state
7071
} finally {
7172
lock.unlock();
7273
}

0 commit comments

Comments
 (0)