Skip to content
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8a9f876
- weighted cluster seleciton
atakavci Jun 27, 2025
d514ecf
- add builder for ClusterConfig
atakavci Jul 9, 2025
df66b1e
- fix naming
atakavci Jul 9, 2025
13757f5
clean up and mark override methods
atakavci Jul 10, 2025
ef5d83a
fix link in javadoc
atakavci Jul 10, 2025
a15fc64
fix formatting
atakavci Jul 10, 2025
cf38240
- fix double registered listeners in healtstatusmgr
atakavci Jul 14, 2025
c2fb34c
Update src/main/java/redis/clients/jedis/mcf/EchoStrategy.java
atakavci Jul 16, 2025
ade866d
- add remove endpoints
atakavci Jul 11, 2025
ca3378d
- replace cluster disabled with failbackCandidate
atakavci Jul 15, 2025
ddcec73
- remove failback candidate
atakavci Jul 16, 2025
c1b6d5f
- fix remove logic
atakavci Jul 16, 2025
ff16330
- periodic failback checks
atakavci Jul 17, 2025
c39fda1
- introduce StatusTracker with purpose of waiting initial healthcheck…
atakavci Jul 19, 2025
975ab78
- introduce forceActiveCluster by duration
atakavci Jul 19, 2025
405101e
- fix failing tests by waiting on clusters to get healthy
atakavci Jul 23, 2025
607c66d
- fix failing scenario test
atakavci Jul 23, 2025
aaac8f7
- adressing reviews and feedback
atakavci Jul 23, 2025
2ffffef
- fix formatting
atakavci Jul 23, 2025
e6e1121
- fix formatting
atakavci Jul 23, 2025
b8d4e87
- get rid of the queue and event ordering for healthstatus change in …
atakavci Jul 24, 2025
1ae7219
- replace use of reflection with helper methods
atakavci Jul 24, 2025
397f437
- introduce clusterSwitchEvent and drop clusterFailover post processor
atakavci Jul 31, 2025
ab05e6c
- introduce fastfailover using objectMaker injection into connection…
atakavci Jul 31, 2025
de034f4
- polish
atakavci Jul 31, 2025
df3d555
- cleanup
atakavci Jul 31, 2025
3352260
- improve healtcheck thread visibility
atakavci Jul 31, 2025
74f024f
- fix threads waiting on ConnectionPool resources to return
atakavci Aug 4, 2025
db23079
- formatting
atakavci Aug 4, 2025
b2ebe2d
- fix failing tests due to mocked ctor for trackingConnectionPool
atakavci Aug 6, 2025
11c4d2b
- fix test , replace mock ctors for TrackingConnectionPool
atakavci Aug 7, 2025
f4eae58
- make Tracking pool wait for ongoing inits in forceDisconnect
atakavci Aug 7, 2025
4c86919
- fix failover test by checking time and endpoint
atakavci Aug 7, 2025
9a1da64
- manage the case with failovers from multiple reasons.
atakavci Aug 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ tags
redis-git
appendonlydir/
.DS_Store
.vscode/settings.json
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.15</version>
<version>1.2.12</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public final CommandObject<String> ping() {
return PING_COMMAND_OBJECT;
}

public final CommandObject<String> echo(String msg) {
return new CommandObject<>(commandArguments(ECHO).add(msg), BuilderFactory.STRING);
}

private final CommandObject<String> FLUSHALL_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);

public final CommandObject<String> flushAll() {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ public void disconnect() {
}
}

public void forceDisconnect() throws IOException {
socket.close();
}

public boolean isConnected() {
return socket != null && socket.isBound() && !socket.isClosed() && socket.isConnected()
&& !socket.isInputShutdown() && !socket.isOutputShutdown();
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.LoggerFactory;

import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.authentication.AuthXManager;
Expand All @@ -21,12 +22,15 @@
*/
public class ConnectionFactory implements PooledObjectFactory<Connection> {

public interface MakerInjector extends UnaryOperator<Supplier<Connection>> {
};

private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);

private final JedisSocketFactory jedisSocketFactory;
private final JedisClientConfig clientConfig;
private final Cache clientSideCache;
private final Supplier<Connection> objectMaker;
private Supplier<Connection> objectMaker;

private final AuthXEventListener authXEventListener;

Expand Down Expand Up @@ -73,6 +77,10 @@ private Supplier<Connection> connectionSupplier() {
: () -> new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache);
}

public void injectMaker(MakerInjector injector) {
this.objectMaker = injector.apply(objectMaker);
}

@Override
public void activateObject(PooledObject<Connection> pooledConnection) throws Exception {
// what to do ??
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/redis/clients/jedis/HostAndPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.io.Serializable;

public class HostAndPort implements Serializable {
import redis.clients.jedis.mcf.Endpoint;

public class HostAndPort implements Serializable, Endpoint {

private static final long serialVersionUID = -519876229978427751L;

Expand All @@ -14,10 +16,12 @@ public HostAndPort(String host, int port) {
this.port = port;
}

@Override
public String getHost() {
return host;
}

@Override
public int getPort() {
return port;
}
Expand Down
188 changes: 178 additions & 10 deletions src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisValidationException;
import redis.clients.jedis.mcf.ConnectionFailoverException;
import redis.clients.jedis.mcf.EchoStrategy;
import redis.clients.jedis.mcf.HealthCheckStrategy;

/**
* @author Allen Terleto (aterleto)
Expand All @@ -31,6 +33,19 @@
@Experimental
public final class MultiClusterClientConfig {

/**
* Interface for creating HealthCheckStrategy instances for specific endpoints
*/
public static interface StrategySupplier {
/**
* Creates a HealthCheckStrategy for the given endpoint.
* @param hostAndPort the endpoint to create a strategy for
* @param jedisClientConfig the client configuration, may be null for implementations that don't need it
* @return a HealthCheckStrategy instance
*/
HealthCheckStrategy get(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig);
}

private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3;
private static final int RETRY_WAIT_DURATION_DEFAULT = 500; // measured in milliseconds
private static final int RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT = 2;
Expand All @@ -48,6 +63,9 @@ public final class MultiClusterClientConfig {
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays
.asList(CallNotPermittedException.class, ConnectionFailoverException.class);

private static final long FAILBACK_CHECK_INTERVAL_DEFAULT = 5000; // 5 seconds
private static final long GRACE_PERIOD_DEFAULT = 10000; // 10 seconds

private final ClusterConfig[] clusterConfigs;

//////////// Retry Config - https://resilience4j.readme.io/docs/retry ////////////
Expand Down Expand Up @@ -129,7 +147,30 @@ public final class MultiClusterClientConfig {

private List<Class<? extends Throwable>> fallbackExceptionList;

//////////// Failover Config ////////////

/** Whether to retry failed commands during failover */
private boolean retryOnFailover;

/** Whether failback is supported by client */
private boolean isFailbackSupported;

/** Interval in milliseconds to wait before attempting failback to a recovered cluster */
private long failbackCheckInterval;

/** Grace period in milliseconds to keep clusters disabled after they become unhealthy */
private long gracePeriod;

/** Whether to force terminate connections forcefully on failover */
private boolean fastFailover;

public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");
for (ClusterConfig clusterConfig : clusterConfigs) {
if (clusterConfig == null)
throw new IllegalArgumentException("ClusterClientConfigs must not contain null elements");
}
this.clusterConfigs = clusterConfigs;
}

Expand Down Expand Up @@ -193,13 +234,40 @@ public List<Class<? extends Throwable>> getFallbackExceptionList() {
return fallbackExceptionList;
}

public boolean isRetryOnFailover() {
return retryOnFailover;
}

/** Whether failback is supported by client */
public boolean isFailbackSupported() {
return isFailbackSupported;
}

public long getFailbackCheckInterval() {
return failbackCheckInterval;
}

public long getGracePeriod() {
return gracePeriod;
}

public boolean isFastFailover() {
return fastFailover;
}

public static Builder builder(ClusterConfig[] clusterConfigs) {
return new Builder(clusterConfigs);
}

public static class ClusterConfig {

private int priority;
private HostAndPort hostAndPort;
private JedisClientConfig clientConfig;
private GenericObjectPoolConfig<Connection> connectionPoolConfig;

private float weight = 1.0f;
private StrategySupplier healthCheckStrategySupplier;

public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this.hostAndPort = hostAndPort;
this.clientConfig = clientConfig;
Expand All @@ -212,25 +280,90 @@ public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this.connectionPoolConfig = connectionPoolConfig;
}

public int getPriority() {
return priority;
}

private void setPriority(int priority) {
this.priority = priority;
private ClusterConfig(Builder builder) {
this.hostAndPort = builder.hostAndPort;
this.clientConfig = builder.clientConfig;
this.connectionPoolConfig = builder.connectionPoolConfig;
this.weight = builder.weight;
this.healthCheckStrategySupplier = builder.healthCheckStrategySupplier;
}

public HostAndPort getHostAndPort() {
return hostAndPort;
}

public static Builder builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
return new Builder(hostAndPort, clientConfig);
}

public JedisClientConfig getJedisClientConfig() {
return clientConfig;
}

public GenericObjectPoolConfig<Connection> getConnectionPoolConfig() {
return connectionPoolConfig;
}

public float getWeight() {
return weight;
}

public StrategySupplier getHealthCheckStrategySupplier() {
return healthCheckStrategySupplier;
}

public static class Builder {
private HostAndPort hostAndPort;
private JedisClientConfig clientConfig;
private GenericObjectPoolConfig<Connection> connectionPoolConfig;

private float weight = 1.0f;
private StrategySupplier healthCheckStrategySupplier = EchoStrategy.DEFAULT;

public Builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this.hostAndPort = hostAndPort;
this.clientConfig = clientConfig;
}

public Builder connectionPoolConfig(GenericObjectPoolConfig<Connection> connectionPoolConfig) {
this.connectionPoolConfig = connectionPoolConfig;
return this;
}

public Builder weight(float weight) {
this.weight = weight;
return this;
}

public Builder healthCheckStrategySupplier(StrategySupplier healthCheckStrategySupplier) {
if (healthCheckStrategySupplier == null) {
throw new IllegalArgumentException("healthCheckStrategySupplier must not be null");
}
this.healthCheckStrategySupplier = healthCheckStrategySupplier;
return this;
}

public Builder healthCheckStrategy(HealthCheckStrategy healthCheckStrategy) {
if (healthCheckStrategy == null) {
throw new IllegalArgumentException("healthCheckStrategy must not be null");
}
this.healthCheckStrategySupplier = (hostAndPort, jedisClientConfig) -> healthCheckStrategy;
return this;
}

public Builder healthCheckEnabled(boolean healthCheckEnabled) {
if (!healthCheckEnabled) {
this.healthCheckStrategySupplier = null;
} else if (healthCheckStrategySupplier == null) {
this.healthCheckStrategySupplier = EchoStrategy.DEFAULT;
}
return this;
}

public ClusterConfig build() {
return new ClusterConfig(this);
}
}
}

public static class Builder {
Expand All @@ -253,14 +386,18 @@ public static class Builder {
private List<Class> circuitBreakerIgnoreExceptionList = null;
private List<Class<? extends Throwable>> fallbackExceptionList = FALLBACK_EXCEPTIONS_DEFAULT;

private boolean retryOnFailover = false;
private boolean isFailbackSupported = true;
private long failbackCheckInterval = FAILBACK_CHECK_INTERVAL_DEFAULT;
private long gracePeriod = GRACE_PERIOD_DEFAULT;

private boolean fastFailover = false;

public Builder(ClusterConfig[] clusterConfigs) {

if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");

for (int i = 0; i < clusterConfigs.length; i++)
clusterConfigs[i].setPriority(i + 1);

this.clusterConfigs = clusterConfigs;
}

Expand Down Expand Up @@ -348,6 +485,31 @@ public Builder fallbackExceptionList(List<Class<? extends Throwable>> fallbackEx
return this;
}

public Builder retryOnFailover(boolean retryOnFailover) {
this.retryOnFailover = retryOnFailover;
return this;
}

public Builder failbackSupported(boolean supported) {
this.isFailbackSupported = supported;
return this;
}

public Builder failbackCheckInterval(long failbackCheckInterval) {
this.failbackCheckInterval = failbackCheckInterval;
return this;
}

public Builder gracePeriod(long gracePeriod) {
this.gracePeriod = gracePeriod;
return this;
}

public Builder fastFailover(boolean fastFailover) {
this.fastFailover = fastFailover;
return this;
}

public MultiClusterClientConfig build() {
MultiClusterClientConfig config = new MultiClusterClientConfig(this.clusterConfigs);

Expand All @@ -373,6 +535,12 @@ public MultiClusterClientConfig build() {

config.fallbackExceptionList = this.fallbackExceptionList;

config.retryOnFailover = this.retryOnFailover;
config.isFailbackSupported = this.isFailbackSupported;
config.failbackCheckInterval = this.failbackCheckInterval;
config.gracePeriod = this.gracePeriod;
config.fastFailover = this.fastFailover;

return config;
}
}
Expand Down
Loading
Loading