From f549d53fd92668d1c3ef6e2f4a24c9826ca06bb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Thu, 24 Jul 2025 11:21:00 +0800 Subject: [PATCH 1/9] Implemented read-write separation based on sentinel --- .../clients/jedis/JedisSentinelSlavePool.java | 475 ++++++++++++++++++ 1 file changed, 475 insertions(+) create mode 100644 src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java new file mode 100644 index 0000000000..043233e38f --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -0,0 +1,475 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + + +class PoolInfo { + public String host; + public JedisPool pool; + + public PoolInfo(String host, JedisPool pool) { + this.host = host; + this.pool = pool; + } +} + +public class JedisSentinelSlavePool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelSlavePool.class); + + private final JedisClientConfig sentinelClientConfig; + + private final JedisClientConfig clientConfig; + + private final GenericObjectPoolConfig poolConfig; + + protected final Collection slaveListeners = new ArrayList<>(); + + private final List slavePools = new ArrayList<>(); + + private int poolIndex; + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, String password) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, String password, String sentinelPassword) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, + password, Protocol.DEFAULT_DATABASE, null, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, sentinelPassword, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password) { + this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int timeout) { + this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final String password) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password, + final int database) { + this(masterName, sentinels, poolConfig, timeout, timeout, null, password, database); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String user, + final String password, final int database) { + this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password, + final int database, final String clientName) { + this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String user, + final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database, clientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, clientName, + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, + final String user, final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout, user, password, database, clientName, + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelPassword, + final String sentinelClientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName, + sentinelConnectionTimeout, sentinelSoTimeout, null, sentinelPassword, sentinelClientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, + final String sentinelPassword, final String sentinelClientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, 0, user, password, database, clientName, + sentinelConnectionTimeout, sentinelSoTimeout, sentinelUser, sentinelPassword, sentinelClientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, + final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, + final String user, final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, + final String sentinelPassword, final String sentinelClientName) { + this(masterName, parseHostAndPorts(sentinels), poolConfig, + DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout) + .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout) + .user(user).password(password).database(database).clientName(clientName).build(), + DefaultJedisClientConfig.builder().connectionTimeoutMillis(sentinelConnectionTimeout) + .socketTimeoutMillis(sentinelSoTimeout).user(sentinelUser).password(sentinelPassword) + .clientName(sentinelClientName).build() + ); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final JedisClientConfig clientConfig, + final JedisClientConfig sentinelClientConfig) { + this.poolConfig = poolConfig; + this.clientConfig = clientConfig; + this.sentinelClientConfig = sentinelClientConfig; + + initSentinels(sentinels, masterName); + } + + private static Set parseHostAndPorts(Set strings) { + return strings.stream().map(HostAndPort::from).collect(Collectors.toSet()); + } + + public void destroy() { + for (SlaveListener m : slaveListeners) { + m.shutdown(); + } + + for (PoolInfo poolInfo : slavePools) { + poolInfo.pool.destroy(); + } + } + + private List initSentinels(Set sentinels, final String masterName) { + + boolean sentinelAvailable = false; + + LOG.info("Trying to find master from available Sentinels..."); + + List slaves = new ArrayList<>(); + + for (HostAndPort sentinel : sentinels) { + + LOG.debug("Connecting to Sentinel {}", sentinel); + + try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) { + List> slaveInfos = jedis.sentinelSlaves(masterName); + + // connected to sentinel... + sentinelAvailable = true; + + if (slaveInfos == null || slaveInfos.isEmpty()) { + LOG.warn("Can not get slave addr, master name: {}. Sentinel: {}", masterName, sentinel); + continue; + } + + slaveInfos.forEach(slaveInfo -> { + String ip = slaveInfo.get("ip"); + int port = Integer.parseInt(slaveInfo.get("port")); + HostAndPort slave = new HostAndPort(ip, port); + addSlave(slave); + slaves.add(slave); + }); + + LOG.debug("Found Redis slaves at {}", slaveInfos); + break; + } catch (JedisException e) { + // resolves #1036, it should handle JedisException there's another chance + // of raising JedisDataException + LOG.warn( + "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e); + } + } + + if (slaves.isEmpty()) { + if (sentinelAvailable) { + // can connect to sentinel, but master name seems to not monitored + throw new JedisException("Can connect to sentinel, but " + masterName + + " seems to be not monitored..."); + } else { + throw new JedisConnectionException("All sentinels down, cannot determine where is " + + masterName + " master is running..."); + } + } + + LOG.info("Redis slaves running at {}, starting Sentinel listeners...", slaves); + + for (HostAndPort sentinel : sentinels) { + SlaveListener slaveListener = new SlaveListener(masterName, sentinel.getHost(), sentinel.getPort()); + // whether MasterListener threads are alive or not, process can be stopped + slaveListener.setDaemon(true); + slaveListeners.add(slaveListener); + slaveListener.start(); + } + + return slaves; + } + + private void addSlave(HostAndPort slave) { + String newSlaveHost = slave.toString(); + synchronized (this.slavePools) { + for (int i = 0; i < this.slavePools.size(); i++) { + PoolInfo poolInfo = this.slavePools.get(i); + if (poolInfo.host.equals(newSlaveHost)) { + return; + } + } + slavePools.add(new PoolInfo(newSlaveHost, new JedisPool(this.poolConfig, slave, this.clientConfig))); + } + } + + private void removeSlave(HostAndPort slave) { + String newSlaveHost = slave.toString(); + synchronized (this.slavePools) { + for (int i = 0; i < this.slavePools.size(); i++) { + PoolInfo poolInfo = this.slavePools.get(i); + if (poolInfo.host.equals(newSlaveHost)) { + PoolInfo removed = slavePools.remove(i); + removed.pool.destroy(); + return; + } + } + } + } + + public Jedis getResource() { + int startIdx; + synchronized (slavePools) { + poolIndex++; + if (poolIndex >= slavePools.size()) { + poolIndex = 0; + } + startIdx = poolIndex; + } + return _getResource(startIdx, 0); + } + + public Jedis _getResource(int idx, int cnt) { + PoolInfo poolInfo; + synchronized (slavePools) { + if (cnt >= slavePools.size()) { + throw new RuntimeException("can not get Jedis Object, all slave is invalid"); + } + poolInfo = slavePools.get(idx % slavePools.size()); + } + try { + Jedis jedis = poolInfo.pool.getResource(); + return jedis; + } catch (Exception e) { + LOG.error("get connection fail:", e); + return _getResource(idx+1, cnt + 1); + } + } + + @Override + public void close() { + destroy(); + } + + protected class SlaveListener extends Thread { + + protected String masterName; + protected String host; + protected int port; + protected long subscribeRetryWaitTimeMillis = 5000; + protected volatile Jedis j; + protected AtomicBoolean running = new AtomicBoolean(false); + + protected SlaveListener() { + } + + public SlaveListener(String masterName, String host, int port) { + super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); + this.masterName = masterName; + this.host = host; + this.port = port; + } + + public SlaveListener(String masterName, String host, int port, + long subscribeRetryWaitTimeMillis) { + this(masterName, host, port); + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } + + @Override + public void run() { + + running.set(true); + + while (running.get()) { + + try { + // double check that it is not being shutdown + if (!running.get()) { + break; + } + + final HostAndPort hostPort = new HostAndPort(host, port); + j = new Jedis(hostPort, sentinelClientConfig); + + j.subscribe(new JedisPubSub() { + @Override + public void onMessage(String channel, String message) { +// System.out.println("Sentinel " + hostPort + " channel: " + channel + " published: " + message); + LOG.info("Sentinel: {}, channel: {}, published: {}.", hostPort, channel, message); + /* +// 直接kill进行主观下线: 10.148.17.43:6379 +Sentinel 10.148.17.43:26381 channel: +sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 +Sentinel 10.148.17.43:26379 channel: +sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 +Sentinel 10.148.17.43:26380 channel: +sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 + + +// 重启启动被kill的节点: 10.148.17.43:6379 +Sentinel 10.148.17.43:26381 channel: -sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 +Sentinel 10.148.17.43:26379 channel: -sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 +Sentinel 10.148.17.43:26380 channel: -sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 + + +// 使用SENTINEL failover mymaster命令手动进行切主,切主前master为10.148.17.43:6380, 切主后为10.148.17.43:6381 +Sentinel 10.148.17.43:26381 channel: +slave published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6381 +Sentinel 10.148.17.43:26379 channel: +slave published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6381 +Sentinel 10.148.17.43:26379 channel: +slave published: slave 10.148.17.43:6380 10.148.17.43 6380 @ mymaster 10.148.17.43 6381 +Sentinel 10.148.17.43:26381 channel: +slave published: slave 10.148.17.43:6380 10.148.17.43 6380 @ mymaster 10.148.17.43 6381 +Sentinel 10.148.17.43:26380 channel: +sdown published: master mymaster 10.148.17.43 6380 +Sentinel 10.148.17.43:26380 channel: +slave published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6381 +Sentinel 10.148.17.43:26380 channel: +slave published: slave 10.148.17.43:6380 10.148.17.43 6380 @ mymaster 10.148.17.43 6381 +*/ + String[] switchMasterMsg = message.split(" "); + String slaveIp; + int slavePort; + switch (channel) { + case "+sdown": + if (switchMasterMsg[0].equals("master")) { + // 应该是手动切主的消息,忽略即可 + return; + } + if (!masterName.equals(switchMasterMsg[5])) { + // 不是我们监控的集群,也直接返回 + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + removeSlave(new HostAndPort(slaveIp, slavePort)); + break; + case "-sdown": + if (!masterName.equals(switchMasterMsg[5])) { + // 不是我们监控的集群,直接返回 + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + addSlave(new HostAndPort(slaveIp, slavePort)); + break; + case "+slave": + if (!masterName.equals(switchMasterMsg[5])) { + // 不是我们监控的集群,直接返回 + return; + } + // 先尝试添加slave + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + addSlave(new HostAndPort(slaveIp, slavePort)); + // 将新主节点移除 + String masterIp = switchMasterMsg[6]; + int masterPort = Integer.parseInt(switchMasterMsg[7]); + removeSlave(new HostAndPort(masterIp, masterPort)); + break; + } + } + + /* + +sdown:节点被判定为 subjective down(主观下线) + + -sdown:节点恢复 + + +slave:某个 slave 添加或上线 + + -slave: 某个 slave 移除 这个通常只在 config 变更时才发,先进行监听,目前应该用不到这个channel + */ + }, "+sdown", "-sdown", "+slave", "-slave"); + + } catch (JedisException e) { + + if (running.get()) { + LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host, + port, e); + try { + Thread.sleep(subscribeRetryWaitTimeMillis); + } catch (InterruptedException e1) { + LOG.error("Sleep interrupted: ", e1); + } + } else { + LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port); + } + } finally { + if (j != null) { + j.close(); + } + } + } + } + + public void shutdown() { + try { + LOG.debug("Shutting down listener on {}:{}", host, port); + running.set(false); + // This isn't good, the Jedis object is not thread safe + if (j != null) { + j.close(); + } + } catch (RuntimeException e) { + LOG.error("Caught exception while shutting down: ", e); + } + } + } +} From 340acd74f06ae9c3dfe75c26af6ae2acc97f3892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Thu, 24 Jul 2025 11:23:45 +0800 Subject: [PATCH 2/9] Implemented read-write separation based on sentinel --- .../clients/jedis/JedisSentinelSlavePool.java | 32 ++----------------- 1 file changed, 2 insertions(+), 30 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index 043233e38f..cd5766184a 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -246,7 +246,7 @@ private List initSentinels(Set sentinels, final String for (HostAndPort sentinel : sentinels) { SlaveListener slaveListener = new SlaveListener(masterName, sentinel.getHost(), sentinel.getPort()); - // whether MasterListener threads are alive or not, process can be stopped + // whether SlaveListener threads are alive or not, process can be stopped slaveListener.setDaemon(true); slaveListeners.add(slaveListener); slaveListener.start(); @@ -360,41 +360,16 @@ public void run() { j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { -// System.out.println("Sentinel " + hostPort + " channel: " + channel + " published: " + message); LOG.info("Sentinel: {}, channel: {}, published: {}.", hostPort, channel, message); - /* -// 直接kill进行主观下线: 10.148.17.43:6379 -Sentinel 10.148.17.43:26381 channel: +sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 -Sentinel 10.148.17.43:26379 channel: +sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 -Sentinel 10.148.17.43:26380 channel: +sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 - - -// 重启启动被kill的节点: 10.148.17.43:6379 -Sentinel 10.148.17.43:26381 channel: -sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 -Sentinel 10.148.17.43:26379 channel: -sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 -Sentinel 10.148.17.43:26380 channel: -sdown published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6380 - - -// 使用SENTINEL failover mymaster命令手动进行切主,切主前master为10.148.17.43:6380, 切主后为10.148.17.43:6381 -Sentinel 10.148.17.43:26381 channel: +slave published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6381 -Sentinel 10.148.17.43:26379 channel: +slave published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6381 -Sentinel 10.148.17.43:26379 channel: +slave published: slave 10.148.17.43:6380 10.148.17.43 6380 @ mymaster 10.148.17.43 6381 -Sentinel 10.148.17.43:26381 channel: +slave published: slave 10.148.17.43:6380 10.148.17.43 6380 @ mymaster 10.148.17.43 6381 -Sentinel 10.148.17.43:26380 channel: +sdown published: master mymaster 10.148.17.43 6380 -Sentinel 10.148.17.43:26380 channel: +slave published: slave 10.148.17.43:6379 10.148.17.43 6379 @ mymaster 10.148.17.43 6381 -Sentinel 10.148.17.43:26380 channel: +slave published: slave 10.148.17.43:6380 10.148.17.43 6380 @ mymaster 10.148.17.43 6381 -*/ String[] switchMasterMsg = message.split(" "); String slaveIp; int slavePort; switch (channel) { case "+sdown": if (switchMasterMsg[0].equals("master")) { - // 应该是手动切主的消息,忽略即可 return; } if (!masterName.equals(switchMasterMsg[5])) { - // 不是我们监控的集群,也直接返回 return; } slaveIp = switchMasterMsg[2]; @@ -403,7 +378,6 @@ public void onMessage(String channel, String message) { break; case "-sdown": if (!masterName.equals(switchMasterMsg[5])) { - // 不是我们监控的集群,直接返回 return; } slaveIp = switchMasterMsg[2]; @@ -412,14 +386,12 @@ public void onMessage(String channel, String message) { break; case "+slave": if (!masterName.equals(switchMasterMsg[5])) { - // 不是我们监控的集群,直接返回 return; } - // 先尝试添加slave slaveIp = switchMasterMsg[2]; slavePort = Integer.parseInt(switchMasterMsg[3]); addSlave(new HostAndPort(slaveIp, slavePort)); - // 将新主节点移除 + String masterIp = switchMasterMsg[6]; int masterPort = Integer.parseInt(switchMasterMsg[7]); removeSlave(new HostAndPort(masterIp, masterPort)); From 2a0c747eef6142e2fe6b66fd8454459ba0752473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Thu, 24 Jul 2025 11:25:13 +0800 Subject: [PATCH 3/9] Implemented read-write separation based on sentinel --- .../redis/clients/jedis/JedisSentinelSlavePool.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index cd5766184a..a63cad324a 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -398,16 +398,6 @@ public void onMessage(String channel, String message) { break; } } - - /* - +sdown:节点被判定为 subjective down(主观下线) - - -sdown:节点恢复 - - +slave:某个 slave 添加或上线 - - -slave: 某个 slave 移除 这个通常只在 config 变更时才发,先进行监听,目前应该用不到这个channel - */ }, "+sdown", "-sdown", "+slave", "-slave"); } catch (JedisException e) { From 5c19da4cb7f80a8fc5fd520c38226af6bb4eb0be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Thu, 24 Jul 2025 11:43:30 +0800 Subject: [PATCH 4/9] Implemented read-write separation based on sentinel --- src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index a63cad324a..2b63b5820c 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -194,7 +194,7 @@ private List initSentinels(Set sentinels, final String boolean sentinelAvailable = false; - LOG.info("Trying to find master from available Sentinels..."); + LOG.info("Trying to find slave from available Sentinels..."); List slaves = new ArrayList<>(); From 827ea527cb6e2e119fe7d8150cfb6133eb65496f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Thu, 24 Jul 2025 11:49:15 +0800 Subject: [PATCH 5/9] Implemented read-write separation based on sentinel --- src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index 2b63b5820c..b7938ea12f 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -298,7 +298,7 @@ public Jedis _getResource(int idx, int cnt) { PoolInfo poolInfo; synchronized (slavePools) { if (cnt >= slavePools.size()) { - throw new RuntimeException("can not get Jedis Object, all slave is invalid"); + throw new JedisException("can not get Jedis Object, all slave is invalid"); } poolInfo = slavePools.get(idx % slavePools.size()); } From 9f5132992216bad039efb1ec4361ab3ff9d966ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Fri, 25 Jul 2025 17:09:15 +0800 Subject: [PATCH 6/9] Implemented read-write separation based on sentinel --- .../clients/jedis/JedisSentinelSlavePool.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index b7938ea12f..0a533226dc 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -190,6 +190,18 @@ public void destroy() { } } + private static boolean isHealthy(String flags) { + for (String flag : flags.split(",")) { + switch (flag.trim()) { + case "s_down": + case "o_down": + case "disconnected": + return false; + } + } + return true; + } + private List initSentinels(Set sentinels, final String masterName) { boolean sentinelAvailable = false; @@ -213,13 +225,18 @@ private List initSentinels(Set sentinels, final String continue; } - slaveInfos.forEach(slaveInfo -> { + for (int i = 0; i < slaveInfos.size(); i++) { + Map slaveInfo = slaveInfos.get(i); + String flags = slaveInfo.get("flags"); + if (flags == null || !isHealthy(flags)) { + continue; + } String ip = slaveInfo.get("ip"); int port = Integer.parseInt(slaveInfo.get("port")); HostAndPort slave = new HostAndPort(ip, port); addSlave(slave); slaves.add(slave); - }); + } LOG.debug("Found Redis slaves at {}", slaveInfos); break; From 28c2670a95da63925adf6739a428bbe1d6922fa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Thu, 31 Jul 2025 10:17:16 +0800 Subject: [PATCH 7/9] Optimize locking logic --- .../java/redis/clients/jedis/JedisSentinelSlavePool.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index 0a533226dc..41f175db17 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -287,16 +287,19 @@ private void addSlave(HostAndPort slave) { private void removeSlave(HostAndPort slave) { String newSlaveHost = slave.toString(); + PoolInfo removed = null; synchronized (this.slavePools) { for (int i = 0; i < this.slavePools.size(); i++) { PoolInfo poolInfo = this.slavePools.get(i); if (poolInfo.host.equals(newSlaveHost)) { - PoolInfo removed = slavePools.remove(i); - removed.pool.destroy(); - return; + removed = slavePools.remove(i); + break; } } } + if (removed != null) { + removed.pool.destroy(); + } } public Jedis getResource() { From 5f1cc31f41b83eea286ac8c9c528d5c0b7b27d24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Mon, 11 Aug 2025 14:21:40 +0800 Subject: [PATCH 8/9] add tests --- .../clients/jedis/JedisSentinelSlavePool.java | 726 +++++++++--------- .../jedis/JedisSentinelSlavePoolTest.java | 98 +++ 2 files changed, 461 insertions(+), 363 deletions(-) create mode 100644 src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java index 41f175db17..fb7fea63b4 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -16,291 +16,291 @@ class PoolInfo { - public String host; - public JedisPool pool; + public String host; + public JedisPool pool; - public PoolInfo(String host, JedisPool pool) { - this.host = host; - this.pool = pool; - } + public PoolInfo(String host, JedisPool pool) { + this.host = host; + this.pool = pool; + } } public class JedisSentinelSlavePool implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelSlavePool.class); - - private final JedisClientConfig sentinelClientConfig; - - private final JedisClientConfig clientConfig; - - private final GenericObjectPoolConfig poolConfig; - - protected final Collection slaveListeners = new ArrayList<>(); - - private final List slavePools = new ArrayList<>(); - - private int poolIndex; - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig) { - this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, - Protocol.DEFAULT_DATABASE); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels) { - this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, - Protocol.DEFAULT_DATABASE); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, String password) { - this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, String password, String sentinelPassword) { - this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, - password, Protocol.DEFAULT_DATABASE, null, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, sentinelPassword, null); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, int timeout, final String password) { - this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int timeout) { - this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final String password) { - this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, int timeout, final String password, - final int database) { - this(masterName, sentinels, poolConfig, timeout, timeout, null, password, database); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, int timeout, final String user, - final String password, final int database) { - this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, int timeout, final String password, - final int database, final String clientName) { - this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, int timeout, final String user, - final String password, final int database, final String clientName) { - this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database, clientName); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, - final String password, final int database) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, null); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, - final String user, final String password, final int database) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, null); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, - final String password, final int database, final String clientName) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, - final String user, final String password, final int database, final String clientName) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, clientName, - Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, - final String user, final String password, final int database, final String clientName) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout, user, password, database, clientName, - Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, - final String password, final int database, final String clientName, - final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelPassword, - final String sentinelClientName) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName, - sentinelConnectionTimeout, sentinelSoTimeout, null, sentinelPassword, sentinelClientName); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, - final String user, final String password, final int database, final String clientName, - final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, - final String sentinelPassword, final String sentinelClientName) { - this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, 0, user, password, database, clientName, - sentinelConnectionTimeout, sentinelSoTimeout, sentinelUser, sentinelPassword, sentinelClientName); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, - final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, - final String user, final String password, final int database, final String clientName, - final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, - final String sentinelPassword, final String sentinelClientName) { - this(masterName, parseHostAndPorts(sentinels), poolConfig, - DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout) - .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout) - .user(user).password(password).database(database).clientName(clientName).build(), - DefaultJedisClientConfig.builder().connectionTimeoutMillis(sentinelConnectionTimeout) - .socketTimeoutMillis(sentinelSoTimeout).user(sentinelUser).password(sentinelPassword) - .clientName(sentinelClientName).build() - ); - } - - public JedisSentinelSlavePool(String masterName, Set sentinels, - final GenericObjectPoolConfig poolConfig, final JedisClientConfig clientConfig, - final JedisClientConfig sentinelClientConfig) { - this.poolConfig = poolConfig; - this.clientConfig = clientConfig; - this.sentinelClientConfig = sentinelClientConfig; - - initSentinels(sentinels, masterName); - } - - private static Set parseHostAndPorts(Set strings) { - return strings.stream().map(HostAndPort::from).collect(Collectors.toSet()); - } - - public void destroy() { - for (SlaveListener m : slaveListeners) { - m.shutdown(); + private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelSlavePool.class); + + private final JedisClientConfig sentinelClientConfig; + + private final JedisClientConfig clientConfig; + + private final GenericObjectPoolConfig poolConfig; + + protected final Collection slaveListeners = new ArrayList<>(); + + private final List slavePools = new ArrayList<>(); + + private int poolIndex; + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); } - for (PoolInfo poolInfo : slavePools) { - poolInfo.pool.destroy(); + public JedisSentinelSlavePool(String masterName, Set sentinels) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); } - } - - private static boolean isHealthy(String flags) { - for (String flag : flags.split(",")) { - switch (flag.trim()) { - case "s_down": - case "o_down": - case "disconnected": - return false; - } + + public JedisSentinelSlavePool(String masterName, Set sentinels, String password) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); } - return true; - } - private List initSentinels(Set sentinels, final String masterName) { + public JedisSentinelSlavePool(String masterName, Set sentinels, String password, String sentinelPassword) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, + password, Protocol.DEFAULT_DATABASE, null, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, sentinelPassword, null); + } - boolean sentinelAvailable = false; + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password) { + this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); + } - LOG.info("Trying to find slave from available Sentinels..."); + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int timeout) { + this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); + } - List slaves = new ArrayList<>(); + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final String password) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); + } - for (HostAndPort sentinel : sentinels) { + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password, + final int database) { + this(masterName, sentinels, poolConfig, timeout, timeout, null, password, database); + } - LOG.debug("Connecting to Sentinel {}", sentinel); + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String user, + final String password, final int database) { + this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database); + } - try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) { - List> slaveInfos = jedis.sentinelSlaves(masterName); + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password, + final int database, final String clientName) { + this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); + } - // connected to sentinel... - sentinelAvailable = true; + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String user, + final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database, clientName); + } - if (slaveInfos == null || slaveInfos.isEmpty()) { - LOG.warn("Can not get slave addr, master name: {}. Sentinel: {}", masterName, sentinel); - continue; - } + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, null); + } - for (int i = 0; i < slaveInfos.size(); i++) { - Map slaveInfo = slaveInfos.get(i); - String flags = slaveInfo.get("flags"); - if (flags == null || !isHealthy(flags)) { - continue; - } - String ip = slaveInfo.get("ip"); - int port = Integer.parseInt(slaveInfo.get("port")); - HostAndPort slave = new HostAndPort(ip, port); - addSlave(slave); - slaves.add(slave); - } + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, null); + } - LOG.debug("Found Redis slaves at {}", slaveInfos); - break; - } catch (JedisException e) { - // resolves #1036, it should handle JedisException there's another chance - // of raising JedisDataException - LOG.warn( - "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e); - } + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName); } - if (slaves.isEmpty()) { - if (sentinelAvailable) { - // can connect to sentinel, but master name seems to not monitored - throw new JedisException("Can connect to sentinel, but " + masterName - + " seems to be not monitored..."); - } else { - throw new JedisConnectionException("All sentinels down, cannot determine where is " - + masterName + " master is running..."); - } + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, clientName, + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); } - LOG.info("Redis slaves running at {}, starting Sentinel listeners...", slaves); + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, + final String user, final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout, user, password, database, clientName, + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelPassword, + final String sentinelClientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName, + sentinelConnectionTimeout, sentinelSoTimeout, null, sentinelPassword, sentinelClientName); + } - for (HostAndPort sentinel : sentinels) { - SlaveListener slaveListener = new SlaveListener(masterName, sentinel.getHost(), sentinel.getPort()); - // whether SlaveListener threads are alive or not, process can be stopped - slaveListener.setDaemon(true); - slaveListeners.add(slaveListener); - slaveListener.start(); + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, + final String sentinelPassword, final String sentinelClientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, 0, user, password, database, clientName, + sentinelConnectionTimeout, sentinelSoTimeout, sentinelUser, sentinelPassword, sentinelClientName); } - return slaves; - } + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, + final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, + final String user, final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, + final String sentinelPassword, final String sentinelClientName) { + this(masterName, parseHostAndPorts(sentinels), poolConfig, + DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout) + .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout) + .user(user).password(password).database(database).clientName(clientName).build(), + DefaultJedisClientConfig.builder().connectionTimeoutMillis(sentinelConnectionTimeout) + .socketTimeoutMillis(sentinelSoTimeout).user(sentinelUser).password(sentinelPassword) + .clientName(sentinelClientName).build() + ); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final JedisClientConfig clientConfig, + final JedisClientConfig sentinelClientConfig) { + this.poolConfig = poolConfig; + this.clientConfig = clientConfig; + this.sentinelClientConfig = sentinelClientConfig; + + initSentinels(sentinels, masterName); + } + + private static Set parseHostAndPorts(Set strings) { + return strings.stream().map(HostAndPort::from).collect(Collectors.toSet()); + } + + public void destroy() { + for (SlaveListener m : slaveListeners) { + m.shutdown(); + } + + for (PoolInfo poolInfo : slavePools) { + poolInfo.pool.destroy(); + } + } + + private static boolean isHealthy(String flags) { + for (String flag : flags.split(",")) { + switch (flag.trim()) { + case "s_down": + case "o_down": + case "disconnected": + return false; + } + } + return true; + } + + private List initSentinels(Set sentinels, final String masterName) { + + boolean sentinelAvailable = false; + + LOG.info("Trying to find slave from available Sentinels..."); - private void addSlave(HostAndPort slave) { - String newSlaveHost = slave.toString(); - synchronized (this.slavePools) { - for (int i = 0; i < this.slavePools.size(); i++) { - PoolInfo poolInfo = this.slavePools.get(i); - if (poolInfo.host.equals(newSlaveHost)) { - return; + List slaves = new ArrayList<>(); + + for (HostAndPort sentinel : sentinels) { + + LOG.debug("Connecting to Sentinel {}", sentinel); + + try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) { + List> slaveInfos = jedis.sentinelSlaves(masterName); + + // connected to sentinel... + sentinelAvailable = true; + + if (slaveInfos == null || slaveInfos.isEmpty()) { + LOG.warn("Can not get slave addr, master name: {}. Sentinel: {}", masterName, sentinel); + continue; + } + + for (int i = 0; i < slaveInfos.size(); i++) { + Map slaveInfo = slaveInfos.get(i); + String flags = slaveInfo.get("flags"); + if (flags == null || !isHealthy(flags)) { + continue; + } + String ip = slaveInfo.get("ip"); + int port = Integer.parseInt(slaveInfo.get("port")); + HostAndPort slave = new HostAndPort(ip, port); + addSlave(slave); + slaves.add(slave); + } + + LOG.debug("Found Redis slaves at {}", slaveInfos); + break; + } catch (JedisException e) { + // resolves #1036, it should handle JedisException there's another chance + // of raising JedisDataException + LOG.warn( + "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e); + } + } + + if (slaves.isEmpty()) { + if (sentinelAvailable) { + // can connect to sentinel, but master name seems to not monitored + throw new JedisException("Can connect to sentinel, but " + masterName + + " seems to be not monitored..."); + } else { + throw new JedisConnectionException("All sentinels down, cannot determine where is " + + masterName + " master is running..."); + } } - } - slavePools.add(new PoolInfo(newSlaveHost, new JedisPool(this.poolConfig, slave, this.clientConfig))); + + LOG.info("Redis slaves running at {}, starting Sentinel listeners...", slaves); + + for (HostAndPort sentinel : sentinels) { + SlaveListener slaveListener = new SlaveListener(masterName, sentinel.getHost(), sentinel.getPort()); + // whether SlaveListener threads are alive or not, process can be stopped + slaveListener.setDaemon(true); + slaveListeners.add(slaveListener); + slaveListener.start(); + } + + return slaves; } - } - - private void removeSlave(HostAndPort slave) { - String newSlaveHost = slave.toString(); - PoolInfo removed = null; - synchronized (this.slavePools) { - for (int i = 0; i < this.slavePools.size(); i++) { - PoolInfo poolInfo = this.slavePools.get(i); - if (poolInfo.host.equals(newSlaveHost)) { - removed = slavePools.remove(i); - break; + + private void addSlave(HostAndPort slave) { + String newSlaveHost = slave.toString(); + synchronized (this.slavePools) { + for (int i = 0; i < this.slavePools.size(); i++) { + PoolInfo poolInfo = this.slavePools.get(i); + if (poolInfo.host.equals(newSlaveHost)) { + return; + } + } + slavePools.add(new PoolInfo(newSlaveHost, new JedisPool(this.poolConfig, slave, this.clientConfig))); } - } } - if (removed != null) { - removed.pool.destroy(); + + private void removeSlave(HostAndPort slave) { + String newSlaveHost = slave.toString(); + PoolInfo removed = null; + synchronized (this.slavePools) { + for (int i = 0; i < this.slavePools.size(); i++) { + PoolInfo poolInfo = this.slavePools.get(i); + if (poolInfo.host.equals(newSlaveHost)) { + removed = slavePools.remove(i); + break; + } + } + } + if (removed != null) { + removed.pool.destroy(); + } } - } public Jedis getResource() { int startIdx; @@ -327,131 +327,131 @@ public Jedis _getResource(int idx, int cnt) { return jedis; } catch (Exception e) { LOG.error("get connection fail:", e); - return _getResource(idx+1, cnt + 1); + return _getResource(idx + 1, cnt + 1); } } - @Override - public void close() { - destroy(); - } - - protected class SlaveListener extends Thread { - - protected String masterName; - protected String host; - protected int port; - protected long subscribeRetryWaitTimeMillis = 5000; - protected volatile Jedis j; - protected AtomicBoolean running = new AtomicBoolean(false); - - protected SlaveListener() { + @Override + public void close() { + destroy(); } - public SlaveListener(String masterName, String host, int port) { - super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); - this.masterName = masterName; - this.host = host; - this.port = port; - } + protected class SlaveListener extends Thread { - public SlaveListener(String masterName, String host, int port, - long subscribeRetryWaitTimeMillis) { - this(masterName, host, port); - this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; - } + protected String masterName; + protected String host; + protected int port; + protected long subscribeRetryWaitTimeMillis = 5000; + protected volatile Jedis j; + protected AtomicBoolean running = new AtomicBoolean(false); - @Override - public void run() { + protected SlaveListener() { + } - running.set(true); + public SlaveListener(String masterName, String host, int port) { + super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); + this.masterName = masterName; + this.host = host; + this.port = port; + } - while (running.get()) { + public SlaveListener(String masterName, String host, int port, + long subscribeRetryWaitTimeMillis) { + this(masterName, host, port); + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } - try { - // double check that it is not being shutdown - if (!running.get()) { - break; - } - - final HostAndPort hostPort = new HostAndPort(host, port); - j = new Jedis(hostPort, sentinelClientConfig); - - j.subscribe(new JedisPubSub() { - @Override - public void onMessage(String channel, String message) { - LOG.info("Sentinel: {}, channel: {}, published: {}.", hostPort, channel, message); - String[] switchMasterMsg = message.split(" "); - String slaveIp; - int slavePort; - switch (channel) { - case "+sdown": - if (switchMasterMsg[0].equals("master")) { - return; - } - if (!masterName.equals(switchMasterMsg[5])) { - return; - } - slaveIp = switchMasterMsg[2]; - slavePort = Integer.parseInt(switchMasterMsg[3]); - removeSlave(new HostAndPort(slaveIp, slavePort)); - break; - case "-sdown": - if (!masterName.equals(switchMasterMsg[5])) { - return; - } - slaveIp = switchMasterMsg[2]; - slavePort = Integer.parseInt(switchMasterMsg[3]); - addSlave(new HostAndPort(slaveIp, slavePort)); - break; - case "+slave": - if (!masterName.equals(switchMasterMsg[5])) { - return; - } - slaveIp = switchMasterMsg[2]; - slavePort = Integer.parseInt(switchMasterMsg[3]); - addSlave(new HostAndPort(slaveIp, slavePort)); - - String masterIp = switchMasterMsg[6]; - int masterPort = Integer.parseInt(switchMasterMsg[7]); - removeSlave(new HostAndPort(masterIp, masterPort)); - break; - } + @Override + public void run() { + + running.set(true); + + while (running.get()) { + + try { + // double check that it is not being shutdown + if (!running.get()) { + break; + } + + final HostAndPort hostPort = new HostAndPort(host, port); + j = new Jedis(hostPort, sentinelClientConfig); + + j.subscribe(new JedisPubSub() { + @Override + public void onMessage(String channel, String message) { + LOG.info("Sentinel: {}, channel: {}, published: {}.", hostPort, channel, message); + String[] switchMasterMsg = message.split(" "); + String slaveIp; + int slavePort; + switch (channel) { + case "+sdown": + if (switchMasterMsg[0].equals("master")) { + return; + } + if (!masterName.equals(switchMasterMsg[5])) { + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + removeSlave(new HostAndPort(slaveIp, slavePort)); + break; + case "-sdown": + if (!masterName.equals(switchMasterMsg[5])) { + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + addSlave(new HostAndPort(slaveIp, slavePort)); + break; + case "+slave": + if (!masterName.equals(switchMasterMsg[5])) { + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + addSlave(new HostAndPort(slaveIp, slavePort)); + + String masterIp = switchMasterMsg[6]; + int masterPort = Integer.parseInt(switchMasterMsg[7]); + removeSlave(new HostAndPort(masterIp, masterPort)); + break; + } + } + }, "+sdown", "-sdown", "+slave", "-slave"); + + } catch (JedisException e) { + + if (running.get()) { + LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host, + port, e); + try { + Thread.sleep(subscribeRetryWaitTimeMillis); + } catch (InterruptedException e1) { + LOG.error("Sleep interrupted: ", e1); + } + } else { + LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port); + } + } finally { + if (j != null) { + j.close(); + } + } } - }, "+sdown", "-sdown", "+slave", "-slave"); - - } catch (JedisException e) { + } - if (running.get()) { - LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host, - port, e); + public void shutdown() { try { - Thread.sleep(subscribeRetryWaitTimeMillis); - } catch (InterruptedException e1) { - LOG.error("Sleep interrupted: ", e1); + LOG.debug("Shutting down listener on {}:{}", host, port); + running.set(false); + // This isn't good, the Jedis object is not thread safe + if (j != null) { + j.close(); + } + } catch (RuntimeException e) { + LOG.error("Caught exception while shutting down: ", e); } - } else { - LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port); - } - } finally { - if (j != null) { - j.close(); - } - } - } - } - - public void shutdown() { - try { - LOG.debug("Shutting down listener on {}:{}", host, port); - running.set(false); - // This isn't good, the Jedis object is not thread safe - if (j != null) { - j.close(); } - } catch (RuntimeException e) { - LOG.error("Caught exception while shutting down: ", e); - } } - } } diff --git a/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java b/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java new file mode 100644 index 0000000000..16c1b28e98 --- /dev/null +++ b/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java @@ -0,0 +1,98 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.exceptions.JedisException; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JedisSentinelSlavePoolTest { + + private static final String MASTER_NAME = "mymaster"; + + protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1); + protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(2); + + protected final Set sentinels = new HashSet<>(); + +// private final String password = "foobared"; + private final String password = "0a2eb141353cf115"; + + @BeforeEach + public void setUp() throws Exception { + sentinels.clear(); + + sentinels.add(sentinel1.toString()); + sentinels.add(sentinel2.toString()); + } + + @Test + public void repeatedSentinelPoolInitialization() { + + for (int i = 0; i < 20; ++i) { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + password, 2); + pool.getResource().close(); + pool.destroy(); + } + } + + @Test + public void initializeWithNotAvailableSentinelsShouldThrowException() { + Set wrongSentinels = new HashSet(); + wrongSentinels.add(new HostAndPort("localhost", 65432).toString()); + wrongSentinels.add(new HostAndPort("localhost", 65431).toString()); + + assertThrows(JedisConnectionException.class, + () -> new JedisSentinelSlavePool(MASTER_NAME, wrongSentinels).close()); + } + + @Test + public void initializeWithNotMonitoredMasterNameShouldThrowException() { + final String wrongMasterName = "wrongMasterName"; + assertThrows(JedisException.class, () -> new JedisSentinelSlavePool(wrongMasterName, sentinels).close()); + } + + @Test + public void checkJedisIsSlave() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + password, 2); + + Jedis jedis = pool.getResource(); + assertThrows(JedisDataException.class, () -> jedis.set("hello", "jedis")); + } + + @Test + public void customClientName() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + password, 0, "my_shiny_client_name"); + + Jedis jedis = pool.getResource(); + + try { + assertEquals("my_shiny_client_name", jedis.clientGetname()); + } finally { + jedis.close(); + pool.destroy(); + } + + } +} From c3251a50a3707b635d095b969e6f612b48af4043 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=87=91=E6=9D=BE?= Date: Mon, 11 Aug 2025 14:22:59 +0800 Subject: [PATCH 9/9] add tests --- .../java/redis/clients/jedis/JedisSentinelSlavePoolTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java b/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java index 16c1b28e98..e50bbd36f3 100644 --- a/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java +++ b/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java @@ -25,8 +25,7 @@ public class JedisSentinelSlavePoolTest { protected final Set sentinels = new HashSet<>(); -// private final String password = "foobared"; - private final String password = "0a2eb141353cf115"; + private final String password = "foobared"; @BeforeEach public void setUp() throws Exception {