diff --git a/pom.xml b/pom.xml
index 0d8ecda453..9f9176d895 100644
--- a/pom.xml
+++ b/pom.xml
@@ -348,6 +348,7 @@
src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java
src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java
src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java
+ **/*FunctionCommandsTest*
diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
index 42574bdbba..ab8b6622c3 100644
--- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
+++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
@@ -39,6 +39,7 @@ public class JedisClusterInfoCache {
private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);
private final Map nodes = new HashMap<>();
+ private final Map primaryNodesCache = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
private final List[] replicaSlots;
@@ -176,6 +177,7 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
HostAndPort targetNode = generateHostAndPort(hostInfos);
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
+ primaryNodesCache.put(getNodeKey(targetNode), getNode(targetNode));
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForRedisClusterReplicas()) {
assignSlotsToReplicaNode(slotNums, targetNode);
@@ -425,6 +427,26 @@ public Map getNodes() {
}
}
+ public Map getPrimaryNodes() {
+ r.lock();
+ try {
+ return new HashMap<>(primaryNodesCache);
+ } finally {
+ r.unlock();
+ }
+ }
+
+ public List getShuffledPrimaryNodesPool() {
+ r.lock();
+ try {
+ List pools = new ArrayList<>(primaryNodesCache.values());
+ Collections.shuffle(pools);
+ return pools;
+ } finally {
+ r.unlock();
+ }
+ }
+
public List getShuffledNodesPool() {
r.lock();
try {
@@ -475,6 +497,7 @@ private void resetNodes() {
}
}
nodes.clear();
+ primaryNodesCache.clear();
}
public void close() {
diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java
index 766e2660c9..e475a40948 100644
--- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java
+++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java
@@ -45,7 +45,7 @@ public void close() {
@Override
public final T broadcastCommand(CommandObject commandObject) {
- Map connectionMap = provider.getConnectionMap();
+ Map connectionMap = provider.getPrimaryNodesConnectionMap();
boolean isErrored = false;
T reply = null;
diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java
index d7e2f9fee7..2f2e62dac8 100644
--- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java
+++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java
@@ -112,6 +112,10 @@ public Map getNodes() {
return cache.getNodes();
}
+ public Map getPrimaryNodes() {
+ return cache.getPrimaryNodes();
+ }
+
public HostAndPort getNode(int slot) {
return slot >= 0 ? cache.getSlotNode(slot) : null;
}
@@ -136,7 +140,7 @@ public Connection getConnection() {
// In antirez's redis-rb-cluster implementation, getRandomConnection always return
// valid connection (able to ping-pong) or exception if all connections are invalid
- List pools = cache.getShuffledNodesPool();
+ List pools = cache.getShuffledPrimaryNodesPool();
JedisException suppressed = null;
for (ConnectionPool pool : pools) {
@@ -205,8 +209,15 @@ public Connection getReplicaConnectionFromSlot(int slot) {
return getConnectionFromSlot(slot);
}
+
@Override
public Map getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
}
+
+ @Override
+ public Map getPrimaryNodesConnectionMap() {
+ return Collections.unmodifiableMap(getPrimaryNodes());
+ }
+
}
diff --git a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java
index 48543dd5cb..37c6c9c685 100644
--- a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java
+++ b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java
@@ -15,4 +15,9 @@ public interface ConnectionProvider extends AutoCloseable {
final Connection c = getConnection();
return Collections.singletonMap(c.toString(), c);
}
+
+ default Map, ?> getPrimaryNodesConnectionMap() {
+ final Connection c = getConnection();
+ return Collections.singletonMap(c.toString(), c);
+ }
}
diff --git a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java
index d6437e84ea..0ff04a832a 100644
--- a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java
+++ b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java
@@ -19,7 +19,6 @@
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import redis.clients.jedis.exceptions.JedisAskDataException;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
diff --git a/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java b/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java
index fa7b288360..691d8ed01c 100644
--- a/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java
+++ b/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java
@@ -15,12 +15,16 @@
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;
+import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey;
import static redis.clients.jedis.Protocol.Command.CLUSTER;
import static redis.clients.jedis.util.CommandArgumentMatchers.commandWithArgs;
@@ -49,7 +53,7 @@ public void testReplicaNodeRemovalAndRediscovery() {
// Mock the cluster slots responses
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
- masterReplicaSlotsResponse()).thenReturn(masterOnlySlotsResponse())
+ masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)).thenReturn(masterOnlySlotsResponse())
.thenReturn(masterReplica2SlotsResponse());
// Initial discovery with one master and one replica (replica-1)
@@ -78,7 +82,7 @@ public void testResetWithReplicaSlots() {
// Mock the cluster slots responses
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
- masterReplicaSlotsResponse());
+ masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST));
// Initial discovery
cache.discoverClusterNodesAndSlots(mockConnection);
@@ -94,10 +98,68 @@ public void testResetWithReplicaSlots() {
assertReplicasAvailable(cache, REPLICA_1_HOST);
}
- private List