Skip to content

Commit 5a324c1

Browse files
author
李金松
committed
feat(connection): 增加读写分离功能
- 在 Protocol.Command 中添加 isWriteCommand 方法,标记命令是否为写操作 - 新增 SlavePoolInfo 类,用于管理从节点连接池信息 - 修改 getConnection 方法,实现读操作时从从节点池获取连接 - 更新测试用例,以适应新的读写分离逻辑
1 parent edeb00f commit 5a324c1

File tree

5 files changed

+79
-39
lines changed

5 files changed

+79
-39
lines changed

src/main/java/redis/clients/jedis/Protocol.java

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -283,46 +283,65 @@ public static final byte[] toByteArray(final double value) {
283283

284284
public static enum Command implements ProtocolCommand {
285285

286-
PING, AUTH, HELLO, SET, GET, GETDEL, GETEX, EXISTS, DEL, UNLINK, TYPE, FLUSHDB, FLUSHALL, MOVE,
287-
KEYS, RANDOMKEY, RENAME, RENAMENX, DUMP, RESTORE, DBSIZE, SELECT, SWAPDB, MIGRATE, ECHO, //
288-
EXPIRE, EXPIREAT, EXPIRETIME, PEXPIRE, PEXPIREAT, PEXPIRETIME, TTL, PTTL, // <-- key expiration
289-
MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, SORT_RO, INFO, SHUTDOWN, MONITOR, CONFIG, LCS, //
290-
GETSET, MGET, SETNX, SETEX, PSETEX, MSET, MSETNX, DECR, DECRBY, INCR, INCRBY, INCRBYFLOAT,
291-
STRLEN, APPEND, SUBSTR, // <-- string
292-
SETBIT, GETBIT, BITPOS, SETRANGE, GETRANGE, BITCOUNT, BITOP, BITFIELD, BITFIELD_RO, // <-- bit (string)
293-
HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, HSTRLEN,
294-
HEXPIRE, HPEXPIRE, HEXPIREAT, HPEXPIREAT, HTTL, HPTTL, HEXPIRETIME, HPEXPIRETIME, HPERSIST,
295-
HRANDFIELD, HINCRBYFLOAT, HSETEX, HGETEX, HGETDEL, // <-- hash
296-
RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, BLPOP, BRPOP, LINSERT, LPOS,
297-
RPOPLPUSH, BRPOPLPUSH, BLMOVE, LMOVE, LMPOP, BLMPOP, LPUSHX, RPUSHX, // <-- list
298-
SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SRANDMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE,
299-
SDIFF, SDIFFSTORE, SISMEMBER, SMISMEMBER, SINTERCARD, // <-- set
300-
ZADD, ZDIFF, ZDIFFSTORE, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZRANDMEMBER, ZCARD,
301-
ZSCORE, ZPOPMAX, ZPOPMIN, ZCOUNT, ZUNION, ZUNIONSTORE, ZINTER, ZINTERSTORE, ZRANGEBYSCORE,
302-
ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZLEXCOUNT, ZRANGEBYLEX, ZREVRANGEBYLEX,
303-
ZREMRANGEBYLEX, ZMSCORE, ZRANGESTORE, ZINTERCARD, ZMPOP, BZMPOP, BZPOPMIN, BZPOPMAX, // <-- zset
304-
GEOADD, GEODIST, GEOHASH, GEOPOS, GEORADIUS, GEORADIUS_RO, GEOSEARCH, GEOSEARCHSTORE,
305-
GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, // <-- geo
306-
PFADD, PFCOUNT, PFMERGE, // <-- hyper log log
307-
XADD, XLEN, XDEL, XTRIM, XRANGE, XREVRANGE, XREAD, XACK, XGROUP, XREADGROUP, XPENDING, XCLAIM,
308-
XAUTOCLAIM, XINFO, // <-- stream
309-
EVAL, EVALSHA, SCRIPT, EVAL_RO, EVALSHA_RO, FUNCTION, FCALL, FCALL_RO, // <-- program
310-
SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB,
311-
SSUBSCRIBE, SUNSUBSCRIBE, SPUBLISH, // <-- pub sub
312-
SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, PERSIST, ROLE, FAILOVER, SLOWLOG, OBJECT, CLIENT, TIME,
313-
SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING, READONLY, READWRITE, SLAVEOF, REPLICAOF, COPY,
314-
SENTINEL, MODULE, ACL, TOUCH, MEMORY, LOLWUT, COMMAND, RESET, LATENCY, WAITAOF;
286+
PING(false), AUTH(false), HELLO(false), SET(true), GET(false), GETDEL(true),
287+
GETEX(true), EXISTS(false), DEL(true), UNLINK(true), TYPE(false), FLUSHDB(true),
288+
FLUSHALL(true), MOVE(true), KEYS(false), RANDOMKEY(false), RENAME(true), RENAMENX(true),
289+
DUMP(false), RESTORE(true), DBSIZE(false), SELECT(false), SWAPDB(true), MIGRATE(true),
290+
ECHO(false), //
291+
292+
EXPIRE(true), EXPIREAT(true), EXPIRETIME(false), PEXPIRE(true), PEXPIREAT(true), PEXPIRETIME(false),
293+
TTL(false), PTTL(false), // <-- key expiration
294+
MULTI(true), DISCARD(true), EXEC(true), WATCH(true), UNWATCH(true), SORT(true), SORT_RO(false),
295+
INFO(false), SHUTDOWN(true), MONITOR(false), CONFIG(true), LCS(false), //
296+
GETSET(true), MGET(false), SETNX(true), SETEX(true), PSETEX(true), MSET(true), MSETNX(true),
297+
DECR(true), DECRBY(true), INCR(true), INCRBY(true), INCRBYFLOAT(true),
298+
STRLEN(false), APPEND(true), SUBSTR(false), // <-- string
299+
SETBIT(true), GETBIT(false), BITPOS(false), SETRANGE(true), GETRANGE(false), BITCOUNT(false), BITOP(true),
300+
BITFIELD(true), BITFIELD_RO(false), // <-- bit (string)
301+
HSET(true), HGET(false), HSETNX(true), HMSET(true), HMGET(false), HINCRBY(true), HEXISTS(false),
302+
HDEL(true), HLEN(false), HKEYS(false), HVALS(false), HGETALL(false), HSTRLEN(false),
303+
HEXPIRE(true), HPEXPIRE(true), HEXPIREAT(true), HPEXPIREAT(true), HTTL(false), HPTTL(false),
304+
HEXPIRETIME(false), HPEXPIRETIME(false), HPERSIST(true),
305+
HRANDFIELD(false), HINCRBYFLOAT(true), HSETEX(true), HGETEX(true), HGETDEL(true), // <-- hash
306+
RPUSH(true), LPUSH(true), LLEN(false), LRANGE(false), LTRIM(true), LINDEX(false), LSET(true),
307+
LREM(true), LPOP(true), RPOP(true), BLPOP(true), BRPOP(true), LINSERT(true), LPOS(false),
308+
RPOPLPUSH(true), BRPOPLPUSH(true), BLMOVE(true), LMOVE(true), LMPOP(true), BLMPOP(true), LPUSHX(true), RPUSHX(true), // <-- list
309+
SADD(true), SMEMBERS(false), SREM(true), SPOP(true), SMOVE(true), SCARD(false), SRANDMEMBER(false), SINTER(false), SINTERSTORE(true), SUNION(false), SUNIONSTORE(true),
310+
SDIFF(false), SDIFFSTORE(true), SISMEMBER(false), SMISMEMBER(false), SINTERCARD(false), // <-- set
311+
ZADD(true), ZDIFF(false), ZDIFFSTORE(true), ZRANGE(false), ZREM(true), ZINCRBY(true), ZRANK(false), ZREVRANK(false), ZREVRANGE(false), ZRANDMEMBER(false), ZCARD(false),
312+
ZSCORE(false), ZPOPMAX(true), ZPOPMIN(true), ZCOUNT(false), ZUNION(false), ZUNIONSTORE(true), ZINTER(false), ZINTERSTORE(true), ZRANGEBYSCORE(false),
313+
ZREVRANGEBYSCORE(false), ZREMRANGEBYRANK(true), ZREMRANGEBYSCORE(true), ZLEXCOUNT(false), ZRANGEBYLEX(false), ZREVRANGEBYLEX(false),
314+
ZREMRANGEBYLEX(true), ZMSCORE(false), ZRANGESTORE(true), ZINTERCARD(false), ZMPOP(true), BZMPOP(true), BZPOPMIN(true), BZPOPMAX(true), // <-- zset
315+
GEOADD(true), GEODIST(false), GEOHASH(false), GEOPOS(false), GEORADIUS(true), GEORADIUS_RO(false), GEOSEARCH(true), GEOSEARCHSTORE(true),
316+
GEORADIUSBYMEMBER(true), GEORADIUSBYMEMBER_RO(false), // <-- geo
317+
PFADD(true), PFCOUNT(false), PFMERGE(true), // <-- hyper log log
318+
XADD(true), XLEN(false), XDEL(true), XTRIM(true), XRANGE(false), XREVRANGE(false), XREAD(false), XACK(true), XGROUP(true), XREADGROUP(false), XPENDING(false), XCLAIM(true),
319+
XAUTOCLAIM(true), XINFO(false), // <-- stream
320+
EVAL(true), EVALSHA(true), SCRIPT(true), EVAL_RO(true), EVALSHA_RO(true), FUNCTION(true), FCALL(true), FCALL_RO(false), // <-- program
321+
SUBSCRIBE(false), UNSUBSCRIBE(false), PSUBSCRIBE(false), PUNSUBSCRIBE(false), PUBLISH(true), PUBSUB(false),
322+
SSUBSCRIBE(false), SUNSUBSCRIBE(false), SPUBLISH(true), // <-- pub sub
323+
SAVE(true), BGSAVE(true), BGREWRITEAOF(true), LASTSAVE(false), PERSIST(true), ROLE(false), FAILOVER(true), SLOWLOG(true), OBJECT(false), CLIENT(true), TIME(false),
324+
SCAN(false), HSCAN(false), SSCAN(false), ZSCAN(false), WAIT(true), CLUSTER(true), ASKING(true), READONLY(true), READWRITE(true), SLAVEOF(true), REPLICAOF(true), COPY(true),
325+
SENTINEL(true), MODULE(true), ACL(true), TOUCH(true), MEMORY(true), LOLWUT(false), COMMAND(false), RESET(true), LATENCY(true), WAITAOF(true);
315326

316327
private final byte[] raw;
317328

318-
private Command() {
329+
private final boolean isWriteCommand;
330+
331+
private Command(boolean isWriteCommand) {
319332
raw = SafeEncoder.encode(name());
333+
this.isWriteCommand = isWriteCommand;
320334
}
321335

322336
@Override
323337
public byte[] getRaw() {
324338
return raw;
325339
}
340+
341+
@Override
342+
public boolean isWriteCommand() {
343+
return isWriteCommand;
344+
}
326345
}
327346

328347
public static enum Keyword implements Rawable {

src/main/java/redis/clients/jedis/commands/ProtocolCommand.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
import redis.clients.jedis.args.Rawable;
44

55
public interface ProtocolCommand extends Rawable {
6+
boolean isWriteCommand();
67
}

src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,24 @@
1818
import redis.clients.jedis.HostAndPort;
1919
import redis.clients.jedis.Jedis;
2020
import redis.clients.jedis.JedisClientConfig;
21+
import redis.clients.jedis.JedisPool;
2122
import redis.clients.jedis.JedisPubSub;
2223
import redis.clients.jedis.annots.Experimental;
2324
import redis.clients.jedis.csc.Cache;
2425
import redis.clients.jedis.exceptions.JedisConnectionException;
2526
import redis.clients.jedis.exceptions.JedisException;
2627
import redis.clients.jedis.util.IOUtils;
2728

29+
class PoolInfo {
30+
public String host;
31+
public JedisPool pool;
32+
33+
public PoolInfo(String host, JedisPool pool) {
34+
this.host = host;
35+
this.pool = pool;
36+
}
37+
}
38+
2839
public class SentineledConnectionProvider implements ConnectionProvider {
2940

3041
private static final Logger LOG = LoggerFactory.getLogger(SentineledConnectionProvider.class);
@@ -51,6 +62,10 @@ public class SentineledConnectionProvider implements ConnectionProvider {
5162

5263
private final Lock initPoolLock = new ReentrantLock(true);
5364

65+
private final List<PoolInfo> slavePools = new ArrayList<>();
66+
67+
private int poolIndex;
68+
5469
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
5570
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
5671
this(masterName, masterClientConfig, null, null, sentinels, sentinelClientConfig);
@@ -109,6 +124,10 @@ public Connection getConnection() {
109124

110125
@Override
111126
public Connection getConnection(CommandArguments args) {
127+
boolean writeCommand = args.getCommand().isWriteCommand();
128+
if (!writeCommand) {
129+
130+
}
112131
return pool.getResource();
113132
}
114133

@@ -275,6 +294,7 @@ public void run() {
275294
public void onMessage(String channel, String message) {
276295
LOG.debug("Sentinel {} published: {}.", node, message);
277296

297+
278298
String[] switchMasterMsg = message.split(" ");
279299

280300
if (switchMasterMsg.length > 3) {

src/test/java/redis/clients/jedis/HostAndPorts.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ public final class HostAndPorts {
2121
}
2222

2323
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT));
24-
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 1));
25-
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 2));
24+
sentinelHostAndPortList.add(new HostAndPort("10.148.17.43", Protocol.DEFAULT_SENTINEL_PORT + 1));
25+
sentinelHostAndPortList.add(new HostAndPort("10.148.17.43", Protocol.DEFAULT_SENTINEL_PORT + 2));
2626
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 3));
2727
sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 4));
2828

src/test/java/redis/clients/jedis/SentineledConnectionProviderTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ public class SentineledConnectionProviderTest {
2424
private static final String MASTER_NAME = "mymaster";
2525

2626
protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1);
27-
protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(3);
27+
protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(2);
2828

2929
protected Set<HostAndPort> sentinels = new HashSet<>();
30+
protected String password = "foobared";
3031

3132
@BeforeEach
3233
public void setUp() throws Exception {
@@ -39,9 +40,8 @@ public void setUp() throws Exception {
3940
@Test
4041
public void repeatedSentinelPoolInitialization() {
4142
for (int i = 0; i < 20; ++i) {
42-
4343
try (SentineledConnectionProvider provider = new SentineledConnectionProvider(MASTER_NAME,
44-
DefaultJedisClientConfig.builder().timeoutMillis(1000).password("foobared").database(2).build(),
44+
DefaultJedisClientConfig.builder().timeoutMillis(1000).password(password).database(2).build(),
4545
sentinels, DefaultJedisClientConfig.builder().build())) {
4646

4747
provider.getConnection().close();
@@ -76,7 +76,7 @@ public void checkCloseableConnections() throws Exception {
7676
GenericObjectPoolConfig<Connection> config = new GenericObjectPoolConfig<>();
7777

7878
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME,
79-
DefaultJedisClientConfig.builder().timeoutMillis(1000).password("foobared").database(2).build(),
79+
DefaultJedisClientConfig.builder().timeoutMillis(1000).password(password).database(2).build(),
8080
config, sentinels, DefaultJedisClientConfig.builder().build())) {
8181
assertSame(SentineledConnectionProvider.class, jedis.provider.getClass());
8282
jedis.set("foo", "bar");
@@ -91,7 +91,7 @@ public void checkResourceIsCloseable() {
9191
config.setBlockWhenExhausted(false);
9292

9393
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME,
94-
DefaultJedisClientConfig.builder().timeoutMillis(1000).password("foobared").database(2).build(),
94+
DefaultJedisClientConfig.builder().timeoutMillis(1000).password(password).database(2).build(),
9595
config, sentinels, DefaultJedisClientConfig.builder().build())) {
9696

9797
Connection conn = jedis.provider.getConnection();
@@ -113,7 +113,7 @@ public void checkResourceIsCloseable() {
113113
@Test
114114
public void testResetInvalidPassword() {
115115
DefaultRedisCredentialsProvider credentialsProvider
116-
= new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(null, "foobared"));
116+
= new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(null, password));
117117

118118
try (JedisSentineled jedis = new JedisSentineled(MASTER_NAME, DefaultJedisClientConfig.builder()
119119
.timeoutMillis(2000).credentialsProvider(credentialsProvider).database(2)
@@ -154,7 +154,7 @@ public void testResetValidPassword() {
154154
fail("Should not get resource from pool");
155155
} catch (JedisException e) { }
156156

157-
credentialsProvider.setCredentials(new DefaultRedisCredentials(null, "foobared"));
157+
credentialsProvider.setCredentials(new DefaultRedisCredentials(null, password));
158158

159159
try (Connection conn2 = jedis.provider.getConnection()) {
160160
new Jedis(conn2).set("foo", "bar");

0 commit comments

Comments
 (0)