@@ -16,19 +16,21 @@
 
 import static com.rabbitmq.stream.impl.Assertions.assertThat;
 import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS;
+import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel;
 import static com.rabbitmq.stream.impl.TestUtils.sync;
-import static io.vavr.Tuple.of;
+import static com.rabbitmq.stream.impl.Tuples.pair;
 import static java.time.Duration.ofSeconds;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.IntStream.range;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import ch.qos.logback.classic.Level;
+import com.google.common.collect.Streams;
 import com.google.common.util.concurrent.RateLimiter;
 import com.rabbitmq.stream.*;
 import com.rabbitmq.stream.impl.TestUtils.Sync;
+import com.rabbitmq.stream.impl.Tuples.Pair;
 import io.netty.channel.EventLoopGroup;
-import io.vavr.Tuple2;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.LinkedHashMap;
@@ -58,13 +60,14 @@ public class RecoveryClusterTest {
   TestInfo testInfo;
   EventLoopGroup eventLoopGroup;
   EnvironmentBuilder environmentBuilder;
-  static Level producersCoordinatorLogLevel;
+  static List<Level> logLevels;
+  static List<Class<?>> logClasses =
+      List.of(ProducersCoordinator.class, ConsumersCoordinator.class, StreamEnvironment.class);
 
   @BeforeAll
   static void initAll() {
     nodes = Cli.nodes();
-    producersCoordinatorLogLevel =
-        TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG);
+    logLevels = logClasses.stream().map(c -> newLoggerLevel(c, Level.DEBUG)).collect(toList());
   }
 
   @BeforeEach
@@ -88,15 +91,16 @@ void tearDown() {
 
   @AfterAll
   static void tearDownAll() {
-    if (producersCoordinatorLogLevel != null) {
-      TestUtils.newLoggerLevel(ProducersCoordinator.class, producersCoordinatorLogLevel);
+    if (logLevels != null) {
+      Streams.zip(logClasses.stream(), logLevels.stream(), Tuples::pair)
+          .forEach(t -> newLoggerLevel(t.v1(), t.v2()));
     }
   }
 
   @ParameterizedTest
   @CsvSource({
-    "false,false",
-    "true,true",
+    // "false,false",
+    // "true,true",
     "true,false",
   })
   void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException {
@@ -167,17 +171,27 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException {
 
     Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).multipliedBy(2).toMillis());
 
-    List<Tuple2<String, Sync>> streamsSyncs =
-        producers.stream().map(p -> of(p.stream(), p.waitForNewMessages(1000))).collect(toList());
+    List<Pair<String, Sync>> streamsSyncs =
+        producers.stream()
+            .map(p -> pair(p.stream(), p.waitForNewMessages(1000)))
+            .collect(toList());
     streamsSyncs.forEach(
-        t -> {
-          LOGGER.info("Checking publisher to {} still publishes", t._1());
-          assertThat(t._2()).completes();
-          LOGGER.info("Publisher to {} still publishes", t._1());
+        p -> {
+          LOGGER.info("Checking publisher to {} still publishes", p.v1());
+          assertThat(p.v2()).completes();
+          LOGGER.info("Publisher to {} still publishes", p.v1());
         });
 
-    syncs = consumers.stream().map(c -> c.waitForNewMessages(1000)).collect(toList());
-    syncs.forEach(s -> assertThat(s).completes());
+    streamsSyncs =
+        consumers.stream()
+            .map(c -> pair(c.stream(), c.waitForNewMessages(1000)))
+            .collect(toList());
+    streamsSyncs.forEach(
+        p -> {
+          LOGGER.info("Checking consumer from {} still consumes", p.v1());
+          assertThat(p.v2()).completes();
+          LOGGER.info("Consumer from {} still consumes", p.v1());
+        });
 
     Map<String, Long> committedChunkIdPerStream = new LinkedHashMap<>(streamCount);
     streams.forEach(
@@ -271,11 +285,13 @@ public void close() {
 
   private static class ConsumerState implements AutoCloseable {
 
+    private final String stream;
     private final Consumer consumer;
     final AtomicInteger receivedCount = new AtomicInteger();
     final AtomicReference<Runnable> postHandle = new AtomicReference<>(() -> {});
 
     private ConsumerState(String stream, Environment environment) {
+      this.stream = stream;
       this.consumer =
           environment.consumerBuilder().stream(stream)
               .offset(OffsetSpecification.first())
@@ -300,6 +316,10 @@ Sync waitForNewMessages(int messageCount) {
       return sync;
     }
 
+    String stream() {
+      return this.stream;
+    }
+
    @Override
     public void close() {
       this.consumer.close();
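
The save/restore of log levels in this diff relies on newLoggerLevel returning the level that was in effect before the change: initAll records the previous levels, and tearDownAll zips each class back with its saved level via Guava's Streams.zip. A minimal sketch of such a helper on logback-classic, assuming TestUtils does something along these lines (the behavior here is inferred from the call sites, not from the actual TestUtils source):

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

final class LogLevels {

  private LogLevels() {}

  // Set the logback logger of a class to a new level and return the
  // previous level so a test can restore it afterwards.
  static Level newLoggerLevel(Class<?> c, Level level) {
    Logger logger = (Logger) LoggerFactory.getLogger(c);
    Level previous = logger.getLevel(); // null when inherited from a parent logger
    logger.setLevel(level);
    return previous;
  }
}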
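The diff also drops the io.vavr dependency in favor of an in-house Tuples.Pair. A minimal sketch of what that helper could look like, inferred purely from the call sites above (a static pair(a, b) factory usable as a BiFunction with Streams.zip, plus v1()/v2() accessors); the actual class in com.rabbitmq.stream.impl may differ:

package com.rabbitmq.stream.impl;

final class Tuples {

  private Tuples() {}

  static <A, B> Pair<A, B> pair(A v1, B v2) {
    return new Pair<>(v1, v2);
  }

  // Immutable two-element tuple with vavr-style accessors.
  static class Pair<A, B> {

    private final A v1;
    private final B v2;

    private Pair(A v1, B v2) {
      this.v1 = v1;
      this.v2 = v2;
    }

    A v1() {
      return this.v1;
    }

    B v2() {
      return this.v2;
    }
  }
}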