Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@
<version>${resilience4j.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>2.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final AuthXManager authXManager;

private final boolean fallbackToMaster;

private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.redisProtocol = builder.redisProtocol;
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
Expand All @@ -50,6 +52,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.clientSetInfoConfig = builder.clientSetInfoConfig;
this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas;
this.authXManager = builder.authXManager;
this.fallbackToMaster = builder.fallbackToMaster;
}

@Override
Expand Down Expand Up @@ -143,6 +146,11 @@ public boolean isReadOnlyForRedisClusterReplicas() {
return readOnlyForRedisClusterReplicas;
}

@Override
public boolean isFallbackToMaster() {
return fallbackToMaster;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -175,6 +183,8 @@ public static class Builder {

private AuthXManager authXManager = null;

private boolean fallbackToMaster = true;

private Builder() {
}

Expand Down Expand Up @@ -297,6 +307,11 @@ public Builder authXManager(AuthXManager authXManager) {
return this;
}

public Builder fallbackToMaster(boolean fallbackToMaster) {
this.fallbackToMaster = fallbackToMaster;
return this;
}

public Builder from(JedisClientConfig instance) {
this.redisProtocol = instance.getRedisProtocol();
this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis();
Expand All @@ -314,6 +329,7 @@ public Builder from(JedisClientConfig instance) {
this.clientSetInfoConfig = instance.getClientSetInfoConfig();
this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas();
this.authXManager = instance.getAuthXManager();
this.fallbackToMaster = instance.isFallbackToMaster();
return this;
}
}
Expand Down Expand Up @@ -376,6 +392,8 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {

builder.authXManager(copy.getAuthXManager());

builder.fallbackToMaster(copy.isFallbackToMaster());

return builder.build();
}
}
18 changes: 18 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.authentication.AuthXManager;
import redis.clients.jedis.util.Commands;

public interface JedisClientConfig {

Expand Down Expand Up @@ -115,4 +116,21 @@ default boolean isReadOnlyForRedisClusterReplicas() {
default ClientSetInfoConfig getClientSetInfoConfig() {
return ClientSetInfoConfig.DEFAULT;
}

/**
* fallback when no replicas are healthy, default to master
* @return {@code true} - to execute command by master. {@code false} - throw exception.
*/
default boolean isFallbackToMaster() {
return true;
}

/**
* check a Command is READONLY
* @param args
* @return
*/
default boolean isReadCommand(CommandArguments args) {
return Commands.ReadOnlyCommands.contains(args.getCommand());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are introducing support for READ commands only for the JedisSentineled client.
Because of this, the read-from settings should be configured specifically in JedisSentineled (or its SentineledConnectionProvider) rather than being placed in the generic client configuration.

In addition, I think it is better to determine whether a command is a READ command by exposing a configurable Predicate. This gives users flexibility and avoids the need to override methods. I like how this is achieved in Lettuce:
• Predicate interface: ReadOnlyPredicate
• Example implementation: ReadOnlyCommands
• Configuration: ClientOptions

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
Expand All @@ -26,6 +27,15 @@
import redis.clients.jedis.util.IOUtils;

public class SentineledConnectionProvider implements ConnectionProvider {
class PoolInfo {
public String host;
public ConnectionPool pool;

public PoolInfo(String host, ConnectionPool pool) {
this.host = host;
this.pool = pool;
}
}

private static final Logger LOG = LoggerFactory.getLogger(SentineledConnectionProvider.class);

Expand All @@ -51,6 +61,10 @@ public class SentineledConnectionProvider implements ConnectionProvider {

private final Lock initPoolLock = new ReentrantLock(true);

private final List<PoolInfo> slavePools = new ArrayList<>();

private int poolIndex;

public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
this(masterName, masterClientConfig, null, null, sentinels, sentinelClientConfig);
Expand Down Expand Up @@ -102,13 +116,52 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
initMaster(master);
}

private Connection getSlaveResource() {
int startIdx;
synchronized (slavePools) {
poolIndex++;
if (poolIndex >= slavePools.size()) {
poolIndex = 0;
}
startIdx = poolIndex;
}
return _getSlaveResource(startIdx, 0);
}

private Connection _getSlaveResource(int idx, int cnt) {
PoolInfo poolInfo;
synchronized (slavePools) {
if (cnt >= slavePools.size()) {
return null;
}
poolInfo = slavePools.get(idx % slavePools.size());
}
try {
Connection jedis = poolInfo.pool.getResource();
return jedis;
} catch (Exception e) {
LOG.error("get connection fail:", e);
return _getSlaveResource(idx + 1, cnt + 1);
}
}

@Override
public Connection getConnection() {
return pool.getResource();
}

@Override
public Connection getConnection(CommandArguments args) {
boolean readCommand = masterClientConfig.isReadCommand(args);
if (readCommand) {
Connection slaveConn = getSlaveResource();
if (slaveConn != null) {
return slaveConn;
}
if (!masterClientConfig.isFallbackToMaster()) {
throw new JedisException("can not get Connection, all slave is invalid");
}
}
return pool.getResource();
}
Comment on lines 186 to 224
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment about preserving backword compatibility


Expand All @@ -117,6 +170,10 @@ public void close() {
sentinelListeners.forEach(SentinelListener::shutdown);

pool.close();

for (PoolInfo slavePool : slavePools) {
slavePool.pool.close();
}
}

public HostAndPort getCurrentMaster() {
Expand Down Expand Up @@ -167,6 +224,79 @@ private ConnectionPool createNodePool(HostAndPort master) {
}
}

private void initSlaves(List<HostAndPort> slaves) {
List<PoolInfo> removedSlavePools = new ArrayList<>();
try {
synchronized (slavePools) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized blocks cause carrier thread pinning, which undermines Loom’s scalability model. To avoid this, we replaced synchronized blocks with ReentrantLock in other parts of the driver. (#3480)

We should avoid introducing new synchronized sections here, and instead rely on other type syncronisation.

Loop:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The nested loop with continue Loop reduces readability .

for (int i = slavePools.size()-1; i >= 0; i--) {
PoolInfo poolInfo = slavePools.get(i);
for (HostAndPort slave : slaves) {
String host = slave.toString();
if (poolInfo.host.equals(host)) {
continue Loop;
}
}
removedSlavePools.add(slavePools.remove(i));
}

for (HostAndPort slave : slaves) {
addSlave(slave);
}
}
} finally {
if (!removedSlavePools.isEmpty() && clientSideCache != null) {
clientSideCache.flush();
}

for (PoolInfo removedSlavePool : removedSlavePools) {
removedSlavePool.pool.destroy();
}
}
}

private static boolean isHealthy(String flags) {
for (String flag : flags.split(",")) {
switch (flag.trim()) {
case "s_down":
case "o_down":
case "disconnected":
return false;
}
}
return true;
}

private void addSlave(HostAndPort slave) {
String newSlaveHost = slave.toString();
synchronized (this.slavePools) {
for (int i = 0; i < this.slavePools.size(); i++) {
PoolInfo poolInfo = this.slavePools.get(i);
if (poolInfo.host.equals(newSlaveHost)) {
return;
}
}
slavePools.add(new PoolInfo(newSlaveHost, createNodePool(slave)));
}
}

private void removeSlave(HostAndPort slave) {
String newSlaveHost = slave.toString();
PoolInfo removed = null;
synchronized (this.slavePools) {
for (int i = 0; i < this.slavePools.size(); i++) {
PoolInfo poolInfo = this.slavePools.get(i);
if (poolInfo.host.equals(newSlaveHost)) {
removed = slavePools.remove(i);
break;
}
}
}
if (removed != null) {
removed.pool.destroy();
}
}

private HostAndPort initSentinels(Set<HostAndPort> sentinels) {

HostAndPort master = null;
Expand Down Expand Up @@ -262,6 +392,24 @@ public void run() {

sentinelJedis = new Jedis(node, sentinelClientConfig);

List<Map<String, String>> slaveInfos = sentinelJedis.sentinelSlaves(masterName);

List<HostAndPort> slaves = new ArrayList<>();

for (int i = 0; i < slaveInfos.size(); i++) {
Map<String, String> slaveInfo = slaveInfos.get(i);
String flags = slaveInfo.get("flags");
if (flags == null || !isHealthy(flags)) {
continue;
}
String ip = slaveInfo.get("ip");
int port = Integer.parseInt(slaveInfo.get("port"));
HostAndPort slave = new HostAndPort(ip, port);
slaves.add(slave);
}

initSlaves(slaves);

// code for active refresh
List<String> masterAddr = sentinelJedis.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
Expand All @@ -275,24 +423,58 @@ public void run() {
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", node, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(switchMasterMsg[3], switchMasterMsg[4]));
} else {
LOG.debug(
"Ignoring message on +switch-master for master {}. Our master is {}.",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
node, message);
String[] switchMsg = message.split(" ");
String slaveIp;
int slavePort;
switch (channel) {
case "+switch-master":
if (switchMsg.length > 3) {
if (masterName.equals(switchMsg[0])) {
initMaster(toHostAndPort(switchMsg[3], switchMsg[4]));
} else {
LOG.debug(
"Ignoring message on +switch-master for master {}. Our master is {}.",
switchMsg[0], masterName);
}
} else {
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
node, message);
}
break;
case "+sdown":
if (switchMsg[0].equals("master")) {
return;
}
if (!masterName.equals(switchMsg[5])) {
return;
}
slaveIp = switchMsg[2];
slavePort = Integer.parseInt(switchMsg[3]);
removeSlave(new HostAndPort(slaveIp, slavePort));
break;
case "-sdown":
if (!masterName.equals(switchMsg[5])) {
return;
}
slaveIp = switchMsg[2];
slavePort = Integer.parseInt(switchMsg[3]);
addSlave(new HostAndPort(slaveIp, slavePort));
break;
case "+slave":
if (!masterName.equals(switchMsg[5])) {
return;
}
slaveIp = switchMsg[2];
slavePort = Integer.parseInt(switchMsg[3]);
addSlave(new HostAndPort(slaveIp, slavePort));

String masterIp = switchMsg[6];
int masterPort = Integer.parseInt(switchMsg[7]);
removeSlave(new HostAndPort(masterIp, masterPort));
break;
}
}
}, "+switch-master");
}, "+switch-master", "+sdown", "-sdown", "+slave");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on High availability with Redis Sentinel

+sentinel <instance details> -- A new sentinel for this master was detected and attached.
+sdown <instance details> -- The specified instance is now in Subjectively Down state.
-sdown <instance details> -- The specified instance is no longer in Subjectively Down state.
+odown <instance details> -- The specified instance is now in Objectively Down state.
-odown <instance details> -- The specified instance is no longer in Objectively Down state.

Not clear the difference between sdown/odown events. If I am reading it right odown events are send when majority of sentinels consider the node down, and sdown is send when one sentinel considers the node down.

Probably we should react on odown events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem here. When I kill a redis node, +odown is triggered, but when the node comes back online, there is no -odown event.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And +odown will only be triggered when the master goes down, my redis version is 6.2.6


} catch (JedisException e) {

Expand Down
Loading
Loading