Skip to content

Commit 397f437

Browse files
committed
- introduce clusterSwitchEvent and drop clusterFailover post processor
- fix broken echostrategy due to connection issue - make healtthCheckStrategy closable and close on - adding fastfailover mode to config and provider - add local failover tests for total failover duration
1 parent 1ae7219 commit 397f437

14 files changed

+466
-64
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ tags
1717
redis-git
1818
appendonlydir/
1919
.DS_Store
20+
.vscode/settings.json

src/main/java/redis/clients/jedis/MultiClusterClientConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ public static interface StrategySupplier {
161161
/** Grace period in milliseconds to keep clusters disabled after they become unhealthy */
162162
private long gracePeriod;
163163

164+
/** Whether to force terminate connections forcefully on failover */
165+
private boolean fastFailover;
166+
164167
public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
165168
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
166169
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
@@ -248,6 +251,10 @@ public long getGracePeriod() {
248251
return gracePeriod;
249252
}
250253

254+
public boolean isFastFailover() {
255+
return fastFailover;
256+
}
257+
251258
public static Builder builder(ClusterConfig[] clusterConfigs) {
252259
return new Builder(clusterConfigs);
253260
}
@@ -384,6 +391,8 @@ public static class Builder {
384391
private long failbackCheckInterval = FAILBACK_CHECK_INTERVAL_DEFAULT;
385392
private long gracePeriod = GRACE_PERIOD_DEFAULT;
386393

394+
private boolean fastFailover = false;
395+
387396
public Builder(ClusterConfig[] clusterConfigs) {
388397

389398
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
@@ -496,6 +505,11 @@ public Builder gracePeriod(long gracePeriod) {
496505
return this;
497506
}
498507

508+
public Builder fastFailover(boolean fastFailover) {
509+
this.fastFailover = fastFailover;
510+
return this;
511+
}
512+
499513
public MultiClusterClientConfig build() {
500514
MultiClusterClientConfig config = new MultiClusterClientConfig(this.clusterConfigs);
501515

@@ -525,6 +539,7 @@ public MultiClusterClientConfig build() {
525539
config.isFailbackSupported = this.isFailbackSupported;
526540
config.failbackCheckInterval = this.failbackCheckInterval;
527541
config.gracePeriod = this.gracePeriod;
542+
config.fastFailover = this.fastFailover;
528543

529544
return config;
530545
}

src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,7 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
5959
// Iterating the active cluster will allow subsequent calls to the executeCommand() to use the next
6060
// cluster's connection pool - according to the configuration's prioritization/order/weight
6161
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
62-
if (provider.iterateActiveCluster() != null) {
63-
64-
// Implementation is optionally provided during configuration. Typically, used for
65-
// activeMultiClusterIndex persistence or custom logging
66-
provider.runClusterFailoverPostProcessor(provider.getCluster());
67-
}
62+
provider.iterateActiveCluster(SwitchReason.CIRCUIT_BREAKER);
6863
}
6964
// this check relies on the fact that many failover attempts can hit with the same CB,
7065
// only the first one will trigger a failover, and make the CB FORCED_OPEN.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package redis.clients.jedis.mcf;
2+
3+
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
4+
5+
public class ClusterSwitchEventArgs {
6+
7+
private final SwitchReason reason;
8+
private final String ClusterName;
9+
private final Endpoint Endpoint;
10+
11+
public ClusterSwitchEventArgs(SwitchReason reason, Endpoint endpoint, Cluster cluster) {
12+
this.reason = reason;
13+
this.ClusterName = cluster.getCircuitBreaker().getName();
14+
this.Endpoint = endpoint;
15+
}
16+
17+
public SwitchReason getReason() {
18+
return reason;
19+
}
20+
21+
public String getClusterName() {
22+
return ClusterName;
23+
}
24+
25+
public Endpoint getEndpoint() {
26+
return Endpoint;
27+
}
28+
29+
}

src/main/java/redis/clients/jedis/mcf/EchoStrategy.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package redis.clients.jedis.mcf;
22

3-
import redis.clients.jedis.ConnectionFactory;
43
import redis.clients.jedis.HostAndPort;
54
import redis.clients.jedis.JedisClientConfig;
65
import redis.clients.jedis.UnifiedJedis;
7-
import redis.clients.jedis.exceptions.JedisConnectionException;
86
import redis.clients.jedis.MultiClusterClientConfig.StrategySupplier;
97

108
public class EchoStrategy implements HealthCheckStrategy {
@@ -20,12 +18,7 @@ public EchoStrategy(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig
2018
public EchoStrategy(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig, int interval, int timeout) {
2119
this.interval = interval;
2220
this.timeout = timeout;
23-
ConnectionFactory connFactory = new ConnectionFactory(hostAndPort, jedisClientConfig);
24-
try {
25-
this.jedis = new UnifiedJedis(connFactory.makeObject().getObject());
26-
} catch (Exception e) {
27-
throw new JedisConnectionException("HealthCheck connection Failed!", e);
28-
}
21+
this.jedis = new UnifiedJedis(hostAndPort, jedisClientConfig);
2922
}
3023

3124
@Override
@@ -43,6 +36,11 @@ public HealthStatus doHealthCheck(Endpoint endpoint) {
4336
return "HealthCheck".equals(jedis.echo("HealthCheck")) ? HealthStatus.HEALTHY : HealthStatus.UNHEALTHY;
4437
}
4538

39+
@Override
40+
public void close() {
41+
jedis.close();
42+
}
43+
4644
public static final StrategySupplier DEFAULT = (hostAndPort, jedisClientConfig) -> {
4745
return new EchoStrategy(hostAndPort, jedisClientConfig);
4846
};

src/main/java/redis/clients/jedis/mcf/HealthCheck.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@
1212
import java.util.concurrent.atomic.AtomicReference;
1313
import java.util.function.Consumer;
1414

15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
1518
public class HealthCheck {
1619

20+
private static final Logger log = LoggerFactory.getLogger(HealthCheck.class);
21+
1722
private Endpoint endpoint;
1823
private HealthCheckStrategy strategy;
1924
private AtomicReference<SimpleEntry<Long, HealthStatus>> statusRef = new AtomicReference<SimpleEntry<Long, HealthStatus>>();
@@ -43,6 +48,7 @@ public void start() {
4348
}
4449

4550
public void stop() {
51+
strategy.close();
4652
this.statusChangeCallback = null;
4753
scheduler.shutdown();
4854
executor.shutdown();
@@ -67,6 +73,7 @@ private void healthCheck() {
6773
Future<?> future = executor.submit(() -> {
6874
HealthStatus newStatus = strategy.doHealthCheck(endpoint);
6975
safeUpdate(me, newStatus);
76+
log.trace("Health check completed for {} with status {}", endpoint, newStatus);
7077
});
7178

7279
try {
@@ -75,11 +82,13 @@ private void healthCheck() {
7582
// Cancel immediately on timeout or exec exception
7683
future.cancel(true);
7784
safeUpdate(me, HealthStatus.UNHEALTHY);
85+
log.warn("Health check timed out or failed for {}", endpoint, e);
7886
} catch (InterruptedException e) {
7987
// Health check thread was interrupted
8088
future.cancel(true);
8189
safeUpdate(me, HealthStatus.UNHEALTHY);
8290
Thread.currentThread().interrupt(); // Restore interrupted status
91+
log.warn("Health check interrupted for {}", endpoint, e);
8392
}
8493
}
8594

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package redis.clients.jedis.mcf;
22

3-
public interface HealthCheckStrategy {
3+
import java.io.Closeable;
4+
5+
public interface HealthCheckStrategy extends Closeable {
46

57
int getInterval();
68

79
int getTimeout();
810

911
HealthStatus doHealthCheck(Endpoint endpoint);
1012

13+
default void close() {
14+
}
15+
1116
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package redis.clients.jedis.mcf;
2+
3+
public enum SwitchReason {
4+
HEALTH_CHECK, CIRCUIT_BREAKER, FAILBACK, FORCED
5+
}

0 commit comments

Comments
 (0)