1- // Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
1+ // Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
22//
33// This software, the RabbitMQ Java client library, is triple-licensed under the
44// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
2929import com .rabbitmq .client .impl .recovery .AutorecoveringConnection ;
3030import com .rabbitmq .perf .PerfTest .EXIT_WHEN ;
3131import java .io .IOException ;
32+ import java .math .BigDecimal ;
3233import java .net .URISyntaxException ;
3334import java .security .KeyManagementException ;
3435import java .security .NoSuchAlgorithmException ;
36+ import java .time .Duration ;
3537import java .util .ArrayList ;
3638import java .util .Collection ;
3739import java .util .Collections ;
@@ -61,14 +63,6 @@ public class MulticastSet {
6163 // from Java Client ConsumerWorkService
6264 public final static int DEFAULT_CONSUMER_WORK_SERVICE_THREAD_POOL_SIZE = Runtime .getRuntime ().availableProcessors () * 2 ;
6365 private static final Logger LOGGER = LoggerFactory .getLogger (MulticastSet .class );
64- /**
65- * Why 50? This is arbitrary. The fastest rate is 1 message / second when
66- * using a publishing interval, so 1 thread should be able to keep up easily with
67- * up to 50 messages / seconds (ie. 50 producers). Then, a new thread is used
68- * every 50 producers. This is 20 threads for 1000 producers, which seems reasonable.
69- * There's a command line argument to override this anyway.
70- */
71- private static final int PUBLISHING_INTERVAL_NB_PRODUCERS_PER_THREAD = 50 ;
7266 private static final String PRODUCER_THREAD_PREFIX = "perf-test-producer-" ;
7367 static final String STOP_REASON_REACHED_TIME_LIMIT = "Reached time limit" ;
7468 private final Stats stats ;
@@ -149,7 +143,30 @@ protected static int nbThreadsForConsumer(MulticastParams params) {
149143 protected static int nbThreadsForProducerScheduledExecutorService (MulticastParams params ) {
150144 int producerExecutorServiceNbThreads = params .getProducerSchedulerThreadCount ();
151145 if (producerExecutorServiceNbThreads <= 0 ) {
152- return params .getProducerThreadCount () / PUBLISHING_INTERVAL_NB_PRODUCERS_PER_THREAD + 1 ;
146+ int producerThreadCount = params .getProducerThreadCount ();
147+ Duration publishingInterval = params .getPublishingInterval () == null ? Duration .ofSeconds (1 )
148+ : params .getPublishingInterval ();
149+ long publishingIntervalMs = publishingInterval .toMillis ();
150+
151+ BigDecimal publishingIntervalSeconds = new BigDecimal (publishingIntervalMs )
152+ .divide (new BigDecimal (1000 ));
153+ BigDecimal rate = new BigDecimal (producerThreadCount )
154+ .divide (publishingIntervalSeconds );
155+ /**
156+ * Why 100? This is arbitrary. We assume 1 thread is more than enough to handle
157+ * the publishing of 100 messages in 1 second, the fastest rate
158+ * being 10 messages / second when using --publishing-interval.
159+ * Then, a new thread is used
160+ * every for every 100 messages / second.
161+ * This is 21 threads for 1000 producers publishing 1 message / second,
162+ * which seems reasonable.
163+ * There's a command line argument to override this anyway.
164+ */
165+ int threadCount = rate .intValue () / 100 + 1 ;
166+ LOGGER .debug ("Using {} thread(s) to schedule {} publisher(s) publishing every {} ms" ,
167+ threadCount , producerThreadCount , publishingInterval .toMillis ()
168+ );
169+ return threadCount ;
153170 } else {
154171 return producerExecutorServiceNbThreads ;
155172 }
@@ -406,9 +423,9 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
406423
407424 private void startProducers (AgentState [] producerStates ) {
408425 this .messageSizeIndicator .start ();
409- if (params .getPublishingInterval () > 0 ) {
426+ if (params .getPublishingInterval () != null ) {
410427 ScheduledExecutorService producersExecutorService = this .threadingHandler .scheduledExecutorService (
411- PRODUCER_THREAD_PREFIX , nbThreadsForConsumer (params )
428+ PRODUCER_THREAD_PREFIX , nbThreadsForProducerScheduledExecutorService (params )
412429 );
413430 Supplier <Integer > startDelaySupplier ;
414431 if (params .getProducerRandomStartDelayInSeconds () > 0 ) {
@@ -417,13 +434,13 @@ PRODUCER_THREAD_PREFIX, nbThreadsForConsumer(params)
417434 } else {
418435 startDelaySupplier = () -> 0 ;
419436 }
420- int publishingInterval = params .getPublishingInterval ();
437+ Duration publishingInterval = params .getPublishingInterval ();
421438 for (int i = 0 ; i < producerStates .length ; i ++) {
422439 AgentState producerState = producerStates [i ];
423440 int delay = startDelaySupplier .get ();
424441 producerState .task = producersExecutorService .scheduleAtFixedRate (
425442 producerState .runnable .createRunnableForScheduling (),
426- delay , publishingInterval , TimeUnit .SECONDS
443+ delay , publishingInterval . toMillis () , TimeUnit .MILLISECONDS
427444 );
428445 }
429446 } else {
0 commit comments