From ef775fb3053659c38de35bfad5721eacb3979801 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Tue, 13 Jun 2023 20:59:11 +0600 Subject: [PATCH 1/3] Shutdown ExecutorService(s) in multi node pipelines --- .../clients/jedis/MultiNodePipelineBase.java | 14 ++++++-- .../clients/jedis/ClusterPipeliningTest.java | 36 +++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index fae16a8910..670fb0f66a 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -10,6 +10,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +39,6 @@ public abstract class MultiNodePipelineBase extends PipelineBase private final Map connections; private volatile boolean syncing = false; - private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); - public MultiNodePipelineBase(CommandObjects commandObjects) { super(commandObjects); pipelinedResponses = new LinkedHashMap<>(); @@ -104,6 +103,8 @@ public final void sync() { } syncing = true; + ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS); + CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size()); Iterator>>> pipelinedResponsesIterator = pipelinedResponses.entrySet().iterator(); @@ -136,6 +137,15 @@ public final void sync() { log.error("Thread is interrupted during sync.", e); } + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.MINUTES)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 05cd56c2a8..541393cfed 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1061,4 +1061,40 @@ public void transaction() { assertThrows(UnsupportedOperationException.class, () -> cluster.multi()); } } + + @Test(timeout = 10 * 1000L) + public void multiple() { + final int maxTotal = 100; + ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) { + for (int i = 0; i < maxTotal; i++) { + assertThreadsCount(); + String s = Integer.toString(i); + try (ClusterPipeline pipeline = cluster.pipelined()) { + pipeline.set(s, s); + pipeline.sync(); + } + assertThreadsCount(); + } + } + } + + private static void assertThreadsCount() { + // Get the root thread group + final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent(); + + // Create a buffer to store the thread information + final Thread[] threads = new Thread[rootGroup.activeCount()]; + + // Enumerate all threads into the buffer + rootGroup.enumerate(threads); + + // Assert information about threads + final int count = (int) Arrays.stream(threads) + .filter(thread -> thread.getName() + .startsWith("pool-")) + .count(); + assertTrue(count < 9); + } } From 2a1873a914a0d50d1c1b560fa010c5cef110f3fc Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 14 Jun 2023 13:14:30 +0600 Subject: [PATCH 2/3] Use only shutdownNow() --- .../java/redis/clients/jedis/MultiNodePipelineBase.java | 9 +-------- .../java/redis/clients/jedis/ClusterPipeliningTest.java | 6 +++--- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 670fb0f66a..10d4874071 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -137,14 +137,7 @@ public final void sync() { log.error("Thread is interrupted during sync.", e); } - executorService.shutdown(); - try { - if (!executorService.awaitTermination(5, TimeUnit.MINUTES)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - } + executorService.shutdownNow(); syncing = false; } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index 541393cfed..268728b8aa 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -1062,7 +1062,7 @@ public void transaction() { } } - @Test(timeout = 10 * 1000L) + @Test(timeout = 10_000L) public void multiple() { final int maxTotal = 100; ConnectionPoolConfig poolConfig = new ConnectionPoolConfig(); @@ -1092,8 +1092,8 @@ private static void assertThreadsCount() { // Assert information about threads final int count = (int) Arrays.stream(threads) - .filter(thread -> thread.getName() - .startsWith("pool-")) + .filter(thread -> thread != null && thread.getName() != null + && thread.getName().startsWith("pool-")) .count(); assertTrue(count < 9); } From 7ee6fb922a7865d38d0a50953884af0b99290c4a Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 14 Jun 2023 13:20:46 +0600 Subject: [PATCH 3/3] format import --- src/main/java/redis/clients/jedis/MultiNodePipelineBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 10d4874071..eef6b2a810 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -10,7 +10,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory;