Skip to content

Commit c5463e2

Browse files
Merge pull request #71 from rabbitmq/rabbitmq-perf-test-70
Rationalize threads usage
2 parents 26dc94e + ee984b2 commit c5463e2

File tree

9 files changed

+330
-97
lines changed

9 files changed

+330
-97
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@
128128
<groupId>org.junit.jupiter</groupId>
129129
<artifactId>junit-jupiter-migrationsupport</artifactId>
130130
<version>${junit.jupiter.version}</version>
131+
<exclusions>
132+
<exclusion>
133+
<groupId>org.hamcrest</groupId>
134+
<artifactId>hamcrest-core</artifactId>
135+
</exclusion>
136+
</exclusions>
131137
<scope>test</scope>
132138
</dependency>
133139

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.rabbitmq.client.DefaultConsumer;
2121
import com.rabbitmq.client.Envelope;
2222
import com.rabbitmq.client.ShutdownSignalException;
23-
import org.slf4j.LoggerFactory;
2423

2524
import java.io.ByteArrayInputStream;
2625
import java.io.DataInputStream;
@@ -30,7 +29,6 @@
3029
import java.util.List;
3130
import java.util.Map;
3231
import java.util.concurrent.CountDownLatch;
33-
import java.util.concurrent.TimeUnit;
3432
import java.util.function.BiFunction;
3533

3634
public class Consumer extends ProducerConsumerBase implements Runnable {
@@ -44,7 +42,6 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4442
private final int multiAckEvery;
4543
private final Stats stats;
4644
private final int msgLimit;
47-
private final long timeLimit;
4845
private final CountDownLatch latch = new CountDownLatch(1);
4946
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
5047
private final ConsumerLatency consumerLatency;
@@ -53,7 +50,7 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
5350

5451
public Consumer(Channel channel, String id,
5552
List<String> queueNames, int txSize, boolean autoAck,
56-
int multiAckEvery, Stats stats, float rateLimit, int msgLimit, final int timeLimit,
53+
int multiAckEvery, Stats stats, float rateLimit, int msgLimit,
5754
int consumerLatencyInMicroSeconds,
5855
TimestampProvider timestampProvider) {
5956

@@ -66,7 +63,6 @@ public Consumer(Channel channel, String id,
6663
this.multiAckEvery = multiAckEvery;
6764
this.stats = stats;
6865
this.msgLimit = msgLimit;
69-
this.timeLimit = 1000L * timeLimit;
7066
this.timestampProvider = timestampProvider;
7167

7268
if (consumerLatencyInMicroSeconds <= 0) {
@@ -103,17 +99,8 @@ public void run() {
10399
String tag = channel.basicConsume(qName, autoAck, q);
104100
consumerTagBranchMap.put(tag, qName);
105101
}
106-
if (timeLimit == 0) {
107-
latch.await();
108-
}
109-
else {
110-
latch.await(timeLimit, TimeUnit.MILLISECONDS);
111-
}
112-
113102
} catch (IOException e) {
114103
throw new RuntimeException(e);
115-
} catch (InterruptedException e) {
116-
LoggerFactory.getLogger(getClass()).warn("Consumer thread has been interrupted");
117104
} catch (ShutdownSignalException e) {
118105
throw new RuntimeException(e);
119106
}

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public class MulticastParams {
7676

7777
private TopologyHandler topologyHandler;
7878

79+
private int heartbeatSenderThreads = -1;
80+
7981
public void setExchangeType(String exchangeType) {
8082
this.exchangeType = exchangeType;
8183
}
@@ -289,6 +291,14 @@ public int getQueueSequenceTo() {
289291
return queueSequenceTo;
290292
}
291293

294+
public void setHeartbeatSenderThreads(int heartbeatSenderThreads) {
295+
this.heartbeatSenderThreads = heartbeatSenderThreads;
296+
}
297+
298+
public int getHeartbeatSenderThreads() {
299+
return heartbeatSenderThreads <= 0 ? producerCount + consumerCount : this.heartbeatSenderThreads;
300+
}
301+
292302
public Producer createProducer(Connection connection, Stats stats) throws IOException {
293303
Channel channel = connection.createChannel();
294304
if (producerTxSize > 0) channel.txSelect();
@@ -333,7 +343,7 @@ public Consumer createConsumer(Connection connection, Stats stats) throws IOExce
333343
TimestampProvider tsp = new TimestampProvider(useMillis, timestampInHeader);
334344
Consumer consumer = new Consumer(channel, this.topologyHandler.getRoutingKey(), generatedQueueNames,
335345
consumerTxSize, autoAck, multiAckEvery,
336-
stats, consumerRateLimit, consumerMsgCount, timeLimit,
346+
stats, consumerRateLimit, consumerMsgCount,
337347
consumerLatencyInMicroseconds, tsp);
338348
this.topologyHandler.next();
339349
return consumer;

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 108 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,23 @@
2222
import java.net.URISyntaxException;
2323
import java.security.KeyManagementException;
2424
import java.security.NoSuchAlgorithmException;
25+
import java.util.ArrayList;
26+
import java.util.Collection;
2527
import java.util.List;
2628
import java.util.Random;
27-
import java.util.UUID;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.ExecutorService;
31+
import java.util.concurrent.Executors;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.SynchronousQueue;
35+
import java.util.concurrent.ThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
2837
import java.util.concurrent.TimeoutException;
38+
import java.util.function.Supplier;
39+
40+
import static java.lang.Math.min;
41+
import static java.lang.String.format;
2942

3043
public class MulticastSet {
3144

@@ -37,7 +50,7 @@ public class MulticastSet {
3750

3851
private final Random random = new Random();
3952

40-
private ThreadHandler threadHandler = new DefaultThreadHandler();
53+
private ThreadingHandler threadingHandler = new DefaultThreadingHandler();
4154

4255
public MulticastSet(Stats stats, ConnectionFactory factory,
4356
MulticastParams params, List<String> uris) {
@@ -55,73 +68,95 @@ public MulticastSet(Stats stats, ConnectionFactory factory,
5568
this.params.init();
5669
}
5770

58-
public void run() throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
71+
public void run()
72+
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException, ExecutionException {
5973
run(false);
6074
}
6175

6276
public void run(boolean announceStartup)
63-
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
77+
throws IOException, InterruptedException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException, ExecutionException {
78+
79+
ScheduledExecutorService heartbeatSenderExecutorService = this.threadingHandler.scheduledExecutorService(
80+
"perf-test-heartbeat-sender-",
81+
this.params.getHeartbeatSenderThreads()
82+
);
83+
factory.setHeartbeatExecutor(heartbeatSenderExecutorService);
6484

6585
setUri();
66-
Connection conn = factory.newConnection();
86+
Connection conn = factory.newConnection("perf-test-configuration");
6787
params.configureAllQueues(conn);
6888
conn.close();
6989

7090
this.params.resetTopologyHandler();
7191

72-
Thread[] consumerThreads = new Thread[params.getConsumerThreadCount()];
92+
Runnable[] consumerRunnables = new Runnable[params.getConsumerThreadCount()];
7393
Connection[] consumerConnections = new Connection[params.getConsumerCount()];
7494
for (int i = 0; i < consumerConnections.length; i++) {
7595
if (announceStartup) {
7696
System.out.println("id: " + testID + ", starting consumer #" + i);
7797
}
7898
setUri();
79-
conn = factory.newConnection();
99+
ExecutorService executorService = this.threadingHandler.executorService(
100+
format("perf-test-consumer-%s-worker-", i),
101+
nbThreadsForConsumer(this.params)
102+
);
103+
factory.setSharedExecutor(executorService);
104+
105+
conn = factory.newConnection("perf-test-consumer-" + i);
80106
consumerConnections[i] = conn;
81107
for (int j = 0; j < params.getConsumerChannelCount(); j++) {
82108
if (announceStartup) {
83109
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
84110
}
85-
Thread t = new Thread(params.createConsumer(conn, stats));
86-
consumerThreads[(i * params.getConsumerChannelCount()) + j] = t;
111+
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = params.createConsumer(conn, stats);
87112
}
88113
}
89114

90115
this.params.resetTopologyHandler();
91116

92-
Thread[] producerThreads = new Thread[params.getProducerThreadCount()];
117+
AgentState[] producerStates = new AgentState[params.getProducerThreadCount()];
93118
Connection[] producerConnections = new Connection[params.getProducerCount()];
119+
// producers don't need an executor service, as they don't have any consumers
120+
// this consumer should never be asked to create any threads
121+
ExecutorService executorServiceForProducersConsumers = this.threadingHandler.executorService(
122+
"perf-test-producers-worker-", 0
123+
);
124+
factory.setSharedExecutor(executorServiceForProducersConsumers);
94125
for (int i = 0; i < producerConnections.length; i++) {
95126
if (announceStartup) {
96127
System.out.println("id: " + testID + ", starting producer #" + i);
97128
}
98129
setUri();
99-
conn = factory.newConnection();
130+
conn = factory.newConnection("perf-test-producer-i");
100131
producerConnections[i] = conn;
101132
for (int j = 0; j < params.getProducerChannelCount(); j++) {
102133
if (announceStartup) {
103134
System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
104135
}
105-
Thread t = new Thread(params.createProducer(conn, stats));
106-
producerThreads[(i * params.getProducerChannelCount()) + j] = t;
136+
AgentState agentState = new AgentState();
137+
agentState.runnable = params.createProducer(conn, stats);
138+
producerStates[(i * params.getProducerChannelCount()) + j] = agentState;
107139
}
108140
}
109141

110-
for (Thread consumerThread : consumerThreads) {
111-
this.threadHandler.start(consumerThread);
142+
for (Runnable runnable : consumerRunnables) {
143+
runnable.run();
112144
if(params.getConsumerSlowStart()) {
113-
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
114-
Thread.sleep(1000);
145+
System.out.println("Delaying start by 1 second because -S/--slow-start was requested");
146+
Thread.sleep(1000);
115147
}
116148
}
117149

118-
for (Thread producerThread : producerThreads) {
119-
this.threadHandler.start(producerThread);
150+
ExecutorService producersExecutorService = this.threadingHandler.executorService(
151+
"perf-test-producer-", producerStates.length
152+
);
153+
for (AgentState producerState : producerStates) {
154+
producerState.task = producersExecutorService.submit(producerState.runnable);
120155
}
121156

122157
int count = 1; // counting the threads
123-
for (int i = 0; i < producerThreads.length; i++) {
124-
this.threadHandler.waitForCompletion(producerThreads[i]);
158+
for (int i = 0; i < producerStates.length; i++) {
159+
producerStates[i].task.get();
125160
if(count % params.getProducerChannelCount() == 0) {
126161
// this is the end of a group of threads on the same connection,
127162
// closing the connection
@@ -130,20 +165,21 @@ public void run(boolean announceStartup)
130165
count++;
131166
}
132167

133-
count = 1; // counting the threads
134-
for (int i = 0; i < consumerThreads.length; i++) {
135-
this.threadHandler.waitForCompletion(consumerThreads[i]);
136-
if(count % params.getConsumerChannelCount() == 0) {
137-
// this is the end of a group of threads on the same connection,
138-
// closing the connection
139-
consumerConnections[count / params.getConsumerChannelCount() - 1].close();
140-
}
141-
count++;
168+
for (Connection consumerConnection : consumerConnections) {
169+
consumerConnection.close();
142170
}
171+
172+
this.threadingHandler.shutdown();
143173
}
144174

145-
public void setThreadHandler(ThreadHandler threadHandler) {
146-
this.threadHandler = threadHandler;
175+
// from Java Client ConsumerWorkService
176+
public final static int DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
177+
178+
protected static int nbThreadsForConsumer(MulticastParams params) {
179+
// for backward compatibility, the thread pool should large enough to dedicate
180+
// one thread for each channel when channel number is <= DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE
181+
// Above this number, we stick to DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE
182+
return min(params.getConsumerChannelCount(), DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE);
147183
}
148184

149185
private void setUri() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
@@ -157,29 +193,61 @@ private String uri() {
157193
return uri;
158194
}
159195

196+
public void setThreadingHandler(ThreadingHandler threadingHandler) {
197+
this.threadingHandler = threadingHandler;
198+
}
199+
160200
/**
161201
* Abstraction for thread management.
162202
* Exists to ease testing.
163203
*/
164-
interface ThreadHandler {
204+
interface ThreadingHandler {
205+
206+
ExecutorService executorService(String name, int nbThreads);
165207

166-
void start(Thread thread);
208+
ScheduledExecutorService scheduledExecutorService(String name, int nbThreads);
167209

168-
void waitForCompletion(Thread thread) throws InterruptedException;
210+
void shutdown();
169211

170212
}
171213

172-
static class DefaultThreadHandler implements ThreadHandler {
214+
static class DefaultThreadingHandler implements ThreadingHandler {
215+
216+
private final Collection<ExecutorService> executorServices = new ArrayList<>();
217+
218+
@Override
219+
public ExecutorService executorService(String name, int nbThreads) {
220+
if (nbThreads <= 0) {
221+
return create(() -> Executors.newSingleThreadExecutor(new NamedThreadFactory(name)));
222+
} else {
223+
return create(() -> Executors.newFixedThreadPool(nbThreads, new NamedThreadFactory(name)));
224+
}
225+
}
173226

174227
@Override
175-
public void start(Thread thread) {
176-
thread.start();
228+
public ScheduledExecutorService scheduledExecutorService(String name, int nbThreads) {
229+
return (ScheduledExecutorService) create(() -> Executors.newScheduledThreadPool(nbThreads, new NamedThreadFactory(name)));
230+
}
231+
232+
private ExecutorService create(Supplier<ExecutorService> s) {
233+
ExecutorService executorService = s.get();
234+
this.executorServices.add(executorService);
235+
return executorService;
177236
}
178237

179238
@Override
180-
public void waitForCompletion(Thread thread) throws InterruptedException {
181-
thread.join();
239+
public void shutdown() {
240+
for (ExecutorService executorService : executorServices) {
241+
executorService.shutdown();
242+
}
182243
}
244+
}
245+
246+
private static class AgentState {
247+
248+
private Runnable runnable;
249+
250+
private Future<?> task;
183251

184252
}
185253

0 commit comments

Comments
 (0)