Skip to content

Commit 21f41a2

Browse files
Merge pull request #74 from rabbitmq/rabbitmq-perf-test-72
Enforce producer/consumer message and time limit
2 parents c5463e2 + 7d549f7 commit 21f41a2

File tree

10 files changed

+538
-35
lines changed

10 files changed

+538
-35
lines changed

pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<junit.platform.version>1.0.2</junit.platform.version>
6464
<mockito.version>2.13.0</mockito.version>
6565
<hamcrest.version>2.0.0.0</hamcrest.version>
66+
<awaitility.version>3.0.0</awaitility.version>
6667

6768
<!-- to sign artifacts when releasing -->
6869
<gpg.keyname>6026DFCA</gpg.keyname>
@@ -159,6 +160,14 @@
159160
<scope>test</scope>
160161
</dependency>
161162

163+
<dependency>
164+
<groupId>org.awaitility</groupId>
165+
<artifactId>awaitility</artifactId>
166+
<version>${awaitility.version}</version>
167+
<scope>test</scope>
168+
</dependency>
169+
170+
162171
<dependency>
163172
<groupId>ch.qos.logback</groupId>
164173
<artifactId>logback-classic</artifactId>

src/main/java/com/rabbitmq/perf/Consumer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.HashMap;
2929
import java.util.List;
3030
import java.util.Map;
31-
import java.util.concurrent.CountDownLatch;
31+
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.function.BiFunction;
3333

3434
public class Consumer extends ProducerConsumerBase implements Runnable {
@@ -42,17 +42,19 @@ public class Consumer extends ProducerConsumerBase implements Runnable {
4242
private final int multiAckEvery;
4343
private final Stats stats;
4444
private final int msgLimit;
45-
private final CountDownLatch latch = new CountDownLatch(1);
4645
private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<String, String>());
4746
private final ConsumerLatency consumerLatency;
4847
private final BiFunction<BasicProperties, byte[], Long> timestampExtractor;
4948
private final TimestampProvider timestampProvider;
49+
private final MulticastSet.CompletionHandler completionHandler;
50+
private final AtomicBoolean completed = new AtomicBoolean(false);
5051

5152
public Consumer(Channel channel, String id,
5253
List<String> queueNames, int txSize, boolean autoAck,
5354
int multiAckEvery, Stats stats, float rateLimit, int msgLimit,
5455
int consumerLatencyInMicroSeconds,
55-
TimestampProvider timestampProvider) {
56+
TimestampProvider timestampProvider,
57+
MulticastSet.CompletionHandler completionHandler) {
5658

5759
this.channel = channel;
5860
this.id = id;
@@ -64,6 +66,7 @@ public Consumer(Channel channel, String id,
6466
this.stats = stats;
6567
this.msgLimit = msgLimit;
6668
this.timestampProvider = timestampProvider;
69+
this.completionHandler = completionHandler;
6770

6871
if (consumerLatencyInMicroSeconds <= 0) {
6972
this.consumerLatency = new NoWaitConsumerLatency();
@@ -147,13 +150,13 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
147150
consumerLatency.simulateLatency();
148151
}
149152
if (msgLimit != 0 && msgCount >= msgLimit) { // NB: not quite the inverse of above
150-
latch.countDown();
153+
countDown();
151154
}
152155
}
153156

154157
@Override
155158
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
156-
latch.countDown();
159+
countDown();
157160
}
158161

159162
@Override
@@ -169,6 +172,12 @@ public void handleCancel(String consumerTag) throws IOException {
169172
}
170173
}
171174

175+
private void countDown() {
176+
if (completed.compareAndSet(false, true)) {
177+
completionHandler.countDown();
178+
}
179+
}
180+
172181
private interface ConsumerLatency {
173182

174183
void simulateLatency();

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,19 @@ public int getHeartbeatSenderThreads() {
299299
return heartbeatSenderThreads <= 0 ? producerCount + consumerCount : this.heartbeatSenderThreads;
300300
}
301301

302-
public Producer createProducer(Connection connection, Stats stats) throws IOException {
302+
public int getTimeLimit() {
303+
return timeLimit;
304+
}
305+
306+
public int getProducerMsgCount() {
307+
return producerMsgCount;
308+
}
309+
310+
public int getConsumerMsgCount() {
311+
return consumerMsgCount;
312+
}
313+
314+
public Producer createProducer(Connection connection, Stats stats, MulticastSet.CompletionHandler completionHandler) throws IOException {
303315
Channel channel = connection.createChannel();
304316
if (producerTxSize > 0) channel.txSelect();
305317
if (confirm >= 0) channel.confirmSelect();
@@ -318,16 +330,15 @@ public Producer createProducer(Connection connection, Stats stats) throws IOExce
318330
final Producer producer = new Producer(channel, exchangeName, this.topologyHandler.getRoutingKey(),
319331
randomRoutingKey, flags, producerTxSize,
320332
producerRateLimit, producerMsgCount,
321-
timeLimit,
322333
confirm, confirmTimeout, messageBodySource,
323-
tsp, stats);
334+
tsp, stats, completionHandler);
324335
channel.addReturnListener(producer);
325336
channel.addConfirmListener(producer);
326337
this.topologyHandler.next();
327338
return producer;
328339
}
329340

330-
public Consumer createConsumer(Connection connection, Stats stats) throws IOException {
341+
public Consumer createConsumer(Connection connection, Stats stats, MulticastSet.CompletionHandler completionHandler) throws IOException {
331342
Channel channel = connection.createChannel();
332343
if (consumerTxSize > 0) channel.txSelect();
333344
List<String> generatedQueueNames = this.topologyHandler.configureQueuesForClient(connection);
@@ -344,7 +355,7 @@ public Consumer createConsumer(Connection connection, Stats stats) throws IOExce
344355
Consumer consumer = new Consumer(channel, this.topologyHandler.getRoutingKey(), generatedQueueNames,
345356
consumerTxSize, autoAck, multiAckEvery,
346357
stats, consumerRateLimit, consumerMsgCount,
347-
consumerLatencyInMicroseconds, tsp);
358+
consumerLatencyInMicroseconds, tsp, completionHandler);
348359
this.topologyHandler.next();
349360
return consumer;
350361
}
@@ -379,6 +390,10 @@ private static boolean queueExists(Connection connection, final String queueName
379390
return queueName != null && exists(connection, ch -> ch.queueDeclarePassive(queueName));
380391
}
381392

393+
public boolean hasLimit() {
394+
return this.timeLimit > 0 || this.consumerMsgCount > 0 || this.producerCount > 0;
395+
}
396+
382397
private interface Checker {
383398
void check(Channel ch) throws IOException;
384399
}

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@
2626
import java.util.Collection;
2727
import java.util.List;
2828
import java.util.Random;
29+
import java.util.concurrent.CountDownLatch;
2930
import java.util.concurrent.ExecutionException;
3031
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.Future;
3334
import java.util.concurrent.ScheduledExecutorService;
34-
import java.util.concurrent.SynchronousQueue;
35-
import java.util.concurrent.ThreadPoolExecutor;
3635
import java.util.concurrent.TimeUnit;
3736
import java.util.concurrent.TimeoutException;
3837
import java.util.function.Supplier;
@@ -52,19 +51,21 @@ public class MulticastSet {
5251

5352
private ThreadingHandler threadingHandler = new DefaultThreadingHandler();
5453

54+
private final CompletionHandler completionHandler;
55+
5556
public MulticastSet(Stats stats, ConnectionFactory factory,
56-
MulticastParams params, List<String> uris) {
57-
this(stats, factory, params, "perftest", uris);
57+
MulticastParams params, List<String> uris, CompletionHandler completionHandler) {
58+
this(stats, factory, params, "perftest", uris, completionHandler);
5859
}
5960

6061
public MulticastSet(Stats stats, ConnectionFactory factory,
61-
MulticastParams params, String testID, List<String> uris) {
62+
MulticastParams params, String testID, List<String> uris, CompletionHandler completionHandler) {
6263
this.stats = stats;
6364
this.factory = factory;
6465
this.params = params;
6566
this.testID = testID;
6667
this.uris = uris;
67-
68+
this.completionHandler = completionHandler;
6869
this.params.init();
6970
}
7071

@@ -108,7 +109,7 @@ public void run(boolean announceStartup)
108109
if (announceStartup) {
109110
System.out.println("id: " + testID + ", starting consumer #" + i + ", channel #" + j);
110111
}
111-
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = params.createConsumer(conn, stats);
112+
consumerRunnables[(i * params.getConsumerChannelCount()) + j] = params.createConsumer(conn, stats, this.completionHandler);
112113
}
113114
}
114115

@@ -134,7 +135,7 @@ public void run(boolean announceStartup)
134135
System.out.println("id: " + testID + ", starting producer #" + i + ", channel #" + j);
135136
}
136137
AgentState agentState = new AgentState();
137-
agentState.runnable = params.createProducer(conn, stats);
138+
agentState.runnable = params.createProducer(conn, stats, this.completionHandler);
138139
producerStates[(i * params.getProducerChannelCount()) + j] = agentState;
139140
}
140141
}
@@ -154,9 +155,11 @@ public void run(boolean announceStartup)
154155
producerState.task = producersExecutorService.submit(producerState.runnable);
155156
}
156157

158+
this.completionHandler.waitForCompletion();
159+
157160
int count = 1; // counting the threads
158161
for (int i = 0; i < producerStates.length; i++) {
159-
producerStates[i].task.get();
162+
producerStates[i].task.cancel(true);
160163
if(count % params.getProducerChannelCount() == 0) {
161164
// this is the end of a group of threads on the same connection,
162165
// closing the connection
@@ -251,4 +254,51 @@ private static class AgentState {
251254

252255
}
253256

257+
interface CompletionHandler {
258+
259+
void waitForCompletion() throws InterruptedException;
260+
261+
void countDown();
262+
263+
}
264+
265+
static class DefaultCompletionHandler implements CompletionHandler {
266+
267+
private final int timeLimit;
268+
private final CountDownLatch latch;
269+
270+
DefaultCompletionHandler(int timeLimit, int countLimit) {
271+
this.timeLimit = timeLimit;
272+
this.latch = new CountDownLatch(countLimit <= 0 ? 1 : countLimit);
273+
}
274+
275+
@Override
276+
public void waitForCompletion() throws InterruptedException {
277+
if (timeLimit <= 0) {
278+
this.latch.await();
279+
} else {
280+
this.latch.await(timeLimit, TimeUnit.SECONDS);
281+
}
282+
}
283+
284+
@Override
285+
public void countDown() {
286+
latch.countDown();
287+
}
288+
}
289+
290+
static class NoLimitCompletionHandler implements CompletionHandler {
291+
292+
private final CountDownLatch latch = new CountDownLatch(1);
293+
294+
@Override
295+
public void waitForCompletion() throws InterruptedException {
296+
latch.await();
297+
}
298+
299+
@Override
300+
public void countDown() { }
301+
}
302+
303+
254304
}

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
209209
p.setQueueSequenceTo(to);
210210
p.setHeartbeatSenderThreads(heartbeatSenderThreads);
211211

212-
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris);
212+
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p);
213+
214+
MulticastSet set = new MulticastSet(stats, factory, p, testID, uris, completionHandler);
213215
set.run(true);
214216

215217
stats.printFinal();
@@ -224,6 +226,26 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
224226
}
225227
}
226228

229+
static MulticastSet.CompletionHandler getCompletionHandler(MulticastParams p) {
230+
MulticastSet.CompletionHandler completionHandler;
231+
if (p.hasLimit()) {
232+
int countLimit = 0;
233+
if (p.getProducerMsgCount() > 0) {
234+
countLimit += p.getProducerThreadCount();
235+
}
236+
if (p.getConsumerMsgCount() > 0) {
237+
countLimit += p.getProducerThreadCount();
238+
}
239+
completionHandler = new MulticastSet.DefaultCompletionHandler(
240+
p.getTimeLimit(),
241+
countLimit
242+
);
243+
} else {
244+
completionHandler = new MulticastSet.NoLimitCompletionHandler();
245+
}
246+
return completionHandler;
247+
}
248+
227249
public static void main(String[] args) {
228250
main(args, new PerfTestOptions().setSystemExiter(new JvmSystemExiter()).setSkipSslContextConfiguration(false));
229251
}

0 commit comments

Comments
 (0)