From 6f73b5dca8330182b1be6bf4e366e6491e8dcac5 Mon Sep 17 00:00:00 2001 From: Now Date: Thu, 31 Jul 2025 02:00:36 +0900 Subject: [PATCH 1/5] Fix JedisBroadcastException in functionLoadReplace --- .../clients/jedis/JedisClusterInfoCache.java | 21 +++++++++++++++ .../executors/ClusterCommandExecutor.java | 26 ++++++++++++++++++- .../providers/ClusterConnectionProvider.java | 8 ++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index 42574bdbba..ad7d01a361 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -425,6 +425,27 @@ public Map getNodes() { } } + public Map getPrimaryNodes() { + r.lock(); + try { + Map primaryNodes = new HashMap<>(); + Set addedPools = new HashSet<>(); + + for (int slot = 0; slot < slots.length; slot++) { + ConnectionPool pool = slots[slot]; + if (pool != null && addedPools.add(pool)) { + HostAndPort hostAndPort = slotNodes[slot]; + if (hostAndPort != null) { + primaryNodes.put(getNodeKey(hostAndPort), pool); + } + } + } + return primaryNodes; + } finally { + r.unlock(); + } + } + public List getShuffledNodesPool() { r.lock(); try { diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java index 766e2660c9..42ee1d0b42 100644 --- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java @@ -2,7 +2,10 @@ import java.time.Duration; import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -21,6 +24,16 @@ public class ClusterCommandExecutor implements CommandExecutor { + private static final Set PRIMARY_ONLY_COMMANDS; + static { + Set commands = new HashSet<>(); + commands.add("FUNCTION_DELETE"); + commands.add("FUNCTION_FLUSH"); + commands.add("FUNCTION_LOAD"); + commands.add("FUNCTION_RESTORE"); + PRIMARY_ONLY_COMMANDS = Collections.unmodifiableSet(commands); + } + private final Logger log = LoggerFactory.getLogger(getClass()); public final ClusterConnectionProvider provider; @@ -45,7 +58,9 @@ public void close() { @Override public final T broadcastCommand(CommandObject commandObject) { - Map connectionMap = provider.getConnectionMap(); + Map connectionMap = requiresPrimaryOnly(commandObject) + ? provider.getPrimaryConnectionMap() + : provider.getConnectionMap(); boolean isErrored = false; T reply = null; @@ -76,6 +91,15 @@ public final T broadcastCommand(CommandObject commandObject) { return reply; } + private boolean requiresPrimaryOnly(CommandObject commandObject) { + try { + String commandName = new String(commandObject.getArguments().getCommand().getRaw()); + return PRIMARY_ONLY_COMMANDS.contains(commandName); + } catch (Exception e) { + return false; + } + } + @Override public final T executeCommand(CommandObject commandObject) { return doExecuteCommand(commandObject, false); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index d7e2f9fee7..1650b76dfe 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -112,6 +112,10 @@ public Map getNodes() { return cache.getNodes(); } + public Map getPrimaryNodes() { + return cache.getPrimaryNodes(); + } + public HostAndPort getNode(int slot) { return slot >= 0 ? cache.getSlotNode(slot) : null; } @@ -209,4 +213,8 @@ public Connection getReplicaConnectionFromSlot(int slot) { public Map getConnectionMap() { return Collections.unmodifiableMap(getNodes()); } + + public Map getPrimaryConnectionMap() { + return Collections.unmodifiableMap(getPrimaryNodes()); + } } From afde0becb5604f6bcb219756f9fe4f8af0ec4839 Mon Sep 17 00:00:00 2001 From: Now Date: Thu, 31 Jul 2025 02:06:51 +0900 Subject: [PATCH 2/5] Clean up PRIMARY_ONLY_COMMANDS initialization --- .../jedis/executors/ClusterCommandExecutor.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java index 42ee1d0b42..3d6406b217 100644 --- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -26,12 +27,9 @@ public class ClusterCommandExecutor implements CommandExecutor { private static final Set PRIMARY_ONLY_COMMANDS; static { - Set commands = new HashSet<>(); - commands.add("FUNCTION_DELETE"); - commands.add("FUNCTION_FLUSH"); - commands.add("FUNCTION_LOAD"); - commands.add("FUNCTION_RESTORE"); - PRIMARY_ONLY_COMMANDS = Collections.unmodifiableSet(commands); + PRIMARY_ONLY_COMMANDS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "FUNCTION_DELETE", "FUNCTION_FLUSH", "FUNCTION_LOAD", "FUNCTION_RESTORE" + ))); } private final Logger log = LoggerFactory.getLogger(getClass()); From c41fb1e7822a1ca070033ac8b9495b46c40a8fb7 Mon Sep 17 00:00:00 2001 From: Now Date: Thu, 31 Jul 2025 14:08:35 +0900 Subject: [PATCH 3/5] Fix JedisBroadcastException in FUNCTION commands for cluster --- .../executors/ClusterCommandExecutor.java | 48 +++++++++++++++---- .../jedis/ClusterCommandExecutorTest.java | 42 ++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java index 3d6406b217..3bdb7f2d6b 100644 --- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java @@ -5,6 +5,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -12,6 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.CommandArguments; import redis.clients.jedis.CommandObject; import redis.clients.jedis.Connection; import redis.clients.jedis.ConnectionPool; @@ -28,7 +30,7 @@ public class ClusterCommandExecutor implements CommandExecutor { private static final Set PRIMARY_ONLY_COMMANDS; static { PRIMARY_ONLY_COMMANDS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - "FUNCTION_DELETE", "FUNCTION_FLUSH", "FUNCTION_LOAD", "FUNCTION_RESTORE" + "FUNCTION_DELETE", "FUNCTION_FLUSH", "FUNCTION_LOAD", "FUNCTION_RESTORE", "FUNCTION_KILL" ))); } @@ -57,8 +59,8 @@ public void close() { @Override public final T broadcastCommand(CommandObject commandObject) { Map connectionMap = requiresPrimaryOnly(commandObject) - ? provider.getPrimaryConnectionMap() - : provider.getConnectionMap(); + ? provider.getPrimaryConnectionMap() + : provider.getConnectionMap(); boolean isErrored = false; T reply = null; @@ -90,12 +92,42 @@ public final T broadcastCommand(CommandObject commandObject) { } private boolean requiresPrimaryOnly(CommandObject commandObject) { - try { - String commandName = new String(commandObject.getArguments().getCommand().getRaw()); - return PRIMARY_ONLY_COMMANDS.contains(commandName); - } catch (Exception e) { + try { + String commandName = new String(commandObject.getArguments().getCommand().getRaw()); + + if ("FUNCTION".equals(commandName)) { + CommandArguments args = commandObject.getArguments(); + Iterator iterator = args.iterator(); + + if (iterator.hasNext()) { + iterator.next(); + + if (iterator.hasNext()) { + Object subCommandObj = iterator.next(); + + if (subCommandObj != null) { + try { + java.lang.reflect.Method getRawMethod = subCommandObj.getClass().getMethod("getRaw"); + Object rawValue = getRawMethod.invoke(subCommandObj); + + if (rawValue instanceof byte[]) { + String subCommand = new String((byte[]) rawValue); + String fullCommand = "FUNCTION_" + subCommand.toUpperCase(); + return PRIMARY_ONLY_COMMANDS.contains(fullCommand); + } + } catch (Exception e) { + return false; + } + } + } + } return false; - } + } + + return PRIMARY_ONLY_COMMANDS.contains(commandName); + } catch (Exception e) { + return false; + } } @Override diff --git a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java index d6437e84ea..ad73465266 100644 --- a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java +++ b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java @@ -8,8 +8,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; + import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; import org.hamcrest.MatcherAssert; @@ -357,4 +362,41 @@ protected void sleep(long sleepMillis) { inOrder.verifyNoMoreInteractions(); assertEquals(0L, totalSleepMs.get()); } + + @Test + public void runFunctionCommandUsesPrimaryOnly() { + ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); + + Map primaryNodes = new HashMap<>(); + primaryNodes.put("127.0.0.1:6379", mock(ConnectionPool.class)); + + Map allNodes = new HashMap<>(); + allNodes.put("127.0.0.1:6379", mock(ConnectionPool.class)); + allNodes.put("127.0.0.1:6380", mock(ConnectionPool.class)); + + when(connectionHandler.getPrimaryConnectionMap()).thenReturn(primaryNodes); + when(connectionHandler.getConnectionMap()).thenReturn(allNodes); + + Connection conn = mock(Connection.class); + for (ConnectionPool pool : primaryNodes.values()) { + when(pool.getResource()).thenReturn(conn); + } + + ClusterCommandExecutor executor = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { + @Override + public T execute(Connection connection, CommandObject commandObject) { + return (T) "mylib"; + } + }; + + CommandObjects commandObjects = new CommandObjects(); + CommandObject functionLoadReplaceCommand = commandObjects.functionLoadReplace("script"); + + String result = executor.broadcastCommand(functionLoadReplaceCommand); + + assertEquals("mylib", result); + + verify(connectionHandler).getPrimaryConnectionMap(); + verify(connectionHandler, never()).getConnectionMap(); + } } From 2b2617eaffb40fc572b51ab55de250991cf306b4 Mon Sep 17 00:00:00 2001 From: ggivo Date: Mon, 11 Aug 2025 09:06:49 +0300 Subject: [PATCH 4/5] Broadcast to primary nodes only PR #3306 introduces broadcasting of commands like FUNCTION DELETE, FUNCTION FLUSH, FUNCTION KILL, FUNCTION RESTORE ... to all nodes of the cluster. This leads to error when command is executed on non-writable (replica) node. This commit introduces a fix to broadcast the commands only to primary nodes from the cluster. --- pom.xml | 1 + .../clients/jedis/JedisClusterInfoCache.java | 28 +- .../executors/ClusterCommandExecutor.java | 56 +--- .../providers/ClusterConnectionProvider.java | 7 +- .../jedis/providers/ConnectionProvider.java | 5 + .../jedis/ClusterCommandExecutorTest.java | 43 --- .../jedis/JedisClusterInfoCacheTest.java | 72 ++++- .../unified/FunctionCommandsTestBase.java | 271 ++++++++++++++++++ .../cluster/ClusterFunctionsCommandsTest.java | 74 +++++ src/test/resources/env/docker-compose.yml | 8 +- 10 files changed, 443 insertions(+), 122 deletions(-) create mode 100644 src/test/java/redis/clients/jedis/commands/unified/FunctionCommandsTestBase.java create mode 100644 src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java diff --git a/pom.xml b/pom.xml index d6b5bc6d6e..f4f3d56c03 100644 --- a/pom.xml +++ b/pom.xml @@ -348,6 +348,7 @@ src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java + **/FunctionCommandsTest* diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index ad7d01a361..ab8b6622c3 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -39,6 +39,7 @@ public class JedisClusterInfoCache { private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); private final Map nodes = new HashMap<>(); + private final Map primaryNodesCache = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS]; private final List[] replicaSlots; @@ -176,6 +177,7 @@ public void discoverClusterNodesAndSlots(Connection jedis) { HostAndPort targetNode = generateHostAndPort(hostInfos); setupNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { + primaryNodesCache.put(getNodeKey(targetNode), getNode(targetNode)); assignSlotsToNode(slotNums, targetNode); } else if (clientConfig.isReadOnlyForRedisClusterReplicas()) { assignSlotsToReplicaNode(slotNums, targetNode); @@ -428,19 +430,18 @@ public Map getNodes() { public Map getPrimaryNodes() { r.lock(); try { - Map primaryNodes = new HashMap<>(); - Set addedPools = new HashSet<>(); - - for (int slot = 0; slot < slots.length; slot++) { - ConnectionPool pool = slots[slot]; - if (pool != null && addedPools.add(pool)) { - HostAndPort hostAndPort = slotNodes[slot]; - if (hostAndPort != null) { - primaryNodes.put(getNodeKey(hostAndPort), pool); - } - } - } - return primaryNodes; + return new HashMap<>(primaryNodesCache); + } finally { + r.unlock(); + } + } + + public List getShuffledPrimaryNodesPool() { + r.lock(); + try { + List pools = new ArrayList<>(primaryNodesCache.values()); + Collections.shuffle(pools); + return pools; } finally { r.unlock(); } @@ -496,6 +497,7 @@ private void resetNodes() { } } nodes.clear(); + primaryNodesCache.clear(); } public void close() { diff --git a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java index 3bdb7f2d6b..e475a40948 100644 --- a/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java +++ b/src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java @@ -2,18 +2,12 @@ import java.time.Duration; import java.time.Instant; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.CommandArguments; import redis.clients.jedis.CommandObject; import redis.clients.jedis.Connection; import redis.clients.jedis.ConnectionPool; @@ -27,13 +21,6 @@ public class ClusterCommandExecutor implements CommandExecutor { - private static final Set PRIMARY_ONLY_COMMANDS; - static { - PRIMARY_ONLY_COMMANDS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - "FUNCTION_DELETE", "FUNCTION_FLUSH", "FUNCTION_LOAD", "FUNCTION_RESTORE", "FUNCTION_KILL" - ))); - } - private final Logger log = LoggerFactory.getLogger(getClass()); public final ClusterConnectionProvider provider; @@ -58,9 +45,7 @@ public void close() { @Override public final T broadcastCommand(CommandObject commandObject) { - Map connectionMap = requiresPrimaryOnly(commandObject) - ? provider.getPrimaryConnectionMap() - : provider.getConnectionMap(); + Map connectionMap = provider.getPrimaryNodesConnectionMap(); boolean isErrored = false; T reply = null; @@ -91,45 +76,6 @@ public final T broadcastCommand(CommandObject commandObject) { return reply; } - private boolean requiresPrimaryOnly(CommandObject commandObject) { - try { - String commandName = new String(commandObject.getArguments().getCommand().getRaw()); - - if ("FUNCTION".equals(commandName)) { - CommandArguments args = commandObject.getArguments(); - Iterator iterator = args.iterator(); - - if (iterator.hasNext()) { - iterator.next(); - - if (iterator.hasNext()) { - Object subCommandObj = iterator.next(); - - if (subCommandObj != null) { - try { - java.lang.reflect.Method getRawMethod = subCommandObj.getClass().getMethod("getRaw"); - Object rawValue = getRawMethod.invoke(subCommandObj); - - if (rawValue instanceof byte[]) { - String subCommand = new String((byte[]) rawValue); - String fullCommand = "FUNCTION_" + subCommand.toUpperCase(); - return PRIMARY_ONLY_COMMANDS.contains(fullCommand); - } - } catch (Exception e) { - return false; - } - } - } - } - return false; - } - - return PRIMARY_ONLY_COMMANDS.contains(commandName); - } catch (Exception e) { - return false; - } - } - @Override public final T executeCommand(CommandObject commandObject) { return doExecuteCommand(commandObject, false); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 1650b76dfe..2f2e62dac8 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -140,7 +140,7 @@ public Connection getConnection() { // In antirez's redis-rb-cluster implementation, getRandomConnection always return // valid connection (able to ping-pong) or exception if all connections are invalid - List pools = cache.getShuffledNodesPool(); + List pools = cache.getShuffledPrimaryNodesPool(); JedisException suppressed = null; for (ConnectionPool pool : pools) { @@ -209,12 +209,15 @@ public Connection getReplicaConnectionFromSlot(int slot) { return getConnectionFromSlot(slot); } + @Override public Map getConnectionMap() { return Collections.unmodifiableMap(getNodes()); } - public Map getPrimaryConnectionMap() { + @Override + public Map getPrimaryNodesConnectionMap() { return Collections.unmodifiableMap(getPrimaryNodes()); } + } diff --git a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java index 48543dd5cb..37c6c9c685 100644 --- a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java @@ -15,4 +15,9 @@ public interface ConnectionProvider extends AutoCloseable { final Connection c = getConnection(); return Collections.singletonMap(c.toString(), c); } + + default Map getPrimaryNodesConnectionMap() { + final Connection c = getConnection(); + return Collections.singletonMap(c.toString(), c); + } } diff --git a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java index ad73465266..0ff04a832a 100644 --- a/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java +++ b/src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java @@ -8,13 +8,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.never; - import java.time.Duration; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; import org.hamcrest.MatcherAssert; @@ -24,7 +19,6 @@ import org.mockito.InOrder; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import redis.clients.jedis.exceptions.JedisAskDataException; import redis.clients.jedis.exceptions.JedisClusterOperationException; @@ -362,41 +356,4 @@ protected void sleep(long sleepMillis) { inOrder.verifyNoMoreInteractions(); assertEquals(0L, totalSleepMs.get()); } - - @Test - public void runFunctionCommandUsesPrimaryOnly() { - ClusterConnectionProvider connectionHandler = mock(ClusterConnectionProvider.class); - - Map primaryNodes = new HashMap<>(); - primaryNodes.put("127.0.0.1:6379", mock(ConnectionPool.class)); - - Map allNodes = new HashMap<>(); - allNodes.put("127.0.0.1:6379", mock(ConnectionPool.class)); - allNodes.put("127.0.0.1:6380", mock(ConnectionPool.class)); - - when(connectionHandler.getPrimaryConnectionMap()).thenReturn(primaryNodes); - when(connectionHandler.getConnectionMap()).thenReturn(allNodes); - - Connection conn = mock(Connection.class); - for (ConnectionPool pool : primaryNodes.values()) { - when(pool.getResource()).thenReturn(conn); - } - - ClusterCommandExecutor executor = new ClusterCommandExecutor(connectionHandler, 10, Duration.ZERO) { - @Override - public T execute(Connection connection, CommandObject commandObject) { - return (T) "mylib"; - } - }; - - CommandObjects commandObjects = new CommandObjects(); - CommandObject functionLoadReplaceCommand = commandObjects.functionLoadReplace("script"); - - String result = executor.broadcastCommand(functionLoadReplaceCommand); - - assertEquals("mylib", result); - - verify(connectionHandler).getPrimaryConnectionMap(); - verify(connectionHandler, never()).getConnectionMap(); - } } diff --git a/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java b/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java index fa7b288360..691d8ed01c 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java @@ -15,12 +15,16 @@ import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.when; +import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey; import static redis.clients.jedis.Protocol.Command.CLUSTER; import static redis.clients.jedis.util.CommandArgumentMatchers.commandWithArgs; @@ -49,7 +53,7 @@ public void testReplicaNodeRemovalAndRediscovery() { // Mock the cluster slots responses when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn( - masterReplicaSlotsResponse()).thenReturn(masterOnlySlotsResponse()) + masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)).thenReturn(masterOnlySlotsResponse()) .thenReturn(masterReplica2SlotsResponse()); // Initial discovery with one master and one replica (replica-1) @@ -78,7 +82,7 @@ public void testResetWithReplicaSlots() { // Mock the cluster slots responses when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn( - masterReplicaSlotsResponse()); + masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)); // Initial discovery cache.discoverClusterNodesAndSlots(mockConnection); @@ -94,10 +98,68 @@ public void testResetWithReplicaSlots() { assertReplicasAvailable(cache, REPLICA_1_HOST); } - private List masterReplicaSlotsResponse() { + @Test + public void getPrimaryNodesAfterReplicaNodeRemovalAndRediscovery() { + // Create client config with read-only replicas enabled + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .readOnlyForRedisClusterReplicas().build(); + + Set startNodes = new HashSet<>(); + startNodes.add(MASTER_HOST); + + JedisClusterInfoCache cache = new JedisClusterInfoCache(clientConfig, startNodes); + + // Mock the cluster slots responses + when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn( + masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)).thenReturn(masterOnlySlotsResponse()) + .thenReturn(masterReplica2SlotsResponse()); + + // Initial discovery with one master and one replica (replica-1) + cache.discoverClusterNodesAndSlots(mockConnection); + assertThat(cache.getPrimaryNodes(),aMapWithSize(1)); + assertThat(cache.getPrimaryNodes(), + hasEntry(equalTo(getNodeKey(MASTER_HOST)), equalTo(cache.getNode(MASTER_HOST)))); + + // Simulate rediscovery - master only + cache.discoverClusterNodesAndSlots(mockConnection); + assertThat( cache.getPrimaryNodes(),aMapWithSize(1)); + assertThat(cache.getPrimaryNodes(), + hasEntry(equalTo(getNodeKey(MASTER_HOST)), equalTo(cache.getNode(MASTER_HOST)))); + } + + @Test + public void getPrimaryNodesAfterMasterReplicaFailover() { + // Create client config with read-only replicas enabled + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .readOnlyForRedisClusterReplicas().build(); + + Set startNodes = new HashSet<>(); + startNodes.add(MASTER_HOST); + + JedisClusterInfoCache cache = new JedisClusterInfoCache(clientConfig, startNodes); + + // Mock the cluster slots responses + when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))) + .thenReturn(masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)) + .thenReturn(masterReplicaSlotsResponse(REPLICA_1_HOST, MASTER_HOST)); + + // Initial discovery with one master and one replica (replica-1) + cache.discoverClusterNodesAndSlots(mockConnection); + assertThat(cache.getPrimaryNodes(),aMapWithSize(1)); + assertThat(cache.getPrimaryNodes(), + hasEntry(equalTo(getNodeKey(MASTER_HOST)), equalTo(cache.getNode(MASTER_HOST)))); + + // Simulate rediscovery - master only + cache.discoverClusterNodesAndSlots(mockConnection); + assertThat( cache.getPrimaryNodes(),aMapWithSize(1)); + assertThat(cache.getPrimaryNodes(), + hasEntry(equalTo(getNodeKey(REPLICA_1_HOST)), equalTo(cache.getNode(REPLICA_1_HOST)))); + } + + private List masterReplicaSlotsResponse(HostAndPort masterHost, HostAndPort replicaHost) { return createClusterSlotsResponse( - new SlotRange.Builder(0, 16383).master(MASTER_HOST, "master-id-1") - .replica(REPLICA_1_HOST, "replica-id-1").build()); + new SlotRange.Builder(0, 16383).master(masterHost, masterHost.toString() + "-id") + .replica(replicaHost, replicaHost.toString() + "-id").build()); } private List masterOnlySlotsResponse() { diff --git a/src/test/java/redis/clients/jedis/commands/unified/FunctionCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/FunctionCommandsTestBase.java new file mode 100644 index 0000000000..9b293ac93b --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/FunctionCommandsTestBase.java @@ -0,0 +1,271 @@ +package redis.clients.jedis.commands.unified; + +import io.redis.test.annotations.SinceRedisVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.args.FlushMode; +import redis.clients.jedis.args.FunctionRestorePolicy; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.resps.FunctionStats; +import redis.clients.jedis.resps.LibraryInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public abstract class FunctionCommandsTestBase extends UnifiedJedisCommandsTestBase { + final String libraryName = "mylib"; + final String TEST_LUA_SCRIPT_TMPL = "#!lua name=%s\n" + + "redis.register_function('%s', function(keys, args) return %s end)"; + + private String functionName; + + public FunctionCommandsTestBase(RedisProtocol protocol) { + super(protocol); + } + + protected void setUpFunctions(TestInfo info) { + functionName = info.getDisplayName().replaceAll("[^a-zA-Z0-9]", "_"); + jedis.functionLoad(String.format(TEST_LUA_SCRIPT_TMPL, libraryName, functionName, "42")); + } + + protected void cleanUpFunctions() { + try { + jedis.functionDelete(libraryName); + } catch (JedisException e) { + // ignore + } + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionDeletion() { + List listResponse = jedis.functionList(); + + assertThat(listResponse, hasSize(1)); + assertThat(listResponse.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(listResponse.get(0).getFunctions(), hasSize(1)); + assertThat(listResponse.get(0).getFunctions().get(0), hasEntry("name", functionName)); + + String delete = jedis.functionDelete(libraryName); + assertThat(delete, equalTo("OK")); + + listResponse = jedis.functionList(); + assertThat(listResponse, empty()); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionDeletionBinary() { + List listResponse = jedis.functionList(); + + assertThat(listResponse, hasSize(1)); + assertThat(listResponse.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(listResponse.get(0).getFunctions(), hasSize(1)); + assertThat(listResponse.get(0).getFunctions().get(0), hasEntry("name", functionName)); + + String deleteBinary = jedis.functionDelete(libraryName.getBytes()); + assertThat(deleteBinary, equalTo("OK")); + + listResponse = jedis.functionList(); + assertThat(listResponse, empty()); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionListing() { + + List list = jedis.functionList(); + + assertThat(list, hasSize(1)); + assertThat(list.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(list.get(0).getFunctions(), hasSize(1)); + assertThat(list.get(0).getFunctions().get(0), hasEntry("name", functionName)); + assertThat(list.get(0).getLibraryCode(), nullValue()); + + List listBinary = jedis.functionListBinary(); + + assertThat(listBinary, hasSize(1)); + + List listLibrary = jedis.functionList(libraryName); + + assertThat(listLibrary, hasSize(1)); + assertThat(listLibrary.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(listLibrary.get(0).getFunctions(), hasSize(1)); + assertThat(listLibrary.get(0).getFunctions().get(0), hasEntry("name", functionName)); + assertThat(listLibrary.get(0).getLibraryCode(), nullValue()); + + List listLibraryBinary = jedis.functionList(libraryName.getBytes()); + + assertThat(listLibraryBinary, hasSize(1)); + + List listWithCode = jedis.functionListWithCode(); + + assertThat(listWithCode, hasSize(1)); + assertThat(listWithCode.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(listWithCode.get(0).getFunctions(), hasSize(1)); + assertThat(listWithCode.get(0).getFunctions().get(0), hasEntry("name", functionName)); + assertThat(listWithCode.get(0).getLibraryCode(), notNullValue()); + + List listWithCodeBinary = jedis.functionListWithCodeBinary(); + + assertThat(listWithCodeBinary, hasSize(1)); + + List listWithCodeLibrary = jedis.functionListWithCode(libraryName); + + assertThat(listWithCodeLibrary, hasSize(1)); + assertThat(listWithCodeLibrary.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(listWithCodeLibrary.get(0).getFunctions(), hasSize(1)); + assertThat(listWithCodeLibrary.get(0).getFunctions().get(0), hasEntry("name", functionName)); + assertThat(listWithCodeLibrary.get(0).getLibraryCode(), notNullValue()); + + List listWithCodeLibraryBinary = jedis.functionListWithCode(libraryName.getBytes()); + + assertThat(listWithCodeLibraryBinary, hasSize(1)); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionReload() { + Object result = jedis.fcall(functionName.getBytes(), new ArrayList<>(), new ArrayList<>()); + assertThat(result, equalTo(42L)); + + String luaScriptChanged = String.format(TEST_LUA_SCRIPT_TMPL, libraryName, functionName, "52"); + String replaceResult = jedis.functionLoadReplace(luaScriptChanged); + assertThat(replaceResult, equalTo("mylib")); + + Object resultAfter = jedis.fcall(functionName.getBytes(), new ArrayList<>(), new ArrayList<>()); + assertThat(resultAfter, equalTo(52L)); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionReloadBinary() { + Object result = jedis.fcall(functionName.getBytes(), new ArrayList<>(), new ArrayList<>()); + assertThat(result, equalTo(42L)); + + String luaScriptChanged = String.format(TEST_LUA_SCRIPT_TMPL, libraryName, functionName, "52"); + String replaceResult = jedis.functionLoadReplace(luaScriptChanged.getBytes()); + assertThat(replaceResult, equalTo("mylib")); + + Object resultAfter = jedis.fcall(functionName.getBytes(), new ArrayList<>(), new ArrayList<>()); + assertThat(resultAfter, equalTo(52L)); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionStats() { + + for (int i = 0; i < 5; i++) { + Object result = jedis.fcall(functionName.getBytes(), new ArrayList<>(), new ArrayList<>()); + assertThat(result, equalTo(42L)); + } + + FunctionStats stats = jedis.functionStats(); + + assertThat(stats, notNullValue()); + assertThat(stats.getEngines(), hasKey("LUA")); + Map luaStats = stats.getEngines().get("LUA"); + assertThat(luaStats, hasEntry("libraries_count", 1L)); + assertThat(luaStats, hasEntry("functions_count", 1L)); + + Object statsBinary = jedis.functionStatsBinary(); + + assertThat(statsBinary, notNullValue()); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionDumpFlushRestore() { + + List list = jedis.functionList(); + assertThat(list, hasSize(1)); + assertThat(list.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(list.get(0).getFunctions(), hasSize(1)); + assertThat(list.get(0).getFunctions().get(0), hasEntry("name", functionName)); + + byte[] dump = jedis.functionDump(); + assertThat(dump, notNullValue()); + + String flush = jedis.functionFlush(); + assertThat(flush, equalTo("OK")); + + list = jedis.functionList(); + assertThat(list, empty()); + + String restore = jedis.functionRestore(dump); + assertThat(restore, equalTo("OK")); + + list = jedis.functionList(); + assertThat(list, hasSize(1)); + assertThat(list.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(list.get(0).getFunctions(), hasSize(1)); + assertThat(list.get(0).getFunctions().get(0), hasEntry("name", functionName)); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionDumpFlushRestoreWithPolicy() { + + List list = jedis.functionList(); + assertThat(list, hasSize(1)); + assertThat(list.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(list.get(0).getFunctions(), hasSize(1)); + assertThat(list.get(0).getFunctions().get(0), hasEntry("name", functionName)); + + byte[] dump = jedis.functionDump(); + assertThat(dump, notNullValue()); + + String flush = jedis.functionFlush(); + assertThat(flush, equalTo("OK")); + + list = jedis.functionList(); + assertThat(list, empty()); + + String restore = jedis.functionRestore(dump, FunctionRestorePolicy.REPLACE); + assertThat(restore, equalTo("OK")); + + list = jedis.functionList(); + assertThat(list, hasSize(1)); + assertThat(list.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(list.get(0).getFunctions(), hasSize(1)); + assertThat(list.get(0).getFunctions().get(0), hasEntry("name", functionName)); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionFlushWithMode() { + + List list = jedis.functionList(); + + assertThat(list, hasSize(1)); + assertThat(list.get(0).getLibraryName(), equalTo(libraryName)); + assertThat(list.get(0).getFunctions(), hasSize(1)); + assertThat(list.get(0).getFunctions().get(0), hasEntry("name", functionName)); + + String flush = jedis.functionFlush(FlushMode.SYNC); + assertThat(flush, equalTo("OK")); + + list = jedis.functionList(); + assertThat(list, empty()); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + public void testFunctionKill() { + JedisException e = assertThrows(JedisException.class, () -> jedis.functionKill()); + assertThat(e.getMessage(), containsString("No scripts in execution right now")); + } +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java new file mode 100644 index 0000000000..5586e1e4d1 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java @@ -0,0 +1,74 @@ +package redis.clients.jedis.commands.unified.cluster; + +import io.redis.test.annotations.SinceRedisVersion; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.MethodSource; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPorts; +import redis.clients.jedis.RedisProtocol; +import redis.clients.jedis.commands.unified.FunctionCommandsTestBase; +import redis.clients.jedis.exceptions.JedisBroadcastException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.util.EnabledOnCommandCondition; +import redis.clients.jedis.util.RedisVersionCondition; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@ParameterizedClass +@MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") +public class ClusterFunctionsCommandsTest extends FunctionCommandsTestBase { + + public ClusterFunctionsCommandsTest(RedisProtocol protocol) { + super(protocol); + } + + @RegisterExtension + public RedisVersionCondition versionCondition = new RedisVersionCondition( + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); + @RegisterExtension + public EnabledOnCommandCondition enabledOnCommandCondition = new EnabledOnCommandCondition( + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); + + @BeforeEach + public void setUp(TestInfo testInfo) { + jedis = ClusterCommandsTestHelper.getCleanCluster(protocol); + super.setUpFunctions(testInfo); + } + + @AfterEach + public void tearDown() { + super.cleanUpFunctions(); + jedis.close(); + ClusterCommandsTestHelper.clearClusterData(); + } + + @Test + @SinceRedisVersion(value = "7.0.0") + @Override + public void testFunctionKill() { + JedisException e = assertThrows(JedisException.class, + () -> jedis.functionKill()); + assertThat(e, instanceOf(JedisBroadcastException.class)); + JedisBroadcastException jbe = (JedisBroadcastException) e; + List replies = jbe.getReplies().values().stream().map(e1 -> (Exception) e1).map(Exception::getMessage) + .collect(Collectors.toList()); + assertThat(replies.size(), equalTo(3)); + assertThat(replies, everyItem(containsString("No scripts in execution right now"))); + } + +} diff --git a/src/test/resources/env/docker-compose.yml b/src/test/resources/env/docker-compose.yml index bcdeef5835..86239cf9c6 100644 --- a/src/test/resources/env/docker-compose.yml +++ b/src/test/resources/env/docker-compose.yml @@ -131,15 +131,15 @@ services: image: "${CLIENT_LIBS_TEST_IMAGE}:${REDIS_VERSION}" container_name: cluster-stable-1 #network_mode: host - command: --cluster-announce-ip 127.0.0.1 + command: --cluster-announce-ip 127.0.0.1 --cluster-node-timeout 150 environment: - REDIS_CLUSTER=yes - REDIS_PASSWORD=cluster - PORT=7479 - - NODES=3 - - REPLICAS=0 + - NODES=6 + - REPLICAS=1 ports: - - "7479-7481:7479-7481" + - "7479-7484:7479-7484" volumes: - ${REDIS_ENV_CONF_DIR}/cluster-stable/config:/redis/config:r - ${REDIS_ENV_WORK_DIR}/cluster-stable/work:/redis/work:rw From 1bce2048c8c467b6233d26827a204e94c8067c34 Mon Sep 17 00:00:00 2001 From: ggivo Date: Mon, 11 Aug 2025 11:10:26 +0300 Subject: [PATCH 5/5] format --- pom.xml | 2 +- ....java => ClusterFunctionCommandsTest.java} | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) rename src/test/java/redis/clients/jedis/commands/unified/cluster/{ClusterFunctionsCommandsTest.java => ClusterFunctionCommandsTest.java} (79%) diff --git a/pom.xml b/pom.xml index f4f3d56c03..142aca1bd3 100644 --- a/pom.xml +++ b/pom.xml @@ -348,7 +348,7 @@ src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java - **/FunctionCommandsTest* + **/*FunctionCommandsTest* diff --git a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionCommandsTest.java similarity index 79% rename from src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java rename to src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionCommandsTest.java index 5586e1e4d1..c7848076fd 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/cluster/ClusterFunctionCommandsTest.java @@ -29,20 +29,20 @@ @ParameterizedClass @MethodSource("redis.clients.jedis.commands.CommandsTestsParameters#respVersions") -public class ClusterFunctionsCommandsTest extends FunctionCommandsTestBase { +public class ClusterFunctionCommandsTest extends FunctionCommandsTestBase { - public ClusterFunctionsCommandsTest(RedisProtocol protocol) { + public ClusterFunctionCommandsTest(RedisProtocol protocol) { super(protocol); } @RegisterExtension public RedisVersionCondition versionCondition = new RedisVersionCondition( - HostAndPorts.getStableClusterServers().get(0), - DefaultJedisClientConfig.builder().password("cluster").build()); + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); @RegisterExtension public EnabledOnCommandCondition enabledOnCommandCondition = new EnabledOnCommandCondition( - HostAndPorts.getStableClusterServers().get(0), - DefaultJedisClientConfig.builder().password("cluster").build()); + HostAndPorts.getStableClusterServers().get(0), + DefaultJedisClientConfig.builder().password("cluster").build()); @BeforeEach public void setUp(TestInfo testInfo) { @@ -61,12 +61,11 @@ public void tearDown() { @SinceRedisVersion(value = "7.0.0") @Override public void testFunctionKill() { - JedisException e = assertThrows(JedisException.class, - () -> jedis.functionKill()); + JedisException e = assertThrows(JedisException.class, () -> jedis.functionKill()); assertThat(e, instanceOf(JedisBroadcastException.class)); JedisBroadcastException jbe = (JedisBroadcastException) e; - List replies = jbe.getReplies().values().stream().map(e1 -> (Exception) e1).map(Exception::getMessage) - .collect(Collectors.toList()); + List replies = jbe.getReplies().values().stream().map(e1 -> (Exception) e1) + .map(Exception::getMessage).collect(Collectors.toList()); assertThat(replies.size(), equalTo(3)); assertThat(replies, everyItem(containsString("No scripts in execution right now"))); }