3737import java .util .List ;
3838import java .util .Map ;
3939import java .util .concurrent .Executors ;
40+ import java .util .concurrent .ScheduledExecutorService ;
41+ import java .util .concurrent .ThreadFactory ;
4042import java .util .concurrent .atomic .AtomicBoolean ;
4143import java .util .concurrent .atomic .AtomicInteger ;
4244import java .util .concurrent .atomic .AtomicReference ;
@@ -62,7 +64,12 @@ public class RecoveryClusterTest {
6264 EnvironmentBuilder environmentBuilder ;
6365 static List <Level > logLevels ;
6466 static List <Class <?>> logClasses =
65- List .of (ProducersCoordinator .class , ConsumersCoordinator .class , StreamEnvironment .class );
67+ List .of (
68+ ProducersCoordinator .class ,
69+ ConsumersCoordinator .class ,
70+ StreamEnvironment .class ,
71+ AsyncRetry .class );
72+ ScheduledExecutorService scheduledExecutorService ;
6673
6774 @ BeforeAll
6875 static void initAll () {
@@ -72,10 +79,17 @@ static void initAll() {
7279
7380 @ BeforeEach
7481 void init (TestInfo info ) {
82+ int availableProcessors = Runtime .getRuntime ().availableProcessors ();
83+ LOGGER .info ("Available processors: {}" , availableProcessors );
84+ ThreadFactory threadFactory =
85+ new Utils .NamedThreadFactory ("rabbitmq-stream-environment-scheduler-" );
86+ scheduledExecutorService =
87+ Executors .newScheduledThreadPool (availableProcessors * 2 , threadFactory );
7588 environmentBuilder =
7689 Environment .builder ()
7790 .recoveryBackOffDelayPolicy (BACK_OFF_DELAY_POLICY )
7891 .topologyUpdateBackOffDelayPolicy (BACK_OFF_DELAY_POLICY )
92+ .scheduledExecutorService (scheduledExecutorService )
7993 .netty ()
8094 .eventLoopGroup (eventLoopGroup )
8195 .environmentBuilder ();
@@ -87,6 +101,9 @@ void tearDown() {
87101 if (environment != null ) {
88102 environment .close ();
89103 }
104+ if (scheduledExecutorService != null ) {
105+ scheduledExecutorService .shutdownNow ();
106+ }
90107 }
91108
92109 @ AfterAll
@@ -104,6 +121,10 @@ static void tearDownAll() {
104121 "true,false" ,
105122 })
106123 void clusterRestart (boolean useLoadBalancer , boolean forceLeader ) throws InterruptedException {
124+ LOGGER .info (
125+ "Cluster restart test, use load balancer {}, force leader {}" ,
126+ useLoadBalancer ,
127+ forceLeader );
107128 int streamCount = 10 ;
108129 int producerCount = streamCount * 2 ;
109130 int consumerCount = streamCount * 2 ;
0 commit comments