4141import java .util .LinkedHashMap ;
4242import java .util .List ;
4343import java .util .Map ;
44+ import java .util .concurrent .ConcurrentHashMap ;
4445import java .util .concurrent .Executors ;
4546import java .util .concurrent .ScheduledExecutorService ;
4647import java .util .concurrent .ThreadFactory ;
4748import java .util .concurrent .atomic .AtomicBoolean ;
4849import java .util .concurrent .atomic .AtomicInteger ;
50+ import java .util .concurrent .atomic .AtomicLong ;
4951import java .util .concurrent .atomic .AtomicReference ;
52+ import java .util .stream .IntStream ;
5053import org .junit .jupiter .api .*;
5154import org .junit .jupiter .params .ParameterizedTest ;
5255import org .junit .jupiter .params .provider .CsvSource ;
56+ import org .junit .jupiter .params .provider .ValueSource ;
5357import org .slf4j .Logger ;
5458import org .slf4j .LoggerFactory ;
5559
@@ -201,15 +205,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
201205 syncs = consumers .stream ().map (c -> c .waitForNewMessages (100 )).collect (toList ());
202206 syncs .forEach (s -> assertThat (s ).completes ());
203207
204- nodes .forEach (
205- n -> {
206- LOGGER .info ("Restarting node {}..." , n );
207- Cli .restartNode (n );
208- LOGGER .info ("Restarted node {}." , n );
209- });
210- LOGGER .info ("Rebalancing..." );
211- Cli .rebalance ();
212- LOGGER .info ("Rebalancing over." );
208+ restartCluster ();
213209
214210 Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
215211
@@ -291,8 +287,118 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
291287 }
292288 }
293289
290+ @ ParameterizedTest
291+ @ ValueSource (booleans = {true , false })
292+ void sacWithClusterRestart (boolean superStream ) throws InterruptedException {
293+ environment =
294+ environmentBuilder
295+ .uris (URIS )
296+ .netty ()
297+ .bootstrapCustomizer (
298+ b -> {
299+ b .option (
300+ ChannelOption .CONNECT_TIMEOUT_MILLIS ,
301+ (int ) BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
302+ })
303+ .environmentBuilder ()
304+ .maxConsumersByConnection (1 )
305+ .build ();
306+
307+ int consumerCount = 3 ;
308+ AtomicLong lastOffset = new AtomicLong (0 );
309+ String app = "app-name" ;
310+ String s = TestUtils .streamName (testInfo );
311+ ProducerState pState = null ;
312+ List <ConsumerState > consumers = Collections .emptyList ();
313+ try {
314+ StreamCreator sCreator = environment .streamCreator ().stream (s );
315+ if (superStream ) {
316+ sCreator = sCreator .superStream ().partitions (1 ).creator ();
317+ }
318+ sCreator .create ();
319+
320+ pState = new ProducerState (s , true , superStream , environment );
321+ pState .start ();
322+
323+ Map <Integer , Boolean > consumerStatus = new ConcurrentHashMap <>();
324+ consumers =
325+ IntStream .range (0 , consumerCount )
326+ .mapToObj (
327+ i ->
328+ new ConsumerState (
329+ s ,
330+ environment ,
331+ b -> {
332+ b .singleActiveConsumer ()
333+ .name (app )
334+ .noTrackingStrategy ()
335+ .consumerUpdateListener (
336+ ctx -> {
337+ consumerStatus .put (i , ctx .isActive ());
338+ return OffsetSpecification .offset (lastOffset .get ());
339+ });
340+ if (superStream ) {
341+ b .superStream (s );
342+ } else {
343+ b .stream (s );
344+ }
345+ },
346+ (ctx , m ) -> lastOffset .set (ctx .offset ())))
347+ .collect (toList ());
348+
349+ Sync sync = pState .waitForNewMessages (100 );
350+ assertThat (sync ).completes ();
351+ sync = consumers .get (0 ).waitForNewMessages (100 );
352+ assertThat (sync ).completes ();
353+
354+ List <Cli .SubscriptionInfo > subscriptions =
355+ Cli .listGroupConsumers (superStream ? s + "-0" : s , app );
356+ assertThat (subscriptions ).hasSize (consumerCount );
357+ assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("active" )).count ())
358+ .isEqualTo (1 );
359+ assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("waiting" )).count ())
360+ .isEqualTo (2 );
361+
362+ restartCluster ();
363+
364+ Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
365+
366+ sync = pState .waitForNewMessages (100 );
367+ assertThat (sync ).completes (ASSERTION_TIMEOUT );
368+ int activeIndex =
369+ consumerStatus .entrySet ().stream ()
370+ .filter (Map .Entry ::getValue )
371+ .map (Map .Entry ::getKey )
372+ .findFirst ()
373+ .orElseThrow (() -> new IllegalStateException ("No active consumer found" ));
374+
375+ sync = consumers .get (activeIndex ).waitForNewMessages (100 );
376+ assertThat (sync ).completes (ASSERTION_TIMEOUT );
377+
378+ subscriptions = Cli .listGroupConsumers (superStream ? s + "-0" : s , app );
379+ assertThat (subscriptions ).hasSize (consumerCount );
380+ assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("active" )).count ())
381+ .isEqualTo (1 );
382+ assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("waiting" )).count ())
383+ .isEqualTo (2 );
384+
385+ } finally {
386+ if (pState != null ) {
387+ pState .close ();
388+ }
389+ consumers .forEach (ConsumerState ::close );
390+ if (superStream ) {
391+ environment .deleteSuperStream (s );
392+ } else {
393+ environment .deleteStream (s );
394+ }
395+ }
396+ }
397+
294398 private static class ProducerState implements AutoCloseable {
295399
400+ private static final AtomicLong MSG_ID_SEQ = new AtomicLong (0 );
401+
296402 private static final byte [] BODY = "hello" .getBytes (StandardCharsets .UTF_8 );
297403
298404 private final String stream ;
@@ -306,9 +412,19 @@ private static class ProducerState implements AutoCloseable {
306412 final AtomicReference <Instant > lastExceptionInstant = new AtomicReference <>();
307413
308414 private ProducerState (String stream , boolean dynamicBatch , Environment environment ) {
415+ this (stream , dynamicBatch , false , environment );
416+ }
417+
418+ private ProducerState (
419+ String stream , boolean dynamicBatch , boolean superStream , Environment environment ) {
309420 this .stream = stream ;
310- this .producer =
311- environment .producerBuilder ().stream (stream ).dynamicBatch (dynamicBatch ).build ();
421+ ProducerBuilder builder = environment .producerBuilder ().dynamicBatch (dynamicBatch );
422+ if (superStream ) {
423+ builder .superStream (stream ).routing (m -> m .getProperties ().getMessageIdAsString ());
424+ } else {
425+ builder .stream (stream );
426+ }
427+ this .producer = builder .build ();
312428 }
313429
314430 void start () {
@@ -327,7 +443,14 @@ void start() {
327443 try {
328444 this .limiter .acquire (1 );
329445 this .producer .send (
330- producer .messageBuilder ().addData (BODY ).build (), confirmationHandler );
446+ producer
447+ .messageBuilder ()
448+ .properties ()
449+ .messageId (MSG_ID_SEQ .getAndIncrement ())
450+ .messageBuilder ()
451+ .addData (BODY )
452+ .build (),
453+ confirmationHandler );
331454 } catch (Throwable e ) {
332455 this .lastException .set (e );
333456 this .lastExceptionInstant .set (Instant .now ());
@@ -380,16 +503,27 @@ private static class ConsumerState implements AutoCloseable {
380503 final AtomicReference <Runnable > postHandle = new AtomicReference <>(() -> {});
381504
382505 private ConsumerState (String stream , Environment environment ) {
506+ this (stream , environment , b -> b .stream (stream ), (ctx , m ) -> {});
507+ }
508+
509+ private ConsumerState (
510+ String stream ,
511+ Environment environment ,
512+ java .util .function .Consumer <ConsumerBuilder > customizer ,
513+ MessageHandler delegateHandler ) {
383514 this .stream = stream ;
384- this .consumer =
385- environment .consumerBuilder ().stream (stream )
515+ ConsumerBuilder builder =
516+ environment
517+ .consumerBuilder ()
386518 .offset (OffsetSpecification .first ())
387519 .messageHandler (
388520 (ctx , m ) -> {
521+ delegateHandler .handle (ctx , m );
389522 receivedCount .incrementAndGet ();
390523 postHandle .get ().run ();
391- })
392- .build ();
524+ });
525+ customizer .accept (builder );
526+ this .consumer = builder .build ();
393527 }
394528
395529 Sync waitForNewMessages (int messageCount ) {
@@ -414,4 +548,16 @@ public void close() {
414548 this .consumer .close ();
415549 }
416550 }
551+
552+ private static void restartCluster () {
553+ nodes .forEach (
554+ n -> {
555+ LOGGER .info ("Restarting node {}..." , n );
556+ Cli .restartNode (n );
557+ LOGGER .info ("Restarted node {}." , n );
558+ });
559+ LOGGER .info ("Rebalancing..." );
560+ Cli .rebalance ();
561+ LOGGER .info ("Rebalancing over." );
562+ }
417563}
0 commit comments