Skip to content

Commit 48f88d9

Browse files
authored
Merge pull request #288 from rabbitmq/rabbitmq-perf-test-287-decimal-publishing-interval
Allows decimal values for publishing interval
2 parents a68aa07 + 4327f94 commit 48f88d9

File tree

9 files changed

+139
-38
lines changed

9 files changed

+139
-38
lines changed

src/main/java/com/rabbitmq/perf/MulticastParams.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -22,6 +22,7 @@
2222

2323
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
2424
import java.io.IOException;
25+
import java.time.Duration;
2526
import java.util.ArrayList;
2627
import java.util.Collections;
2728
import java.util.List;
@@ -92,7 +93,7 @@ public class MulticastParams {
9293

9394
private int routingKeyCacheSize = 0;
9495
private boolean exclusive = false;
95-
private int publishingInterval = -1;
96+
private Duration publishingInterval = null;
9697
private int producerRandomStartDelayInSeconds;
9798
private int producerSchedulerThreadCount = -1;
9899
private int consumersThreadPools = -1;
@@ -625,11 +626,11 @@ public boolean isExclusive() {
625626
return exclusive;
626627
}
627628

628-
public void setPublishingInterval(int publishingIntervalInSeconds) {
629-
this.publishingInterval = publishingIntervalInSeconds;
629+
public void setPublishingInterval(Duration publishingInterval) {
630+
this.publishingInterval = publishingInterval;
630631
}
631632

632-
public int getPublishingInterval() {
633+
public Duration getPublishingInterval() {
633634
return publishingInterval;
634635
}
635636

src/main/java/com/rabbitmq/perf/MulticastSet.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -32,6 +32,7 @@
3232
import java.net.URISyntaxException;
3333
import java.security.KeyManagementException;
3434
import java.security.NoSuchAlgorithmException;
35+
import java.time.Duration;
3536
import java.util.ArrayList;
3637
import java.util.Collection;
3738
import java.util.Collections;
@@ -61,14 +62,6 @@ public class MulticastSet {
6162
// from Java Client ConsumerWorkService
6263
public final static int DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
6364
private static final Logger LOGGER = LoggerFactory.getLogger(MulticastSet.class);
64-
/**
65-
* Why 50? This is arbitrary. The fastest rate is 1 message / second when
66-
* using a publishing interval, so 1 thread should be able to keep up easily with
67-
* up to 50 messages / seconds (ie. 50 producers). Then, a new thread is used
68-
* every 50 producers. This is 20 threads for 1000 producers, which seems reasonable.
69-
* There's a command line argument to override this anyway.
70-
*/
71-
private static final int PUBLISHING_INTERVAL_NB_PRODUCERS_PER_THREAD = 50;
7265
private static final String PRODUCER_THREAD_PREFIX = "perf-test-producer-";
7366
static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit";
7467
private final Stats stats;
@@ -149,7 +142,28 @@ protected static int nbThreadsForConsumer(MulticastParams params) {
149142
protected static int nbThreadsForProducerScheduledExecutorService(MulticastParams params) {
150143
int producerExecutorServiceNbThreads = params.getProducerSchedulerThreadCount();
151144
if (producerExecutorServiceNbThreads <= 0) {
152-
return params.getProducerThreadCount() / PUBLISHING_INTERVAL_NB_PRODUCERS_PER_THREAD + 1;
145+
int producerThreadCount = params.getProducerThreadCount();
146+
Duration publishingInterval = params.getPublishingInterval() == null ? Duration.ofSeconds(1)
147+
: params.getPublishingInterval();
148+
long publishingIntervalMs = publishingInterval.toMillis();
149+
150+
double publishingIntervalSeconds = (double) publishingIntervalMs / 1000d;
151+
double rate = (double) producerThreadCount / publishingIntervalSeconds;
152+
/**
153+
* Why 100? This is arbitrary. We assume 1 thread is more than enough to handle
154+
* the publishing of 100 messages in 1 second, the fastest rate
155+
* being 10 messages / second when using --publishing-interval.
156+
* Then, a new thread is used
157+
* every for every 100 messages / second.
158+
* This is 21 threads for 1000 producers publishing 1 message / second,
159+
* which seems reasonable.
160+
* There's a command line argument to override this anyway.
161+
*/
162+
int threadCount = (int) (rate / 100d) + 1;
163+
LOGGER.debug("Using {} thread(s) to schedule {} publisher(s) publishing every {} ms",
164+
threadCount, producerThreadCount, publishingInterval.toMillis()
165+
);
166+
return threadCount;
153167
} else {
154168
return producerExecutorServiceNbThreads;
155169
}
@@ -406,9 +420,9 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
406420

407421
private void startProducers(AgentState[] producerStates) {
408422
this.messageSizeIndicator.start();
409-
if (params.getPublishingInterval() > 0) {
423+
if (params.getPublishingInterval() != null) {
410424
ScheduledExecutorService producersExecutorService = this.threadingHandler.scheduledExecutorService(
411-
PRODUCER_THREAD_PREFIX, nbThreadsForConsumer(params)
425+
PRODUCER_THREAD_PREFIX, nbThreadsForProducerScheduledExecutorService(params)
412426
);
413427
Supplier<Integer> startDelaySupplier;
414428
if (params.getProducerRandomStartDelayInSeconds() > 0) {
@@ -417,13 +431,13 @@ PRODUCER_THREAD_PREFIX, nbThreadsForConsumer(params)
417431
} else {
418432
startDelaySupplier = () -> 0;
419433
}
420-
int publishingInterval = params.getPublishingInterval();
434+
Duration publishingInterval = params.getPublishingInterval();
421435
for (int i = 0; i < producerStates.length; i++) {
422436
AgentState producerState = producerStates[i];
423437
int delay = startDelaySupplier.get();
424438
producerState.task = producersExecutorService.scheduleAtFixedRate(
425439
producerState.runnable.createRunnableForScheduling(),
426-
delay, publishingInterval, TimeUnit.SECONDS
440+
delay, publishingInterval.toMillis(), TimeUnit.MILLISECONDS
427441
);
428442
}
429443
} else {

src/main/java/com/rabbitmq/perf/PerfTest.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2424
import com.rabbitmq.client.impl.nio.NioParams;
2525
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
26+
import java.math.BigDecimal;
27+
import java.time.Duration;
2628
import java.util.stream.Collectors;
2729
import org.apache.commons.cli.*;
2830
import org.slf4j.Logger;
@@ -130,7 +132,16 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
130132
String messageProperties = strArg(cmd, "mp", null);
131133
int routingKeyCacheSize = intArg(cmd, "rkcs", 0);
132134
boolean exclusive = hasOption(cmd, "E");
133-
int publishingIntervalInSeconds = intArg(cmd, "P", -1);
135+
Duration publishingInterval = null;
136+
String publishingIntervalArg = strArg(cmd, "P", null);
137+
if (publishingIntervalArg != null) {
138+
try {
139+
publishingInterval = parsePublishingInterval(publishingIntervalArg);
140+
} catch (IllegalArgumentException e) {
141+
System.out.println("Invalid value for --publishing-interval: " + e.getMessage());
142+
systemExiter.exit(1);
143+
}
144+
}
134145
int producerRandomStartDelayInSeconds = intArg(cmd, "prsd", -1);
135146
int producerSchedulingThreads = intArg(cmd, "pst", -1);
136147

@@ -374,7 +385,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
374385
p.setMessageProperties(convertKeyValuePairs(messageProperties));
375386
p.setRoutingKeyCacheSize(routingKeyCacheSize);
376387
p.setExclusive(exclusive);
377-
p.setPublishingInterval(publishingIntervalInSeconds);
388+
p.setPublishingInterval(publishingInterval);
378389
p.setProducerRandomStartDelayInSeconds(producerRandomStartDelayInSeconds);
379390
p.setProducerSchedulerThreadCount(producerSchedulingThreads);
380391
p.setConsumersThreadPools(consumersThreadPools);
@@ -811,6 +822,23 @@ private static void versionInformation() {
811822
System.out.println("\u001B[0m" + info);
812823
}
813824

825+
static Duration parsePublishingInterval(String input) {
826+
BigDecimal decimalValue;
827+
try {
828+
decimalValue = new BigDecimal(input);
829+
} catch (NumberFormatException e) {
830+
throw new IllegalArgumentException("Must be a number");
831+
}
832+
if (decimalValue.compareTo(BigDecimal.ZERO) <= 0) {
833+
throw new IllegalArgumentException("Must be positive");
834+
}
835+
Duration result = Duration.ofMillis(decimalValue.multiply(BigDecimal.valueOf(1000)).longValue());
836+
if (result.toMillis() < 100) {
837+
throw new IllegalArgumentException("Cannot be less than 0.1");
838+
}
839+
return result;
840+
}
841+
814842
/**
815843
* Abstraction to ease testing or PerfTest usage as a library.
816844
*/

src/test/java/com/rabbitmq/perf/MessageCountTimeLimitAndPublishingIntervalRateTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2018-2O20 Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2018-2O22 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.Consumer;
2020
import com.rabbitmq.client.*;
2121
import com.rabbitmq.perf.PerfTest.EXIT_WHEN;
22+
import java.time.Duration;
2223
import java.util.stream.Collectors;
2324
import org.assertj.core.api.Assertions;
2425
import org.assertj.core.api.Condition;
@@ -574,7 +575,7 @@ public void publishingRateLimit() throws InterruptedException {
574575
@Test
575576
public void publishingInterval() throws InterruptedException {
576577
countsAndTimeLimit(0, 0, 6);
577-
params.setPublishingInterval(1);
578+
params.setPublishingInterval(Duration.ofSeconds(1));
578579
params.setProducerCount(3);
579580

580581
AtomicInteger publishedMessageCount = new AtomicInteger();

src/test/java/com/rabbitmq/perf/MulticastParamsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2

src/test/java/com/rabbitmq/perf/MulticastSetTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import com.rabbitmq.client.Address;
1919
import com.rabbitmq.client.Connection;
2020
import com.rabbitmq.client.ConnectionFactory;
21+
import java.time.Duration;
2122
import org.apache.commons.lang3.time.StopWatch;
2223
import org.junit.jupiter.api.AfterEach;
2324
import org.junit.jupiter.api.BeforeEach;
2425
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.CsvSource;
2528
import org.mockito.ArgumentCaptor;
2629
import org.mockito.ArgumentMatchers;
2730
import org.mockito.Mock;
@@ -105,18 +108,36 @@ public void nbThreadsForProducerScheduledExecutorServiceDefaultIsOne() {
105108
}
106109

107110
@Test
108-
public void nbThreadsForProducerScheduledExecutorServiceOneThreadEvery50Producers() {
109-
params.setProducerCount(120);
111+
public void nbThreadsForProducerScheduledExecutorServiceOneThreadEvery100Producers() {
112+
params.setProducerCount(220);
110113
assertThat(nbThreadsForProducerScheduledExecutorService(params)).isEqualTo(3);
111114
}
112115

113116
@Test
114-
public void nbThreadsForProducerScheduledExecutorServiceOneThreadEvery50ProducersIncludeChannels() {
117+
public void nbThreadsForProducerScheduledExecutorServiceOneThreadEvery100ProducersIncludeChannels() {
115118
params.setProducerCount(30);
116-
params.setProducerChannelCount(4);
119+
params.setProducerChannelCount(8);
117120
assertThat(nbThreadsForProducerScheduledExecutorService(params)).isEqualTo(3);
118121
}
119122

123+
@ParameterizedTest
124+
@CsvSource({
125+
"1,1000,1",
126+
"200,1000,3",
127+
"299,1000,3",
128+
"300,1000,4",
129+
"200,100,21",
130+
"2000,1000,21",
131+
"1000,60000,1"
132+
})
133+
void nbThreadsForProducerScheduledExecutorServiceOK(int producerCount, int publishingIntervalMs,
134+
int expectedThreadCount) {
135+
params.setProducerCount(producerCount);
136+
params.setPublishingInterval(Duration.ofMillis(publishingIntervalMs));
137+
assertThat(nbThreadsForProducerScheduledExecutorService(params))
138+
.isEqualTo(expectedThreadCount);
139+
}
140+
120141
@Test
121142
public void nbThreadsForProducerScheduledExecutorServiceUseParameterValueWhenSpecified() {
122143
params.setProducerSchedulerThreadCount(7);

src/test/java/com/rabbitmq/perf/PerfTestTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919

2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.CsvSource;
24+
import org.junit.jupiter.params.provider.ValueSource;
2225

2326
import static com.rabbitmq.perf.PerfTest.convertKeyValuePairs;
27+
import static com.rabbitmq.perf.PerfTest.parsePublishingInterval;
2428
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2530
import static org.junit.jupiter.api.Assertions.*;
2631

2732
/**
@@ -113,4 +118,28 @@ void convertPostProcessKeyValuePairs() {
113118
.containsEntry("x-dead-letter-exchange", "")
114119
.containsEntry("x-queue-type", "quorum");
115120
}
121+
122+
@ParameterizedTest
123+
@CsvSource({
124+
"1,1000",
125+
"0.1,100",
126+
"0.5,500",
127+
"0.55,550",
128+
"2,2000",
129+
"15,15000",
130+
})
131+
void parsePublishingIntervalOK(String input, long expectedMs) {
132+
assertThat(parsePublishingInterval(input).toMillis()).isEqualTo(expectedMs);
133+
}
134+
135+
@ParameterizedTest
136+
@ValueSource(strings = {
137+
"0.09",
138+
"-1",
139+
"0"
140+
})
141+
void parsePublishingIntervalKO(String input) {
142+
assertThatThrownBy(() -> parsePublishingInterval(input))
143+
.isInstanceOf(IllegalArgumentException.class);
144+
}
116145
}

src/test/java/com/rabbitmq/perf/PublisherOnlyStopsCorrectlyTest.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.Connection;
2020
import com.rabbitmq.client.ConnectionFactory;
2121
import com.rabbitmq.client.impl.AMQImpl;
22+
import java.time.Duration;
2223
import org.junit.jupiter.api.AfterEach;
2324
import org.junit.jupiter.api.BeforeEach;
2425
import org.junit.jupiter.api.TestInfo;
@@ -49,12 +50,17 @@ public class PublisherOnlyStopsCorrectlyTest {
4950
ExecutorService executorService;
5051

5152
static Stream<Arguments> publisherOnlyStopsWhenBrokerCrashesArguments() {
52-
return Stream.of(
53-
// number of messages before throwing exception, configurator, assertion message
54-
Arguments.of(10, (Consumer<MulticastParams>) (params) -> {
55-
}, "Sender should have failed and program should stop"),
56-
Arguments.of(2, (Consumer<MulticastParams>) (params) -> params.setPublishingInterval(1), "Sender should have failed and program should stop")
57-
);
53+
return Stream.of(
54+
// number of messages before throwing exception, configurator, assertion message
55+
Arguments.of(
56+
10,
57+
(Consumer<MulticastParams>) (params) -> {},
58+
"Sender should have failed and program should stop"),
59+
Arguments.of(
60+
2,
61+
(Consumer<MulticastParams>)
62+
(params) -> params.setPublishingInterval(Duration.ofSeconds(1)),
63+
"Sender should have failed and program should stop"));
5864
}
5965

6066
@BeforeEach

src/test/java/com/rabbitmq/perf/it/ConnectionRecoveryIT.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.perf.NamedThreadFactory;
2525
import com.rabbitmq.perf.Stats;
2626
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
27+
import java.time.Duration;
2728
import org.junit.jupiter.api.AfterEach;
2829
import org.junit.jupiter.api.BeforeEach;
2930
import org.junit.jupiter.api.Test;
@@ -225,7 +226,7 @@ public void shouldStopWhenConnectionRecoveryIsOffAndConnectionsAreKilledAndUsing
225226
cf.setAutomaticRecoveryEnabled(false);
226227
configurer.accept(params);
227228
cfConfigurer.accept(cf);
228-
params.setPublishingInterval(1);
229+
params.setPublishingInterval(Duration.ofSeconds(1));
229230
int producerConsumerCount = params.getProducerCount();
230231

231232
MulticastSet set = new MulticastSet(stats, cf, params, "", URIS, latchCompletionHandler(1, info));
@@ -255,7 +256,7 @@ public void shouldRecoverWhenConnectionsAreKilled(Consumer<MulticastParams> conf
255256
@MethodSource("configurationArguments")
256257
public void shouldRecoverWhenConnectionsAreKilledAndUsingPublishingInterval(Consumer<MulticastParams> configurer, Consumer<ConnectionFactory> cfConfigurer,
257258
TestInfo info) throws Exception {
258-
params.setPublishingInterval(1);
259+
params.setPublishingInterval(Duration.ofSeconds(1));
259260
configurer.accept(params);
260261
cfConfigurer.accept(cf);
261262
int producerConsumerCount = params.getProducerCount();

0 commit comments

Comments
 (0)