diff --git a/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java new file mode 100644 index 0000000000..fb7fea63b4 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSentinelSlavePool.java @@ -0,0 +1,457 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + + +class PoolInfo { + public String host; + public JedisPool pool; + + public PoolInfo(String host, JedisPool pool) { + this.host = host; + this.pool = pool; + } +} + +public class JedisSentinelSlavePool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelSlavePool.class); + + private final JedisClientConfig sentinelClientConfig; + + private final JedisClientConfig clientConfig; + + private final GenericObjectPoolConfig poolConfig; + + protected final Collection slaveListeners = new ArrayList<>(); + + private final List slavePools = new ArrayList<>(); + + private int poolIndex; + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, String password) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, String password, String sentinelPassword) { + this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, + password, Protocol.DEFAULT_DATABASE, null, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, sentinelPassword, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password) { + this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int timeout) { + this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final String password) { + this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password, + final int database) { + this(masterName, sentinels, poolConfig, timeout, timeout, null, password, database); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String user, + final String password, final int database) { + this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String password, + final int database, final String clientName) { + this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, int timeout, final String user, + final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, timeout, timeout, user, password, database, clientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, user, password, database, clientName, + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, + final String user, final String password, final int database, final String clientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, infiniteSoTimeout, user, password, database, clientName, + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, null, null); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelPassword, + final String sentinelClientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, null, password, database, clientName, + sentinelConnectionTimeout, sentinelSoTimeout, null, sentinelPassword, sentinelClientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout, + final String user, final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, + final String sentinelPassword, final String sentinelClientName) { + this(masterName, sentinels, poolConfig, connectionTimeout, soTimeout, 0, user, password, database, clientName, + sentinelConnectionTimeout, sentinelSoTimeout, sentinelUser, sentinelPassword, sentinelClientName); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, + final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout, + final String user, final String password, final int database, final String clientName, + final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser, + final String sentinelPassword, final String sentinelClientName) { + this(masterName, parseHostAndPorts(sentinels), poolConfig, + DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout) + .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout) + .user(user).password(password).database(database).clientName(clientName).build(), + DefaultJedisClientConfig.builder().connectionTimeoutMillis(sentinelConnectionTimeout) + .socketTimeoutMillis(sentinelSoTimeout).user(sentinelUser).password(sentinelPassword) + .clientName(sentinelClientName).build() + ); + } + + public JedisSentinelSlavePool(String masterName, Set sentinels, + final GenericObjectPoolConfig poolConfig, final JedisClientConfig clientConfig, + final JedisClientConfig sentinelClientConfig) { + this.poolConfig = poolConfig; + this.clientConfig = clientConfig; + this.sentinelClientConfig = sentinelClientConfig; + + initSentinels(sentinels, masterName); + } + + private static Set parseHostAndPorts(Set strings) { + return strings.stream().map(HostAndPort::from).collect(Collectors.toSet()); + } + + public void destroy() { + for (SlaveListener m : slaveListeners) { + m.shutdown(); + } + + for (PoolInfo poolInfo : slavePools) { + poolInfo.pool.destroy(); + } + } + + private static boolean isHealthy(String flags) { + for (String flag : flags.split(",")) { + switch (flag.trim()) { + case "s_down": + case "o_down": + case "disconnected": + return false; + } + } + return true; + } + + private List initSentinels(Set sentinels, final String masterName) { + + boolean sentinelAvailable = false; + + LOG.info("Trying to find slave from available Sentinels..."); + + List slaves = new ArrayList<>(); + + for (HostAndPort sentinel : sentinels) { + + LOG.debug("Connecting to Sentinel {}", sentinel); + + try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) { + List> slaveInfos = jedis.sentinelSlaves(masterName); + + // connected to sentinel... + sentinelAvailable = true; + + if (slaveInfos == null || slaveInfos.isEmpty()) { + LOG.warn("Can not get slave addr, master name: {}. Sentinel: {}", masterName, sentinel); + continue; + } + + for (int i = 0; i < slaveInfos.size(); i++) { + Map slaveInfo = slaveInfos.get(i); + String flags = slaveInfo.get("flags"); + if (flags == null || !isHealthy(flags)) { + continue; + } + String ip = slaveInfo.get("ip"); + int port = Integer.parseInt(slaveInfo.get("port")); + HostAndPort slave = new HostAndPort(ip, port); + addSlave(slave); + slaves.add(slave); + } + + LOG.debug("Found Redis slaves at {}", slaveInfos); + break; + } catch (JedisException e) { + // resolves #1036, it should handle JedisException there's another chance + // of raising JedisDataException + LOG.warn( + "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e); + } + } + + if (slaves.isEmpty()) { + if (sentinelAvailable) { + // can connect to sentinel, but master name seems to not monitored + throw new JedisException("Can connect to sentinel, but " + masterName + + " seems to be not monitored..."); + } else { + throw new JedisConnectionException("All sentinels down, cannot determine where is " + + masterName + " master is running..."); + } + } + + LOG.info("Redis slaves running at {}, starting Sentinel listeners...", slaves); + + for (HostAndPort sentinel : sentinels) { + SlaveListener slaveListener = new SlaveListener(masterName, sentinel.getHost(), sentinel.getPort()); + // whether SlaveListener threads are alive or not, process can be stopped + slaveListener.setDaemon(true); + slaveListeners.add(slaveListener); + slaveListener.start(); + } + + return slaves; + } + + private void addSlave(HostAndPort slave) { + String newSlaveHost = slave.toString(); + synchronized (this.slavePools) { + for (int i = 0; i < this.slavePools.size(); i++) { + PoolInfo poolInfo = this.slavePools.get(i); + if (poolInfo.host.equals(newSlaveHost)) { + return; + } + } + slavePools.add(new PoolInfo(newSlaveHost, new JedisPool(this.poolConfig, slave, this.clientConfig))); + } + } + + private void removeSlave(HostAndPort slave) { + String newSlaveHost = slave.toString(); + PoolInfo removed = null; + synchronized (this.slavePools) { + for (int i = 0; i < this.slavePools.size(); i++) { + PoolInfo poolInfo = this.slavePools.get(i); + if (poolInfo.host.equals(newSlaveHost)) { + removed = slavePools.remove(i); + break; + } + } + } + if (removed != null) { + removed.pool.destroy(); + } + } + + public Jedis getResource() { + int startIdx; + synchronized (slavePools) { + poolIndex++; + if (poolIndex >= slavePools.size()) { + poolIndex = 0; + } + startIdx = poolIndex; + } + return _getResource(startIdx, 0); + } + + public Jedis _getResource(int idx, int cnt) { + PoolInfo poolInfo; + synchronized (slavePools) { + if (cnt >= slavePools.size()) { + throw new JedisException("can not get Jedis Object, all slave is invalid"); + } + poolInfo = slavePools.get(idx % slavePools.size()); + } + try { + Jedis jedis = poolInfo.pool.getResource(); + return jedis; + } catch (Exception e) { + LOG.error("get connection fail:", e); + return _getResource(idx + 1, cnt + 1); + } + } + + @Override + public void close() { + destroy(); + } + + protected class SlaveListener extends Thread { + + protected String masterName; + protected String host; + protected int port; + protected long subscribeRetryWaitTimeMillis = 5000; + protected volatile Jedis j; + protected AtomicBoolean running = new AtomicBoolean(false); + + protected SlaveListener() { + } + + public SlaveListener(String masterName, String host, int port) { + super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port)); + this.masterName = masterName; + this.host = host; + this.port = port; + } + + public SlaveListener(String masterName, String host, int port, + long subscribeRetryWaitTimeMillis) { + this(masterName, host, port); + this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis; + } + + @Override + public void run() { + + running.set(true); + + while (running.get()) { + + try { + // double check that it is not being shutdown + if (!running.get()) { + break; + } + + final HostAndPort hostPort = new HostAndPort(host, port); + j = new Jedis(hostPort, sentinelClientConfig); + + j.subscribe(new JedisPubSub() { + @Override + public void onMessage(String channel, String message) { + LOG.info("Sentinel: {}, channel: {}, published: {}.", hostPort, channel, message); + String[] switchMasterMsg = message.split(" "); + String slaveIp; + int slavePort; + switch (channel) { + case "+sdown": + if (switchMasterMsg[0].equals("master")) { + return; + } + if (!masterName.equals(switchMasterMsg[5])) { + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + removeSlave(new HostAndPort(slaveIp, slavePort)); + break; + case "-sdown": + if (!masterName.equals(switchMasterMsg[5])) { + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + addSlave(new HostAndPort(slaveIp, slavePort)); + break; + case "+slave": + if (!masterName.equals(switchMasterMsg[5])) { + return; + } + slaveIp = switchMasterMsg[2]; + slavePort = Integer.parseInt(switchMasterMsg[3]); + addSlave(new HostAndPort(slaveIp, slavePort)); + + String masterIp = switchMasterMsg[6]; + int masterPort = Integer.parseInt(switchMasterMsg[7]); + removeSlave(new HostAndPort(masterIp, masterPort)); + break; + } + } + }, "+sdown", "-sdown", "+slave", "-slave"); + + } catch (JedisException e) { + + if (running.get()) { + LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host, + port, e); + try { + Thread.sleep(subscribeRetryWaitTimeMillis); + } catch (InterruptedException e1) { + LOG.error("Sleep interrupted: ", e1); + } + } else { + LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port); + } + } finally { + if (j != null) { + j.close(); + } + } + } + } + + public void shutdown() { + try { + LOG.debug("Shutting down listener on {}:{}", host, port); + running.set(false); + // This isn't good, the Jedis object is not thread safe + if (j != null) { + j.close(); + } + } catch (RuntimeException e) { + LOG.error("Caught exception while shutting down: ", e); + } + } + } +} diff --git a/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java b/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java new file mode 100644 index 0000000000..e50bbd36f3 --- /dev/null +++ b/src/test/java/redis/clients/jedis/JedisSentinelSlavePoolTest.java @@ -0,0 +1,97 @@ +package redis.clients.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.exceptions.JedisException; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JedisSentinelSlavePoolTest { + + private static final String MASTER_NAME = "mymaster"; + + protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1); + protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(2); + + protected final Set sentinels = new HashSet<>(); + + private final String password = "foobared"; + + @BeforeEach + public void setUp() throws Exception { + sentinels.clear(); + + sentinels.add(sentinel1.toString()); + sentinels.add(sentinel2.toString()); + } + + @Test + public void repeatedSentinelPoolInitialization() { + + for (int i = 0; i < 20; ++i) { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + password, 2); + pool.getResource().close(); + pool.destroy(); + } + } + + @Test + public void initializeWithNotAvailableSentinelsShouldThrowException() { + Set wrongSentinels = new HashSet(); + wrongSentinels.add(new HostAndPort("localhost", 65432).toString()); + wrongSentinels.add(new HostAndPort("localhost", 65431).toString()); + + assertThrows(JedisConnectionException.class, + () -> new JedisSentinelSlavePool(MASTER_NAME, wrongSentinels).close()); + } + + @Test + public void initializeWithNotMonitoredMasterNameShouldThrowException() { + final String wrongMasterName = "wrongMasterName"; + assertThrows(JedisException.class, () -> new JedisSentinelSlavePool(wrongMasterName, sentinels).close()); + } + + @Test + public void checkJedisIsSlave() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + password, 2); + + Jedis jedis = pool.getResource(); + assertThrows(JedisDataException.class, () -> jedis.set("hello", "jedis")); + } + + @Test + public void customClientName() { + GenericObjectPoolConfig config = new GenericObjectPoolConfig<>(); + config.setMaxTotal(1); + config.setBlockWhenExhausted(false); + JedisSentinelSlavePool pool = new JedisSentinelSlavePool(MASTER_NAME, sentinels, config, 1000, + password, 0, "my_shiny_client_name"); + + Jedis jedis = pool.getResource(); + + try { + assertEquals("my_shiny_client_name", jedis.clientGetname()); + } finally { + jedis.close(); + pool.destroy(); + } + + } +}