Skip to content

Commit b899f3f

Browse files
atakavciggivo
andauthored
[automatic failover] Automatic failover client improvements (part 2) (#4297)
* [automatic failover] Remove the check for 'GenericObjectPool.getNumWaiters()' in 'TrackingConnectionPool' (#4270) - remove the check for number of waitiers in TrackingConnectionPool * [automatic failover] Configure max total connections for EchoStrategy (#4268) - set maxtotal connections for echoStrategy * [automatic failover] Replace 'CircuitBreaker' with 'Cluster' for 'CircuitBreakerFailoverBase.clusterFailover' (#4275) * - replace CircuitBreaker with Cluster for CircuitBreakerFailoverBase.clusterFailover - improve thread safety with provider initialization * - formatting * [automatic failover] Minor optimizations on fast failover (#4277) * - minor optimizations on fail fast * - volatile failfast * [automatic failover] Implement health check retries (#4273) * - replace minConsecutiveSuccessCount with numberOfRetries - add retries into healtCheckImpl - apply changes to strategy implementations config classes - fix unit tests * - fix typo * - fix failing tests * - add tests for retry logic * - formatting * - format * - revisit numRetries for healthCheck ,replace with numProbes and implement built in policies - new types probecontext, ProbePolicy, HealthProbeContext - add delayer executor pool to healthcheckımpl - adjustments on worker pool of healthCheckImpl for shared use of workers * - format * - expand comment with example case * - drop pooled executor for delays * - polish * - fix tests * - formatting * - checking failing tests * - fix test * - fix flaky tests * - fix flaky test * - add tests for builtin probing policies * - fix flaky test * [automatic failover] Move failover provider to mcf (#4294) * - move failover provider to mcf * - make iterateActiveCluster package private * [automatic failover] Add SSL configuration support to LagAwareStrategy (#4291) * User-provided ssl config for lag-aware health check * ssl scenario test for lag-aware healthcheck * format * format * address review comments - use getters instead of fields * [automatic failover] Implement max number of failover attempts (#4293) * - implement max failover attempt - add tests * - fix user receive the intended exception * -clean+format * - java doc for exceptions * format * - more tests on excaption types in max failover attempts mechanism * format * fix failing timing in test * disable health checks * rename to switchToHealthyCluster * format --------- Co-authored-by: Ivo Gaydazhiev <[email protected]>
1 parent e838e48 commit b899f3f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1935
-444
lines changed

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,8 @@
492492
<include>**/mcf/MultiCluster*.java</include>
493493
<include>**/mcf/StatusTracker*.java</include>
494494
<include>**/Health*.java</include>
495+
<include>**/*IT.java</include>
496+
<include>**/scenario/RestEndpointUtil.java</include>
495497
<include>src/main/java/redis/clients/jedis/MultiClusterClientConfig.java</include>
496498
<include>src/main/java/redis/clients/jedis/HostAndPort.java</include>
497499
</includes>

src/main/java/redis/clients/jedis/Connection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public void connect() throws JedisConnectionException {
267267
if (!isConnected()) {
268268
try {
269269
socket = socketFactory.createSocket();
270-
soTimeout = socket.getSoTimeout(); //?
270+
soTimeout = socket.getSoTimeout(); // ?
271271

272272
outputStream = new RedisOutputStream(socket.getOutputStream());
273273
inputStream = new RedisInputStream(socket.getInputStream());
@@ -326,6 +326,10 @@ public void disconnect() {
326326
}
327327

328328
public void forceDisconnect() throws IOException {
329+
// setBroken() must be called first here,
330+
// otherwise a concurrent close attempt would call 'returnResource' (instead of
331+
// 'returnBrokenResource'),
332+
// assuming it's an open/healthy connection whereas this individual socket is already closed.
329333
setBroken();
330334
IOUtils.closeQuietly(socket);
331335
}
@@ -476,7 +480,6 @@ public List<Object> getMany(final int count) {
476480

477481
/**
478482
* Check if the client name libname, libver, characters are legal
479-
*
480483
* @param info the name
481484
* @return Returns true if legal, false throws exception
482485
* @throws JedisException if characters illegal

src/main/java/redis/clients/jedis/MultiClusterClientConfig.java

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
* This configuration enables seamless failover between multiple Redis clusters, databases, or
2323
* endpoints by providing comprehensive settings for retry logic, circuit breaker behavior, health
2424
* checks, and failback mechanisms. It is designed to work with
25-
* {@link redis.clients.jedis.providers.MultiClusterPooledConnectionProvider} to provide high
26-
* availability and disaster recovery capabilities.
25+
* {@link redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider} to provide high availability
26+
* and disaster recovery capabilities.
2727
* </p>
2828
* <p>
2929
* <strong>Key Features:</strong>
@@ -70,7 +70,7 @@
7070
* The configuration leverages <a href="https://resilience4j.readme.io/docs">Resilience4j</a> for
7171
* circuit breaker and retry implementations, providing battle-tested fault tolerance patterns.
7272
* </p>
73-
* @see redis.clients.jedis.providers.MultiClusterPooledConnectionProvider
73+
* @see redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider
7474
* @see redis.clients.jedis.mcf.HealthCheckStrategy
7575
* @see redis.clients.jedis.mcf.EchoStrategy
7676
* @see redis.clients.jedis.mcf.LagAwareStrategy
@@ -161,6 +161,12 @@ public static interface StrategySupplier {
161161
/** Default grace period in milliseconds to keep clusters disabled after they become unhealthy. */
162162
private static final long GRACE_PERIOD_DEFAULT = 10000;
163163

164+
/** Default maximum number of failover attempts. */
165+
private static final int MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT = 10;
166+
167+
/** Default delay in milliseconds between failover attempts. */
168+
private static final int DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT = 12000;
169+
164170
/** Array of cluster configurations defining the available Redis endpoints and their settings. */
165171
private final ClusterConfig[] clusterConfigs;
166172

@@ -485,6 +491,34 @@ public static interface StrategySupplier {
485491
*/
486492
private boolean fastFailover;
487493

494+
/**
495+
* Maximum number of failover attempts.
496+
* <p>
497+
* This setting controls how many times the system will attempt to failover to a different cluster
498+
* before giving up. For example, if set to 3, the system will make 1 initial attempt plus 2
499+
* failover attempts for a total of 3 attempts.
500+
* </p>
501+
* <p>
502+
* <strong>Default:</strong> {@value #MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT}
503+
* </p>
504+
* @see #getMaxNumFailoverAttempts()
505+
*/
506+
private int maxNumFailoverAttempts;
507+
508+
/**
509+
* Delay in milliseconds between failover attempts.
510+
* <p>
511+
* This setting controls how long the system will wait before attempting to failover to a
512+
* different cluster. For example, if set to 1000, the system will wait 1 second before attempting
513+
* to failover to a different cluster.
514+
* </p>
515+
* <p>
516+
* <strong>Default:</strong> {@value #DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT} milliseconds
517+
* </p>
518+
* @see #getDelayInBetweenFailoverAttempts()
519+
*/
520+
private int delayInBetweenFailoverAttempts;
521+
488522
/**
489523
* Constructs a new MultiClusterClientConfig with the specified cluster configurations.
490524
* <p>
@@ -679,6 +713,25 @@ public long getGracePeriod() {
679713
return gracePeriod;
680714
}
681715

716+
/**
717+
* Returns the maximum number of failover attempts.
718+
* @return maximum number of failover attempts
719+
* @see #maxNumFailoverAttempts
720+
*/
721+
public int getMaxNumFailoverAttempts() {
722+
return maxNumFailoverAttempts;
723+
724+
}
725+
726+
/**
727+
* Returns the delay in milliseconds between failover attempts.
728+
* @return delay in milliseconds between failover attempts
729+
* @see #delayInBetweenFailoverAttempts
730+
*/
731+
public int getDelayInBetweenFailoverAttempts() {
732+
return delayInBetweenFailoverAttempts;
733+
}
734+
682735
/**
683736
* Returns whether connections are forcefully terminated during failover.
684737
* @return true if fast failover is enabled, false for graceful failover
@@ -1090,6 +1143,12 @@ public static class Builder {
10901143
/** Whether to forcefully terminate connections during failover. */
10911144
private boolean fastFailover = false;
10921145

1146+
/** Maximum number of failover attempts. */
1147+
private int maxNumFailoverAttempts = MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT;
1148+
1149+
/** Delay in milliseconds between failover attempts. */
1150+
private int delayInBetweenFailoverAttempts = DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT;
1151+
10931152
/**
10941153
* Constructs a new Builder with the specified cluster configurations.
10951154
* @param clusterConfigs array of cluster configurations defining available Redis endpoints
@@ -1460,7 +1519,7 @@ public Builder retryOnFailover(boolean retryOnFailover) {
14601519
* <ul>
14611520
* <li>Health checks must be enabled on cluster configurations</li>
14621521
* <li>Grace period must elapse after cluster becomes unhealthy</li>
1463-
* <li>Higher-priority cluster must pass consecutive health checks</li>
1522+
* <li>Higher-priority cluster must pass health checks</li>
14641523
* </ul>
14651524
* @param supported true to enable automatic failback, false for manual failback only
14661525
* @return this builder instance for method chaining
@@ -1539,6 +1598,42 @@ public Builder fastFailover(boolean fastFailover) {
15391598
return this;
15401599
}
15411600

1601+
/**
1602+
* Sets the maximum number of failover attempts.
1603+
* <p>
1604+
* This setting controls how many times the system will attempt to failover to a different
1605+
* cluster before giving up. For example, if set to 3, the system will make 1 initial attempt
1606+
* plus 2 failover attempts for a total of 3 attempts.
1607+
* </p>
1608+
* <p>
1609+
* <strong>Default:</strong> {@value #MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT}
1610+
* </p>
1611+
* @param maxNumFailoverAttempts maximum number of failover attempts
1612+
* @return this builder instance for method chaining
1613+
*/
1614+
public Builder maxNumFailoverAttempts(int maxNumFailoverAttempts) {
1615+
this.maxNumFailoverAttempts = maxNumFailoverAttempts;
1616+
return this;
1617+
}
1618+
1619+
/**
1620+
* Sets the delay in milliseconds between failover attempts.
1621+
* <p>
1622+
* This setting controls how long the system will wait before attempting to failover to a
1623+
* different cluster. For example, if set to 1000, the system will wait 1 second before
1624+
* attempting to failover to a different cluster.
1625+
* </p>
1626+
* <p>
1627+
* <strong>Default:</strong> {@value #DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT} milliseconds
1628+
* </p>
1629+
* @param delayInBetweenFailoverAttempts delay in milliseconds between failover attempts
1630+
* @return this builder instance for method chaining
1631+
*/
1632+
public Builder delayInBetweenFailoverAttempts(int delayInBetweenFailoverAttempts) {
1633+
this.delayInBetweenFailoverAttempts = delayInBetweenFailoverAttempts;
1634+
return this;
1635+
}
1636+
15421637
/**
15431638
* Builds and returns a new MultiClusterClientConfig instance with all configured settings.
15441639
* <p>
@@ -1576,6 +1671,8 @@ public MultiClusterClientConfig build() {
15761671
config.failbackCheckInterval = this.failbackCheckInterval;
15771672
config.gracePeriod = this.gracePeriod;
15781673
config.fastFailover = this.fastFailover;
1674+
config.maxNumFailoverAttempts = this.maxNumFailoverAttempts;
1675+
config.delayInBetweenFailoverAttempts = this.delayInBetweenFailoverAttempts;
15791676

15801677
return config;
15811678
}

src/main/java/redis/clients/jedis/SslOptions.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ public SSLContext createSslContext() throws IOException, GeneralSecurityExceptio
343343

344344
if (keystoreResource != null) {
345345

346-
KeyStore keyStore = KeyStore.getInstance(keyStoreType);
346+
KeyStore keyStore = KeyStore.getInstance(keyStoreType==null ? KeyStore.getDefaultType() : keyStoreType);
347347
try (InputStream keystoreStream = keystoreResource.get()) {
348348
keyStore.load(keystoreStream, keystorePassword);
349349
}
@@ -355,7 +355,8 @@ public SSLContext createSslContext() throws IOException, GeneralSecurityExceptio
355355

356356
if (trustManagers == null && truststoreResource != null) {
357357

358-
KeyStore trustStore = KeyStore.getInstance(trustStoreType);
358+
359+
KeyStore trustStore = KeyStore.getInstance(trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType);
359360
try (InputStream truststoreStream = truststoreResource.get()) {
360361
trustStore.load(truststoreStream, truststorePassword);
361362
}
@@ -379,6 +380,15 @@ public SSLParameters getSslParameters() {
379380
return sslParameters;
380381
}
381382

383+
384+
/**
385+
* Configured ssl verify mode.
386+
* @return {@link SslVerifyMode}
387+
*/
388+
public SslVerifyMode getSslVerifyMode() {
389+
return sslVerifyMode;
390+
}
391+
382392
private static char[] getPassword(char[] chars) {
383393
return chars != null ? Arrays.copyOf(chars, chars.length) : null;
384394
}

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import redis.clients.jedis.json.JsonObjectMapper;
3535
import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor;
3636
import redis.clients.jedis.mcf.MultiClusterPipeline;
37+
import redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider;
3738
import redis.clients.jedis.mcf.MultiClusterTransaction;
3839
import redis.clients.jedis.params.*;
3940
import redis.clients.jedis.providers.*;

src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import redis.clients.jedis.CommandObject;
88
import redis.clients.jedis.Connection;
99
import redis.clients.jedis.annots.Experimental;
10+
import redis.clients.jedis.exceptions.JedisConnectionException;
1011
import redis.clients.jedis.executors.CommandExecutor;
11-
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
12-
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
12+
import redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider.Cluster;
1313

1414
/**
1515
* @author Allen Terleto (aterleto)
@@ -38,7 +38,7 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
3838
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
3939
supplier.withRetry(cluster.getRetry());
4040
supplier.withFallback(provider.getFallbackExceptionList(),
41-
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
41+
e -> this.handleClusterFailover(commandObject, cluster));
4242

4343
return supplier.decorate().get();
4444
}
@@ -47,7 +47,14 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
4747
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
4848
*/
4949
private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster cluster) {
50-
try (Connection connection = cluster.getConnection()) {
50+
Connection connection;
51+
try {
52+
connection = cluster.getConnection();
53+
} catch (JedisConnectionException e) {
54+
provider.assertOperability();
55+
throw e;
56+
}
57+
try {
5158
return connection.executeCommand(commandObject);
5259
} catch (Exception e) {
5360
if (cluster.retryOnFailover() && !isActiveCluster(cluster)
@@ -57,6 +64,8 @@ && isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) {
5764
}
5865

5966
throw e;
67+
} finally {
68+
connection.close();
6069
}
6170
}
6271

@@ -73,10 +82,9 @@ private boolean isActiveCluster(Cluster cluster) {
7382
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
7483
* failure scenarios
7584
*/
76-
private <T> T handleClusterFailover(CommandObject<T> commandObject,
77-
CircuitBreaker circuitBreaker) {
85+
private <T> T handleClusterFailover(CommandObject<T> commandObject, Cluster cluster) {
7886

79-
clusterFailover(circuitBreaker);
87+
clusterFailover(cluster);
8088

8189
// Recursive call to the initiating method so the operation can be retried on the next cluster
8290
// connection

src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import java.util.concurrent.locks.ReentrantLock;
66
import redis.clients.jedis.annots.Experimental;
77
import redis.clients.jedis.exceptions.JedisConnectionException;
8-
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
9-
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
8+
import redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider.Cluster;
109
import redis.clients.jedis.util.IOUtils;
1110

1211
/**
@@ -38,9 +37,10 @@ public void close() {
3837
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
3938
* failure scenarios
4039
*/
41-
protected void clusterFailover(CircuitBreaker circuitBreaker) {
40+
protected void clusterFailover(Cluster cluster) {
4241
lock.lock();
4342

43+
CircuitBreaker circuitBreaker = cluster.getCircuitBreaker();
4444
try {
4545
// Check state to handle race conditions since iterateActiveCluster() is
4646
// non-idempotent
@@ -52,34 +52,28 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
5252

5353
Cluster activeCluster = provider.getCluster();
5454
// This should be possible only if active cluster is switched from by other reasons than
55-
// circuit
56-
// breaker, just before circuit breaker triggers
57-
if (activeCluster.getCircuitBreaker() != circuitBreaker) {
55+
// circuit breaker, just before circuit breaker triggers
56+
if (activeCluster != cluster) {
5857
return;
5958
}
6059

61-
activeCluster.setGracePeriod();
60+
cluster.setGracePeriod();
6261
circuitBreaker.transitionToForcedOpenState();
6362

6463
// Iterating the active cluster will allow subsequent calls to the executeCommand() to use
6564
// the next
6665
// cluster's connection pool - according to the configuration's prioritization/order/weight
67-
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
68-
provider.iterateActiveCluster(SwitchReason.CIRCUIT_BREAKER);
66+
provider.switchToHealthyCluster(SwitchReason.CIRCUIT_BREAKER, cluster);
6967
}
7068
// this check relies on the fact that many failover attempts can hit with the same CB,
7169
// only the first one will trigger a failover, and make the CB FORCED_OPEN.
7270
// when the rest reaches here, the active cluster is already the next one, and should be
7371
// different than
7472
// active CB. If its the same one and there are no more clusters to failover to, then throw an
7573
// exception
76-
else if (circuitBreaker == provider.getCluster().getCircuitBreaker()
77-
&& !provider.canIterateOnceMore()) {
78-
throw new JedisConnectionException(
79-
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
80-
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
81-
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
82-
}
74+
else if (cluster == provider.getCluster()) {
75+
provider.switchToHealthyCluster(SwitchReason.CIRCUIT_BREAKER, cluster);
76+
}
8377
// Ignore exceptions since we are already in a failure state
8478
} finally {
8579
lock.unlock();

0 commit comments

Comments
 (0)