diff --git a/pom.xml b/pom.xml
index fd9b692e71..3b91194d41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,6 +214,12 @@
${resilience4j.version}
true
+
+ org.powermock
+ powermock-module-junit4
+ 2.0.2
+ test
+
io.github.resilience4j
resilience4j-circuitbreaker
@@ -391,7 +397,7 @@
@{failsafeSuffixArgLine} ${JVM_OPTS}
-
+
**/*IntegrationTest.java
**/*IntegrationTests.java
@@ -494,6 +500,7 @@
**/Health*.java
src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
src/main/java/redis/clients/jedis/HostAndPort.java
+ src/main/java/redis/clients/jedis/util/ReadOnlyCommands.java
diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java
index ce7fd82de4..5548b36b64 100644
--- a/src/main/java/redis/clients/jedis/JedisClientConfig.java
+++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java
@@ -6,9 +6,9 @@
import javax.net.ssl.SSLSocketFactory;
import redis.clients.jedis.authentication.AuthXManager;
+import redis.clients.jedis.util.ReadOnlyCommands;
public interface JedisClientConfig {
-
default RedisProtocol getRedisProtocol() {
return null;
}
diff --git a/src/main/java/redis/clients/jedis/JedisSentineled.java b/src/main/java/redis/clients/jedis/JedisSentineled.java
index 26f208a03b..147ac351b5 100644
--- a/src/main/java/redis/clients/jedis/JedisSentineled.java
+++ b/src/main/java/redis/clients/jedis/JedisSentineled.java
@@ -7,6 +7,7 @@
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheFactory;
import redis.clients.jedis.providers.SentineledConnectionProvider;
+import redis.clients.jedis.util.ReadOnlyCommands;
public class JedisSentineled extends UnifiedJedis {
@@ -37,6 +38,21 @@ public JedisSentineled(String masterName, final JedisClientConfig masterClientCo
masterClientConfig.getRedisProtocol());
}
+ public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
+ final GenericObjectPoolConfig poolConfig,
+ Set sentinels, final JedisClientConfig sentinelClientConfig, ReadFrom readFrom) {
+ super(new SentineledConnectionProvider(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig, readFrom),
+ masterClientConfig.getRedisProtocol());
+ }
+
+ public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig,
+ final GenericObjectPoolConfig poolConfig,
+ Set sentinels, final JedisClientConfig sentinelClientConfig, ReadFrom readFrom,
+ ReadOnlyCommands.ReadOnlyPredicate readOnlyPredicate) {
+ super(new SentineledConnectionProvider(masterName, masterClientConfig, poolConfig, sentinels, sentinelClientConfig, readFrom, readOnlyPredicate),
+ masterClientConfig.getRedisProtocol());
+ }
+
@Experimental
public JedisSentineled(String masterName, final JedisClientConfig masterClientConfig, Cache clientSideCache,
final GenericObjectPoolConfig poolConfig,
diff --git a/src/main/java/redis/clients/jedis/ReadFrom.java b/src/main/java/redis/clients/jedis/ReadFrom.java
new file mode 100644
index 0000000000..5ac339009b
--- /dev/null
+++ b/src/main/java/redis/clients/jedis/ReadFrom.java
@@ -0,0 +1,12 @@
+package redis.clients.jedis;
+
+public enum ReadFrom {
+ // read from the upstream only.
+ UPSTREAM,
+ // read from the replica only.
+ REPLICA,
+ // read preferred from the upstream and fall back to a replica if the upstream is not available.
+ UPSTREAM_PREFERRED,
+ // read preferred from replica and fall back to upstream if no replica is not available.
+ REPLICA_PREFERRED
+}
diff --git a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java
index dedf34fb69..e04a23b050 100644
--- a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java
+++ b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java
@@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
@@ -19,13 +20,24 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.ReadFrom;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.IOUtils;
+import redis.clients.jedis.util.ReadOnlyCommands;
public class SentineledConnectionProvider implements ConnectionProvider {
+ class PoolInfo {
+ public String host;
+ public ConnectionPool pool;
+
+ public PoolInfo(String host, ConnectionPool pool) {
+ this.host = host;
+ this.pool = pool;
+ }
+ }
private static final Logger LOG = LoggerFactory.getLogger(SentineledConnectionProvider.class);
@@ -49,8 +61,18 @@ public class SentineledConnectionProvider implements ConnectionProvider {
private final long subscribeRetryWaitTimeMillis;
+ private final ReadFrom readFrom;
+
+ private ReadOnlyCommands.ReadOnlyPredicate READ_ONLY_COMMANDS;
+
private final Lock initPoolLock = new ReentrantLock(true);
+ private final List slavePools = new ArrayList<>();
+
+ private final Lock slavePoolsLock = new ReentrantLock(true);
+
+ private int poolIndex;
+
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, null, null, sentinels, sentinelClientConfig);
@@ -69,26 +91,41 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
}
+ public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
+ final GenericObjectPoolConfig poolConfig,
+ Set sentinels, final JedisClientConfig sentinelClientConfig, ReadFrom readFrom) {
+ this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig,
+ DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS, readFrom, ReadOnlyCommands.asPredicate());
+ }
+
+ public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
+ final GenericObjectPoolConfig poolConfig,
+ Set sentinels, final JedisClientConfig sentinelClientConfig, ReadFrom readFrom,
+ ReadOnlyCommands.ReadOnlyPredicate readOnlyPredicate) {
+ this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig,
+ DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS, readFrom, readOnlyPredicate);
+ }
+
@Experimental
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Cache clientSideCache, final GenericObjectPoolConfig poolConfig,
Set sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, clientSideCache, poolConfig, sentinels, sentinelClientConfig,
- DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS);
+ DEFAULT_SUBSCRIBE_RETRY_WAIT_TIME_MILLIS, ReadFrom.UPSTREAM, ReadOnlyCommands.asPredicate());
}
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
final GenericObjectPoolConfig poolConfig,
Set sentinels, final JedisClientConfig sentinelClientConfig,
final long subscribeRetryWaitTimeMillis) {
- this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig, subscribeRetryWaitTimeMillis);
+ this(masterName, masterClientConfig, null, poolConfig, sentinels, sentinelClientConfig, subscribeRetryWaitTimeMillis, ReadFrom.UPSTREAM, ReadOnlyCommands.asPredicate());
}
@Experimental
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Cache clientSideCache, final GenericObjectPoolConfig poolConfig,
Set sentinels, final JedisClientConfig sentinelClientConfig,
- final long subscribeRetryWaitTimeMillis) {
+ final long subscribeRetryWaitTimeMillis, ReadFrom readFrom, ReadOnlyCommands.ReadOnlyPredicate readOnlyPredicate) {
this.masterName = masterName;
this.masterClientConfig = masterClientConfig;
@@ -97,11 +134,49 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
this.sentinelClientConfig = sentinelClientConfig;
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
+ this.readFrom = readFrom;
+ this.READ_ONLY_COMMANDS = readOnlyPredicate;
HostAndPort master = initSentinels(sentinels);
initMaster(master);
}
+ private Connection getSlaveResource() {
+ int startIdx;
+ slavePoolsLock.lock();
+ try {
+ poolIndex++;
+ if (poolIndex >= slavePools.size()) {
+ poolIndex = 0;
+ }
+ startIdx = poolIndex;
+ } finally {
+ slavePoolsLock.unlock();
+ }
+ return _getSlaveResource(startIdx, 0);
+ }
+
+ private Connection _getSlaveResource(int idx, int cnt) {
+ PoolInfo poolInfo;
+ slavePoolsLock.lock();
+ try {
+ if (cnt >= slavePools.size()) {
+ return null;
+ }
+ poolInfo = slavePools.get(idx % slavePools.size());
+ } finally {
+ slavePoolsLock.unlock();
+ }
+
+ try {
+ Connection jedis = poolInfo.pool.getResource();
+ return jedis;
+ } catch (Exception e) {
+ LOG.error("get connection fail:", e);
+ return _getSlaveResource(idx + 1, cnt + 1);
+ }
+ }
+
@Override
public Connection getConnection() {
return pool.getResource();
@@ -109,7 +184,43 @@ public Connection getConnection() {
@Override
public Connection getConnection(CommandArguments args) {
- return pool.getResource();
+ boolean isReadCommand = READ_ONLY_COMMANDS.isReadOnly(args);
+ if (!isReadCommand) {
+ return pool.getResource();
+ }
+
+ Connection conn;
+ switch (readFrom) {
+ case REPLICA:
+ conn = getSlaveResource();
+ if (conn == null) {
+ throw new JedisException("all replica is invalid");
+ }
+ return conn;
+ case UPSTREAM_PREFERRED:
+ try {
+ conn = pool.getResource();
+ if (conn != null) {
+ return conn;
+ }
+ } catch (Exception e) {
+ LOG.error("get master connection error", e);
+ }
+
+ conn = getSlaveResource();
+ if (conn == null) {
+ throw new JedisException("all redis instance is invalid");
+ }
+ return conn;
+ case REPLICA_PREFERRED:
+ conn = getSlaveResource();
+ if (conn != null) {
+ return conn;
+ }
+ return pool.getResource();
+ default:
+ return pool.getResource();
+ }
}
@Override
@@ -117,6 +228,10 @@ public void close() {
sentinelListeners.forEach(SentinelListener::shutdown);
pool.close();
+
+ for (PoolInfo slavePool : slavePools) {
+ slavePool.pool.close();
+ }
}
public HostAndPort getCurrentMaster() {
@@ -167,6 +282,88 @@ private ConnectionPool createNodePool(HostAndPort master) {
}
}
+ private void initSlaves(List slaves) {
+ List removedSlavePools = new ArrayList<>();
+ slavePoolsLock.lock();
+ try {
+ for (int i = slavePools.size()-1; i >= 0; i--) {
+ PoolInfo poolInfo = slavePools.get(i);
+ boolean found = false;
+ for (HostAndPort slave : slaves) {
+ String host = slave.toString();
+ if (poolInfo.host.equals(host)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ removedSlavePools.add(slavePools.remove(i));
+ }
+ }
+
+ for (HostAndPort slave : slaves) {
+ addSlave(slave);
+ }
+ } finally {
+ slavePoolsLock.unlock();
+ if (!removedSlavePools.isEmpty() && clientSideCache != null) {
+ clientSideCache.flush();
+ }
+
+ for (PoolInfo removedSlavePool : removedSlavePools) {
+ removedSlavePool.pool.destroy();
+ }
+ }
+ }
+
+ private static boolean isHealthy(String flags) {
+ for (String flag : flags.split(",")) {
+ switch (flag.trim()) {
+ case "s_down":
+ case "o_down":
+ case "disconnected":
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void addSlave(HostAndPort slave) {
+ String newSlaveHost = slave.toString();
+ slavePoolsLock.lock();
+ try {
+ for (int i = 0; i < this.slavePools.size(); i++) {
+ PoolInfo poolInfo = this.slavePools.get(i);
+ if (poolInfo.host.equals(newSlaveHost)) {
+ return;
+ }
+ }
+ slavePools.add(new PoolInfo(newSlaveHost, createNodePool(slave)));
+ } finally {
+ slavePoolsLock.unlock();
+ }
+ }
+
+ private void removeSlave(HostAndPort slave) {
+ String newSlaveHost = slave.toString();
+ PoolInfo removed = null;
+ slavePoolsLock.lock();
+ try {
+ for (int i = 0; i < this.slavePools.size(); i++) {
+ PoolInfo poolInfo = this.slavePools.get(i);
+ if (poolInfo.host.equals(newSlaveHost)) {
+ removed = slavePools.remove(i);
+ break;
+ }
+ }
+ } finally {
+ slavePoolsLock.unlock();
+ }
+ if (removed != null) {
+ removed.pool.destroy();
+ }
+ }
+
private HostAndPort initSentinels(Set sentinels) {
HostAndPort master = null;
@@ -262,6 +459,24 @@ public void run() {
sentinelJedis = new Jedis(node, sentinelClientConfig);
+ List