|
| 1 | +package redis.clients.jedis; |
| 2 | + |
| 3 | +import java.time.Duration; |
| 4 | +import java.util.Collections; |
| 5 | +import java.util.Map; |
| 6 | +import java.util.Set; |
| 7 | + |
| 8 | +import redis.clients.jedis.builders.ClusterClientBuilder; |
| 9 | +import redis.clients.jedis.executors.ClusterCommandExecutor; |
| 10 | +import redis.clients.jedis.executors.CommandExecutor; |
| 11 | +import redis.clients.jedis.providers.ClusterConnectionProvider; |
| 12 | +import redis.clients.jedis.csc.Cache; |
| 13 | +import redis.clients.jedis.providers.ConnectionProvider; |
| 14 | +import redis.clients.jedis.util.JedisClusterCRC16; |
| 15 | + |
| 16 | +public class RedisClusterClient extends UnifiedJedis { |
| 17 | + |
| 18 | + public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError"; |
| 19 | + |
| 20 | + /** |
| 21 | + * Default timeout in milliseconds. |
| 22 | + */ |
| 23 | + public static final int DEFAULT_TIMEOUT = 2000; |
| 24 | + |
| 25 | + /** |
| 26 | + * Default amount of attempts for executing a command |
| 27 | + */ |
| 28 | + public static final int DEFAULT_MAX_ATTEMPTS = 5; |
| 29 | + |
| 30 | + /** |
| 31 | + * Creates a RedisClusterClient instance. The provided node is used to make the first contact with the cluster. |
| 32 | + * <p> |
| 33 | + * Here, the default timeout of {@value redis.clients.jedis.RedisClusterClient#DEFAULT_TIMEOUT} ms is being used with |
| 34 | + * {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum attempts. |
| 35 | + * @param node Node to first connect to. |
| 36 | + */ |
| 37 | + public RedisClusterClient(HostAndPort node) { |
| 38 | + super(new ClusterConnectionProvider(Collections.singleton(node), DefaultJedisClientConfig.builder().timeoutMillis(DEFAULT_TIMEOUT).build()), |
| 39 | + DEFAULT_MAX_ATTEMPTS, Duration.ofMillis((long) DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS)); |
| 40 | + } |
| 41 | + |
| 42 | + /** |
| 43 | + * Creates a RedisClusterClient with multiple entry points. |
| 44 | + * <p> |
| 45 | + * Here, the default timeout of {@value redis.clients.jedis.RedisClusterClient#DEFAULT_TIMEOUT} ms is being used with |
| 46 | + * {@value redis.clients.jedis.RedisClusterClient#DEFAULT_MAX_ATTEMPTS} maximum attempts. |
| 47 | + * @param nodes Nodes to connect to. |
| 48 | + */ |
| 49 | + public RedisClusterClient(Set<HostAndPort> nodes) { |
| 50 | + super(new ClusterConnectionProvider(nodes, DefaultJedisClientConfig.builder().timeoutMillis(DEFAULT_TIMEOUT).build()), |
| 51 | + DEFAULT_MAX_ATTEMPTS, Duration.ofMillis((long) DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS)); |
| 52 | + } |
| 53 | + |
| 54 | + public RedisClusterClient(Set<HostAndPort> nodes, String user, String password) { |
| 55 | + super(new ClusterConnectionProvider(nodes, DefaultJedisClientConfig.builder().user(user).password(password).build()), |
| 56 | + DEFAULT_MAX_ATTEMPTS, Duration.ofMillis((long) Protocol.DEFAULT_TIMEOUT * DEFAULT_MAX_ATTEMPTS)); |
| 57 | + } |
| 58 | + |
| 59 | + private RedisClusterClient(CommandExecutor commandExecutor, ConnectionProvider connectionProvider, CommandObjects commandObjects, RedisProtocol redisProtocol, Cache cache) { |
| 60 | + super(commandExecutor, connectionProvider, commandObjects, redisProtocol, cache); |
| 61 | + } |
| 62 | + |
| 63 | + /** |
| 64 | + * Fluent builder for {@link RedisClusterClient} (Redis Cluster). |
| 65 | + * <p> |
| 66 | + * Obtain an instance via {@link #builder()}. |
| 67 | + * </p> |
| 68 | + */ |
| 69 | + static public class Builder extends ClusterClientBuilder<RedisClusterClient> { |
| 70 | + |
| 71 | + @Override |
| 72 | + protected RedisClusterClient createClient() { |
| 73 | + return new RedisClusterClient(commandExecutor, connectionProvider, commandObjects, clientConfig.getRedisProtocol(), |
| 74 | + cache); |
| 75 | + } |
| 76 | + } |
| 77 | + |
| 78 | + /** |
| 79 | + * Create a new builder for configuring RedisClusterClient instances. |
| 80 | + * @return a new {@link RedisClusterClient.Builder} instance |
| 81 | + */ |
| 82 | + public static Builder builder() { |
| 83 | + return new Builder(); |
| 84 | + } |
| 85 | + |
| 86 | + /** |
| 87 | + * Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).<br> |
| 88 | + * Key is the HOST:PORT and the value is the connection pool. |
| 89 | + * @return the map of all connections. |
| 90 | + */ |
| 91 | + public Map<String, ConnectionPool> getClusterNodes() { |
| 92 | + return ((ClusterConnectionProvider) provider).getNodes(); |
| 93 | + } |
| 94 | + |
| 95 | + /** |
| 96 | + * Returns the connection for one of the 16,384 slots. |
| 97 | + * @param slot the slot to retrieve the connection for. |
| 98 | + * @return connection of the provided slot. {@code close()} of this connection must be called after use. |
| 99 | + */ |
| 100 | + public Connection getConnectionFromSlot(int slot) { |
| 101 | + return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot); |
| 102 | + } |
| 103 | + |
| 104 | + // commands |
| 105 | + public long spublish(String channel, String message) { |
| 106 | + return executeCommand(commandObjects.spublish(channel, message)); |
| 107 | + } |
| 108 | + |
| 109 | + public long spublish(byte[] channel, byte[] message) { |
| 110 | + return executeCommand(commandObjects.spublish(channel, message)); |
| 111 | + } |
| 112 | + |
| 113 | + public void ssubscribe(final JedisShardedPubSub jedisPubSub, final String... channels) { |
| 114 | + try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) { |
| 115 | + jedisPubSub.proceed(connection, channels); |
| 116 | + } |
| 117 | + } |
| 118 | + |
| 119 | + public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... channels) { |
| 120 | + try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) { |
| 121 | + jedisPubSub.proceed(connection, channels); |
| 122 | + } |
| 123 | + } |
| 124 | + // commands |
| 125 | + |
| 126 | + @Override |
| 127 | + public ClusterPipeline pipelined() { |
| 128 | + return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); |
| 129 | + } |
| 130 | + |
| 131 | + /** |
| 132 | + * @param doMulti param |
| 133 | + * @return nothing |
| 134 | + * @throws UnsupportedOperationException |
| 135 | + */ |
| 136 | + @Override |
| 137 | + public AbstractTransaction transaction(boolean doMulti) { |
| 138 | + throw new UnsupportedOperationException(); |
| 139 | + } |
| 140 | + |
| 141 | + public final <T> T executeCommandToReplica(CommandObject<T> commandObject) { |
| 142 | + if (!(executor instanceof ClusterCommandExecutor)) { |
| 143 | + throw new UnsupportedOperationException("Support only execute to replica in ClusterCommandExecutor"); |
| 144 | + } |
| 145 | + return ((ClusterCommandExecutor) executor).executeCommandToReplica(commandObject); |
| 146 | + } |
| 147 | +} |
| 148 | + |
0 commit comments