Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -38,8 +39,6 @@ public abstract class MultiNodePipelineBase extends PipelineBase
private final Map<HostAndPort, Connection> connections;
private volatile boolean syncing = false;

private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

public MultiNodePipelineBase(CommandObjects commandObjects) {
super(commandObjects);
pipelinedResponses = new LinkedHashMap<>();
Expand Down Expand Up @@ -104,6 +103,8 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
Expand Down Expand Up @@ -136,6 +137,8 @@ public final void sync() {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();

syncing = false;
}

Expand Down
36 changes: 36 additions & 0 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1061,4 +1061,40 @@ public void transaction() {
assertThrows(UnsupportedOperationException.class, () -> cluster.multi());
}
}

@Test(timeout = 10_000L)
public void multiple() {
final int maxTotal = 100;
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(maxTotal);
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) {
for (int i = 0; i < maxTotal; i++) {
assertThreadsCount();
String s = Integer.toString(i);
try (ClusterPipeline pipeline = cluster.pipelined()) {
pipeline.set(s, s);
pipeline.sync();
}
assertThreadsCount();
}
}
}

private static void assertThreadsCount() {
// Get the root thread group
final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();

// Create a buffer to store the thread information
final Thread[] threads = new Thread[rootGroup.activeCount()];

// Enumerate all threads into the buffer
rootGroup.enumerate(threads);

// Assert information about threads
final int count = (int) Arrays.stream(threads)
.filter(thread -> thread != null && thread.getName() != null
&& thread.getName().startsWith("pool-"))
.count();
assertTrue(count < 9);
}
}