@@ -62,6 +62,7 @@ public class MulticastSet {
6262 * There's a command line argument to override this anyway.
6363 */
6464 private static final int PUBLISHING_INTERVAL_NB_PRODUCERS_PER_THREAD = 50 ;
65+ private static final String PRODUCER_THREAD_PREFIX = "perf-test-producer-" ;
6566 private final Stats stats ;
6667 private final ConnectionFactory factory ;
6768 private final MulticastParams params ;
@@ -124,6 +125,32 @@ public void run(boolean announceStartup)
124125
125126 Runnable [] consumerRunnables = new Runnable [params .getConsumerThreadCount ()];
126127 Connection [] consumerConnections = new Connection [params .getConsumerCount ()];
128+ Function <Integer , ExecutorService > consumersExecutorsFactory ;
129+ consumersExecutorsFactory = createConsumersExecutorsFactory ();
130+
131+ createConsumers (announceStartup , consumerRunnables , consumerConnections , consumersExecutorsFactory );
132+
133+ this .params .resetTopologyHandler ();
134+
135+ AgentState [] producerStates = new AgentState [params .getProducerThreadCount ()];
136+ Connection [] producerConnections = new Connection [params .getProducerCount ()];
137+ // producers don't need an executor service, as they don't have any consumers
138+ // this consumer should never be asked to create any threads
139+ ExecutorService executorServiceForProducersConsumers = this .threadingHandler .executorService (
140+ "perf-test-producers-worker-" , 0
141+ );
142+ factory .setSharedExecutor (executorServiceForProducersConsumers );
143+ createProducers (announceStartup , producerStates , producerConnections );
144+
145+ startConsumers (consumerRunnables );
146+ startProducers (producerStates );
147+
148+ this .completionHandler .waitForCompletion ();
149+
150+ shutdown (configurationConnection , consumerConnections , producerStates , producerConnections );
151+ }
152+
153+ private Function <Integer , ExecutorService > createConsumersExecutorsFactory () {
127154 Function <Integer , ExecutorService > consumersExecutorsFactory ;
128155 if (params .getConsumersThreadPools () > 0 ) {
129156 consumersExecutorsFactory = new CacheConsumersExecutorsFactory (
@@ -134,6 +161,10 @@ public void run(boolean announceStartup)
134161 this .threadingHandler , this .params
135162 );
136163 }
164+ return consumersExecutorsFactory ;
165+ }
166+
167+ private void createConsumers (boolean announceStartup , Runnable [] consumerRunnables , Connection [] consumerConnections , Function <Integer , ExecutorService > consumersExecutorsFactory ) throws URISyntaxException , NoSuchAlgorithmException , KeyManagementException , IOException , TimeoutException {
137168 for (int i = 0 ; i < consumerConnections .length ; i ++) {
138169 if (announceStartup ) {
139170 System .out .println ("id: " + testID + ", starting consumer #" + i );
@@ -151,23 +182,15 @@ public void run(boolean announceStartup)
151182 consumerRunnables [(i * params .getConsumerChannelCount ()) + j ] = params .createConsumer (consumerConnection , stats , this .completionHandler );
152183 }
153184 }
185+ }
154186
155- this .params .resetTopologyHandler ();
156-
157- AgentState [] producerStates = new AgentState [params .getProducerThreadCount ()];
158- Connection [] producerConnections = new Connection [params .getProducerCount ()];
159- // producers don't need an executor service, as they don't have any consumers
160- // this consumer should never be asked to create any threads
161- ExecutorService executorServiceForProducersConsumers = this .threadingHandler .executorService (
162- "perf-test-producers-worker-" , 0
163- );
164- factory .setSharedExecutor (executorServiceForProducersConsumers );
187+ private void createProducers (boolean announceStartup , AgentState [] producerStates , Connection [] producerConnections ) throws URISyntaxException , NoSuchAlgorithmException , KeyManagementException , IOException , TimeoutException {
165188 for (int i = 0 ; i < producerConnections .length ; i ++) {
166189 if (announceStartup ) {
167190 System .out .println ("id: " + testID + ", starting producer #" + i );
168191 }
169192 setUri ();
170- Connection producerConnection = factory .newConnection ("perf-test-producer-" + i );
193+ Connection producerConnection = factory .newConnection (PRODUCER_THREAD_PREFIX + i );
171194 producerConnections [i ] = producerConnection ;
172195 for (int j = 0 ; j < params .getProducerChannelCount (); j ++) {
173196 if (announceStartup ) {
@@ -178,18 +201,22 @@ public void run(boolean announceStartup)
178201 producerStates [(i * params .getProducerChannelCount ()) + j ] = agentState ;
179202 }
180203 }
204+ }
181205
206+ private void startConsumers (Runnable [] consumerRunnables ) throws InterruptedException {
182207 for (Runnable runnable : consumerRunnables ) {
183208 runnable .run ();
184209 if (params .getConsumerSlowStart ()) {
185210 System .out .println ("Delaying start by 1 second because -S/--slow-start was requested" );
186211 Thread .sleep (1000 );
187212 }
188213 }
214+ }
189215
216+ private void startProducers (AgentState [] producerStates ) {
190217 if (params .getPublishingInterval () > 0 ) {
191218 ScheduledExecutorService producersExecutorService = this .threadingHandler .scheduledExecutorService (
192- "perf-test-producer-" , nbThreadsForConsumer (params )
219+ PRODUCER_THREAD_PREFIX , nbThreadsForConsumer (params )
193220 );
194221 Supplier <Integer > startDelaySupplier ;
195222 if (params .getProducerRandomStartDelayInSeconds () > 0 ) {
@@ -203,21 +230,21 @@ public void run(boolean announceStartup)
203230 AgentState producerState = producerStates [i ];
204231 int delay = startDelaySupplier .get ();
205232 producerState .task = producersExecutorService .scheduleAtFixedRate (
206- producerState .runnable .createRunnableForScheduling (),
207- delay , publishingInterval , TimeUnit .SECONDS
233+ producerState .runnable .createRunnableForScheduling (),
234+ delay , publishingInterval , TimeUnit .SECONDS
208235 );
209236 }
210237 } else {
211238 ExecutorService producersExecutorService = this .threadingHandler .executorService (
212- "perf-test-producer-" , producerStates .length
239+ PRODUCER_THREAD_PREFIX , producerStates .length
213240 );
214241 for (AgentState producerState : producerStates ) {
215242 producerState .task = producersExecutorService .submit (producerState .runnable );
216243 }
217244 }
245+ }
218246
219- this .completionHandler .waitForCompletion ();
220-
247+ private void shutdown (Connection configurationConnection , Connection [] consumerConnections , AgentState [] producerStates , Connection [] producerConnections ) {
221248 try {
222249 LOGGER .debug ("Starting test shutdown" );
223250 for (AgentState producerState : producerStates ) {
0 commit comments