2828import com .rabbitmq .perf .metrics .PerformanceMetrics ;
2929import io .micrometer .core .instrument .simple .SimpleMeterRegistry ;
3030import java .io .IOException ;
31+ import java .io .PrintStream ;
3132import java .net .URISyntaxException ;
3233import java .security .KeyManagementException ;
3334import java .security .NoSuchAlgorithmException ;
@@ -80,6 +81,7 @@ public class MulticastSet {
8081 private final ConnectionCreator connectionCreator ;
8182 private final ExpectedMetrics expectedMetrics ;
8283 private final InstanceSynchronization instanceSynchronization ;
84+ private final PrintStream out ;
8385
8486 public MulticastSet (
8587 PerformanceMetrics performanceMetrics ,
@@ -167,6 +169,7 @@ public MulticastSet(
167169 this .connectionCreator = new ConnectionCreator (this .factory , this .uris , connectionAllocation );
168170 this .expectedMetrics = expectedMetrics ;
169171 this .instanceSynchronization = instanceSynchronization ;
172+ this .out = params .getOut ();
170173 }
171174
172175 protected static int nbThreadsForConsumer (MulticastParams params ) {
@@ -233,6 +236,7 @@ public void run(boolean announceStartup)
233236 : params .getServersUpLimit (),
234237 uris ,
235238 factory )) {
239+ // TODO do not set a heartbeat executor if Netty is used
236240 ScheduledExecutorService heartbeatSenderExecutorService =
237241 this .threadingHandler .scheduledExecutorService (
238242 "perf-test-heartbeat-sender-" , this .params .getHeartbeatSenderThreads ());
@@ -382,7 +386,7 @@ public void run(boolean announceStartup)
382386
383387 executeShutdownSequence .run ();
384388 } else {
385- System . out .println (
389+ out .println (
386390 "Could not connect to broker(s) in "
387391 + params .getServersStartUpTimeout ()
388392 + " second(s), exiting." );
@@ -475,7 +479,7 @@ private void createConsumers(
475479 int consumerIndex = 0 ;
476480 for (int i = 0 ; i < consumerConnections .length ; i ++) {
477481 if (announceStartup ) {
478- System . out .println ("id: " + testID + ", starting consumer #" + i );
482+ out .println ("id: " + testID + ", starting consumer #" + i );
479483 }
480484 ExecutorService executorService = consumersExecutorsFactory .apply (i );
481485 factory .setSharedExecutor (executorService );
@@ -484,7 +488,7 @@ private void createConsumers(
484488 consumerConnections [i ] = consumerConnection ;
485489 for (int j = 0 ; j < params .getConsumerChannelCount (); j ++) {
486490 if (announceStartup ) {
487- System . out .println ("id: " + testID + ", starting consumer #" + i + ", channel #" + j );
491+ out .println ("id: " + testID + ", starting consumer #" + i + ", channel #" + j );
488492 }
489493 Consumer consumer =
490494 params .createConsumer (
@@ -507,13 +511,13 @@ private void createProducers(
507511 int producerIndex = 0 ;
508512 for (int i = 0 ; i < producerConnections .length ; i ++) {
509513 if (announceStartup ) {
510- System . out .println ("id: " + testID + ", starting producer #" + i );
514+ out .println ("id: " + testID + ", starting producer #" + i );
511515 }
512516 Connection producerConnection = createConnection (PRODUCER_THREAD_PREFIX + i );
513517 producerConnections [i ] = producerConnection ;
514518 for (int j = 0 ; j < params .getProducerChannelCount (); j ++) {
515519 if (announceStartup ) {
516- System . out .println ("id: " + testID + ", starting producer #" + i + ", channel #" + j );
520+ out .println ("id: " + testID + ", starting producer #" + i + ", channel #" + j );
517521 }
518522 AgentState agentState = new AgentState ();
519523 agentState .runnable =
@@ -538,7 +542,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
538542 runnable .run ();
539543 LOGGER .debug ("Consumer runnable started" );
540544 if (params .getConsumerSlowStart ()) {
541- System . out .println ("Delaying start by 1 second because -S/--slow-start was requested" );
545+ out .println ("Delaying start by 1 second because -S/--slow-start was requested" );
542546 Thread .sleep (1000 );
543547 }
544548 }
@@ -551,8 +555,7 @@ private void startConsumers(Runnable[] consumerRunnables) throws InterruptedExce
551555 for (Runnable runnable : consumerRunnables ) {
552556 runnable .run ();
553557 if (params .getConsumerSlowStart ()) {
554- System .out .println (
555- "Delaying start by 1 second because -S/--slow-start was requested" );
558+ out .println ("Delaying start by 1 second because -S/--slow-start was requested" );
556559 try {
557560 Thread .sleep (1000 );
558561 } catch (InterruptedException e ) {
0 commit comments