2525import java .util .concurrent .ConcurrentHashMap ;
2626import java .util .concurrent .atomic .AtomicInteger ;
2727import java .util .concurrent .atomic .AtomicLong ;
28+ import java .util .function .BiConsumer ;
2829import java .util .stream .Stream ;
2930import org .junit .jupiter .api .AfterEach ;
3031import org .junit .jupiter .api .BeforeEach ;
@@ -119,24 +120,24 @@ void autoTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Ex
119120 void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff () throws Exception {
120121 int messageCount = 10000 ;
121122 int storeEvery = 1000 ;
122- Map <Integer , AtomicInteger > receivedMessages = new ConcurrentHashMap <>();
123- receivedMessages .put (0 , new AtomicInteger (0 ));
124- receivedMessages .put (1 , new AtomicInteger (0 ));
123+ AtomicInteger consumer1MessageCount = new AtomicInteger (0 );
124+ AtomicInteger consumer2MessageCount = new AtomicInteger (0 );
125125 AtomicLong lastReceivedOffset = new AtomicLong (0 );
126126 String consumerName = "foo" ;
127127
128+ BiConsumer <MessageHandler .Context , AtomicInteger > handler =
129+ (ctx , count ) -> {
130+ lastReceivedOffset .set (ctx .offset ());
131+ if (count .incrementAndGet () % storeEvery == 0 ) {
132+ ctx .storeOffset ();
133+ }
134+ };
135+
128136 Consumer consumer1 =
129137 environment .consumerBuilder ().stream (stream )
130138 .name (consumerName )
131139 .singleActiveConsumer ()
132- .messageHandler (
133- (context , message ) -> {
134- lastReceivedOffset .set (context .offset ());
135- int count = receivedMessages .get (0 ).incrementAndGet ();
136- if (count % storeEvery == 0 ) {
137- context .storeOffset ();
138- }
139- })
140+ .messageHandler ((context , message ) -> handler .accept (context , consumer1MessageCount ))
140141 .offset (OffsetSpecification .first ())
141142 .manualTrackingStrategy ()
142143 .builder ()
@@ -146,24 +147,17 @@ void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws
146147 environment .consumerBuilder ().stream (stream )
147148 .name (consumerName )
148149 .singleActiveConsumer ()
149- .messageHandler (
150- (context , message ) -> {
151- lastReceivedOffset .set (context .offset ());
152- int count = receivedMessages .get (1 ).incrementAndGet ();
153- if (count % storeEvery == 0 ) {
154- context .storeOffset ();
155- }
156- })
150+ .messageHandler ((context , message ) -> handler .accept (context , consumer2MessageCount ))
157151 .offset (OffsetSpecification .first ())
158152 .manualTrackingStrategy ()
159153 .builder ()
160154 .build ();
161155
162156 publishAndWaitForConfirms (cf , messageCount , stream );
163- waitAtMost (() -> receivedMessages . getOrDefault ( 0 , new AtomicInteger ( 0 )) .get () == messageCount );
157+ waitAtMost (() -> consumer1MessageCount .get () == messageCount );
164158
165159 assertThat (lastReceivedOffset ).hasPositiveValue ();
166- assertThat (receivedMessages . get ( 1 ) ).hasValue (0 );
160+ assertThat (consumer2MessageCount ).hasValue (0 );
167161
168162 long firstWaveLimit = lastReceivedOffset .get ();
169163
@@ -174,9 +168,9 @@ void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws
174168
175169 publishAndWaitForConfirms (cf , messageCount , stream );
176170
177- waitAtMost (() -> receivedMessages . getOrDefault ( 0 , new AtomicInteger ( 1 )) .get () == messageCount );
171+ waitAtMost (() -> consumer2MessageCount .get () == messageCount );
178172 assertThat (lastReceivedOffset ).hasValueGreaterThan (firstWaveLimit );
179- assertThat (receivedMessages . get ( 0 ) ).hasValue (messageCount );
173+ assertThat (consumer1MessageCount ).hasValue (messageCount );
180174
181175 consumer2 .close ();
182176 }
0 commit comments